1
// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
use {
18
    crate::cli::ContainerChainCli,
19
    crate::rpc::generate_rpc_builder::{GenerateRpcBuilder, GenerateRpcBuilderParams},
20
    cumulus_client_consensus_common::{
21
        ParachainBlockImport as TParachainBlockImport, ParachainBlockImportMarker,
22
    },
23
    cumulus_client_service::{
24
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, ParachainHostFunctions,
25
        StartRelayChainTasksParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::{call_runtime_api, OverseerHandle, RelayChainInterface},
29
    dancebox_runtime::{
30
        opaque::{Block, Hash},
31
        RuntimeApi,
32
    },
33
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
34
    dp_slot_duration_runtime_api::TanssiSlotDurationApi,
35
    nimbus_primitives::{NimbusId, NimbusPair},
36
    node_common::service::node_builder::{
37
        MinimalCumulusRuntimeApi, NodeBuilder, NodeBuilderConfig,
38
    },
39
    sc_basic_authorship::ProposerFactory,
40
    sc_consensus::{BasicQueue, BlockImport},
41
    sc_executor::WasmExecutor,
42
    sc_network::NetworkBackend,
43
    sc_network::NetworkBlock,
44
    sc_network_sync::SyncingService,
45
    sc_service::{
46
        Configuration, ImportQueue, SpawnTaskHandle, TFullBackend, TFullClient, TaskManager,
47
    },
48
    sc_telemetry::TelemetryHandle,
49
    sc_tracing::tracing::Instrument,
50
    sc_transaction_pool::TransactionPoolHandle,
51
    sp_api::ProvideRuntimeApi,
52
    sp_consensus::EnableProofRecording,
53
    sp_consensus_aura::SlotDuration,
54
    sp_keystore::KeystorePtr,
55
    std::{marker::PhantomData, sync::Arc, time::Duration},
56
    substrate_prometheus_endpoint::Registry,
57
    tc_consensus::{
58
        collators::lookahead::{
59
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
60
        },
61
        OrchestratorAuraWorkerAuxData,
62
    },
63
    tokio_util::sync::CancellationToken,
64
};
65

            
66
/// Shorthand for `TFullBackend` instantiated with this file's opaque `Block` type.
type FullBackend = TFullBackend<Block>;
67

            
68
/// Zero-sized configuration marker for container chain nodes, generic over the runtime api.
///
/// `Default`, `Clone` and `Copy` are implemented by hand instead of being derived: the derives
/// would require `RuntimeApi: Default + Clone + Copy`, even though only a `PhantomData` is
/// stored, which would make the marker unusable with runtime api types lacking those traits.
pub struct ContainerChainNodeConfig<RuntimeApi>(PhantomData<RuntimeApi>);

impl<RuntimeApi> Default for ContainerChainNodeConfig<RuntimeApi> {
    fn default() -> Self {
        Self(PhantomData)
    }
}

impl<RuntimeApi> Clone for ContainerChainNodeConfig<RuntimeApi> {
    fn clone(&self) -> Self {
        *self
    }
}

impl<RuntimeApi> Copy for ContainerChainNodeConfig<RuntimeApi> {}
70
impl<RuntimeApi> NodeBuilderConfig for ContainerChainNodeConfig<RuntimeApi> {
71
    type Block = Block;
72
    /// RuntimeApi is customizable to allow supporting more features than the common subset of
73
    /// runtime api features.
74
    type RuntimeApi = RuntimeApi;
75
    type ParachainExecutor = ContainerChainExecutor;
76
}
77

            
78
impl<RuntimeApi> ContainerChainNodeConfig<RuntimeApi> {
79
    pub fn new() -> Self {
80
        Self(PhantomData)
81
    }
82
}
83

            
84
/// Orchestrator parachain block import.
///
/// The block import provided by cumulus cannot be used for the orchestrator chain because it
/// overrides the best chain selection rule, so this thin wrapper around an inner block import
/// is used instead.
#[derive(Clone)]
pub struct OrchestratorParachainBlockImport<BI> {
    /// The wrapped block import.
    inner: BI,
}

impl<BI> OrchestratorParachainBlockImport<BI> {
    /// Create a new instance wrapping `inner`.
    pub fn new(inner: BI) -> Self {
        Self { inner }
    }
}
97

            
98
/// We simply rely on the inner
99
#[async_trait::async_trait]
100
impl<BI> BlockImport<Block> for OrchestratorParachainBlockImport<BI>
101
where
102
    BI: BlockImport<Block> + Send + Sync,
103
{
104
    type Error = BI::Error;
105

            
106
    async fn check_block(
107
        &self,
108
        block: sc_consensus::BlockCheckParams<Block>,
109
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
110
        self.inner.check_block(block).await
111
    }
112

            
113
    async fn import_block(
114
        &self,
115
        params: sc_consensus::BlockImportParams<Block>,
116
15808
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
117
7904
        let res = self.inner.import_block(params).await?;
118

            
119
7904
        Ok(res)
120
15808
    }
121
}
122

            
123
/// But we need to implement the ParachainBlockImportMarker trait to fullfil
124
impl<BI> ParachainBlockImportMarker for OrchestratorParachainBlockImport<BI> {}
125

            
126
// Orchestrator chain types
127
pub type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
128
pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
129
pub type ParachainBackend = TFullBackend<Block>;
130
pub type DevParachainBlockImport = OrchestratorParachainBlockImport<Arc<ParachainClient>>;
131
pub type ParachainBlockImport =
132
    TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
133
pub type ParachainProposerFactory = ProposerFactory<
134
    TransactionPoolHandle<Block, ParachainClient>,
135
    ParachainClient,
136
    EnableProofRecording,
137
>;
138

            
139
// Container chains types
140
type ContainerChainExecutor = WasmExecutor<ParachainHostFunctions>;
141
pub type ContainerChainClient<RuntimeApi> = TFullClient<Block, RuntimeApi, ContainerChainExecutor>;
142
pub type ContainerChainBackend = TFullBackend<Block>;
143
type ContainerChainBlockImport<RuntimeApi> =
144
    TParachainBlockImport<Block, Arc<ContainerChainClient<RuntimeApi>>, ContainerChainBackend>;
145

            
146
// Trait alias for the runtime api bounds required from a container chain runtime:
// `MinimalCumulusRuntimeApi` for the container chain client, plus a constructed runtime api
// that implements `TanssiSlotDurationApi` and `UnincludedSegmentApi`.
tp_traits::alias!(
    pub trait MinimalContainerRuntimeApi:
        MinimalCumulusRuntimeApi<Block, ContainerChainClient<Self>>
        + sp_api::ConstructRuntimeApi<
            Block,
            ContainerChainClient<Self>,
            RuntimeApi:
                TanssiSlotDurationApi<Block>
                + async_backing_primitives::UnincludedSegmentApi<Block>,
        >
        + Sized
);
158

            
159
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
160
///
161
/// This is the actual implementation that is abstract over the executor and the runtime api.
162
pub fn start_node_impl_container<
163
    'a,
164
    RuntimeApi: MinimalContainerRuntimeApi + 'a,
165
    TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi> + 'a,
166
    Net: NetworkBackend<Block, Hash>,
167
>(
168
    parachain_config: Configuration,
169
    relay_chain_interface: Arc<dyn RelayChainInterface>,
170
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
171
    keystore: KeystorePtr,
172
    para_id: ParaId,
173
    collation_params: Option<crate::spawner::CollationParams>,
174
    generate_rpc_builder: TGenerateRpcBuilder,
175
    container_chain_cli: &'a ContainerChainCli,
176
    data_preserver: bool,
177
) -> impl std::future::Future<
178
    Output = sc_service::error::Result<(
179
        TaskManager,
180
        Arc<ContainerChainClient<RuntimeApi>>,
181
        Arc<ParachainBackend>,
182
    )>,
183
> + 'a {
184
    async move {
185
        let parachain_config = prepare_node_config(parachain_config);
186

            
187
        // Create a `NodeBuilder` which helps setup parachain nodes common systems.
188
        let node_builder = ContainerChainNodeConfig::new_builder(&parachain_config, None)?;
189

            
190
        let (block_import, import_queue) = container_chain_import_queue(
191
            &parachain_config,
192
            &node_builder,
193
            container_chain_cli,
194
            data_preserver,
195
        );
196
        let import_queue_service = import_queue.service();
197

            
198
        let node_builder = node_builder
199
            .build_cumulus_network::<_, Net>(
200
                &parachain_config,
201
                para_id,
202
                import_queue,
203
                relay_chain_interface.clone(),
204
            )
205
            .await?;
206

            
207
        let force_authoring = parachain_config.force_authoring;
208

            
209
        let prometheus_registry = parachain_config.prometheus_registry().cloned();
210

            
211
        // Disable RPC if the flag is set
212
        let rpc_builder = if !container_chain_cli.base.disable_rpc {
213
            generate_rpc_builder.generate(GenerateRpcBuilderParams {
214
                task_manager: &node_builder.task_manager,
215
                container_chain_config: &parachain_config,
216
                client: node_builder.client.clone(),
217
                backend: node_builder.backend.clone(),
218
                sync_service: node_builder.network.sync_service.clone(),
219
                transaction_pool: node_builder.transaction_pool.clone(),
220
                prometheus_registry: node_builder.prometheus_registry.clone(),
221
                command_sink: None,
222
                xcm_senders: None,
223
                network: node_builder.network.network.clone(),
224
            })?
225
        } else {
226
            log::info!("RPC service disabled for bootnode-only node");
227
            crate::rpc::dummy_rpc_builder()
228
        };
229

            
230
        let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
231

            
232
        let announce_block = {
233
            let sync_service = node_builder.network.sync_service.clone();
234
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
235
        };
236

            
237
        let relay_chain_slot_duration = Duration::from_secs(6);
238

            
239
        let overseer_handle = relay_chain_interface
240
            .overseer_handle()
241
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
242
        let (mut node_builder, _) = node_builder.extract_import_queue_service();
243

            
244
        start_relay_chain_tasks(StartRelayChainTasksParams {
245
            client: node_builder.client.clone(),
246
            announce_block: announce_block.clone(),
247
            para_id,
248
            relay_chain_interface: relay_chain_interface.clone(),
249
            task_manager: &mut node_builder.task_manager,
250
            da_recovery_profile: if collation_params.is_some() {
251
                DARecoveryProfile::Collator
252
            } else {
253
                DARecoveryProfile::FullNode
254
            },
255
            import_queue: import_queue_service,
256
            relay_chain_slot_duration,
257
            recovery_handle: Box::new(overseer_handle.clone()),
258
            sync_service: node_builder.network.sync_service.clone(),
259
            prometheus_registry: prometheus_registry.as_ref(),
260
        })?;
261

            
262
        if let Some(collation_params) = collation_params {
263
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
264
            let node_client = node_builder.client.clone();
265
            let node_backend = node_builder.backend.clone();
266

            
267
            start_consensus_container(
268
                node_client.clone(),
269
                node_backend.clone(),
270
                collation_params,
271
                block_import.clone(),
272
                prometheus_registry.clone(),
273
                node_builder.telemetry.as_ref().map(|t| t.handle()).clone(),
274
                node_spawn_handle.clone(),
275
                relay_chain_interface.clone(),
276
                orchestrator_chain_interface.clone(),
277
                node_builder.transaction_pool.clone(),
278
                node_builder.network.sync_service.clone(),
279
                keystore.clone(),
280
                force_authoring,
281
                relay_chain_slot_duration,
282
                para_id,
283
                overseer_handle.clone(),
284
                announce_block.clone(),
285
                container_chain_cli.base.experimental_max_pov_percentage,
286
            );
287
        }
288

            
289
        Ok((
290
            node_builder.task_manager,
291
            node_builder.client,
292
            node_builder.backend,
293
        ))
294
    }
295
    .instrument(sc_tracing::tracing::info_span!(
296
        sc_tracing::logging::PREFIX_LOG_SPAN,
297
        name = container_log_str(para_id),
298
    ))
299
}
300

            
301
pub fn container_chain_import_queue<RuntimeApi: MinimalContainerRuntimeApi>(
302
    parachain_config: &Configuration,
303
    node_builder: &NodeBuilder<ContainerChainNodeConfig<RuntimeApi>>,
304
    container_chain_cli: &ContainerChainCli,
305
    data_preserver: bool,
306
) -> (ContainerChainBlockImport<RuntimeApi>, BasicQueue<Block>) {
307
    // The nimbus import queue ONLY checks the signature correctness
308
    // Any other checks corresponding to the author-correctness should be done
309
    // in the runtime
310
    let block_import =
311
        ContainerChainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
312

            
313
    // Disable gap creation to check if that avoids block history download in warp sync.
314
    // Create gap means download block history. If the user passes `--download-block-history`, we
315
    // set dont_create_gap=false, so create_gap=true, which is the default behavior in polkadot.
316
    let dont_create_gap = !container_chain_cli.base.download_block_history.unwrap_or(
317
        // Default value for download_block_history:
318
        // false if running a collator
319
        // true if running a data preserver node
320
        data_preserver,
321
    );
322

            
323
    let import_queue = nimbus_consensus::import_queue(
324
        node_builder.client.clone(),
325
        block_import.clone(),
326
        move |_, _| async move {
327
            let time = sp_timestamp::InherentDataProvider::from_system_time();
328

            
329
            Ok((time,))
330
        },
331
        &node_builder.task_manager.spawn_essential_handle(),
332
        parachain_config.prometheus_registry(),
333
        false,
334
        dont_create_gap,
335
    )
336
    .expect("function never fails");
337

            
338
    (block_import, import_queue)
339
}
340

            
341
fn start_consensus_container<RuntimeApi: MinimalContainerRuntimeApi>(
342
    client: Arc<ContainerChainClient<RuntimeApi>>,
343
    backend: Arc<FullBackend>,
344
    collation_params: crate::spawner::CollationParams,
345
    block_import: ContainerChainBlockImport<RuntimeApi>,
346
    prometheus_registry: Option<Registry>,
347
    telemetry: Option<TelemetryHandle>,
348
    spawner: SpawnTaskHandle,
349
    relay_chain_interface: Arc<dyn RelayChainInterface>,
350
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
351
    transaction_pool: Arc<
352
        sc_transaction_pool::TransactionPoolHandle<Block, ContainerChainClient<RuntimeApi>>,
353
    >,
354
    sync_oracle: Arc<SyncingService<Block>>,
355
    keystore: KeystorePtr,
356
    force_authoring: bool,
357
    relay_chain_slot_duration: Duration,
358
    para_id: ParaId,
359
    overseer_handle: OverseerHandle,
360
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
361
    max_pov_percentage: Option<u32>,
362
) {
363
    let crate::spawner::CollationParams {
364
        collator_key,
365
        orchestrator_tx_pool,
366
        orchestrator_client,
367
        orchestrator_para_id,
368
        solochain,
369
    } = collation_params;
370
    let slot_duration = if solochain {
371
        // Solochains use Babe instead of Aura, which has 6s slot duration
372
        let relay_slot_ms = relay_chain_slot_duration.as_millis();
373
        SlotDuration::from_millis(
374
            u64::try_from(relay_slot_ms).expect("relay chain slot duration overflows u64"),
375
        )
376
    } else {
377
        cumulus_client_consensus_aura::slot_duration(
378
            orchestrator_client
379
                .as_deref()
380
                .expect("solochain is false, orchestrator_client must be Some"),
381
        )
382
        .expect("start_consensus_container: slot duration should exist")
383
    };
384

            
385
    let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
386
        spawner.clone(),
387
        client.clone(),
388
        transaction_pool,
389
        prometheus_registry.as_ref(),
390
        telemetry.clone(),
391
    );
392

            
393
    let proposer = cumulus_client_consensus_proposer::Proposer::new(proposer_factory);
394

            
395
    let collator_service = cumulus_client_collator::service::CollatorService::new(
396
        client.clone(),
397
        Arc::new(spawner.clone()),
398
        announce_block,
399
        client.clone(),
400
    );
401

            
402
    let relay_chain_interace_for_cidp = relay_chain_interface.clone();
403
    let relay_chain_interace_for_orch = relay_chain_interface.clone();
404
    let orchestrator_client_for_cidp = orchestrator_client.clone();
405
    let client_for_cidp = client.clone();
406
    let client_for_hash_provider = client.clone();
407
    let client_for_slot_duration = client.clone();
408

            
409
    let code_hash_provider = move |block_hash| {
410
        client_for_hash_provider
411
            .code_at(block_hash)
412
            .ok()
413
            .map(polkadot_primitives::ValidationCode)
414
            .map(|c| c.hash())
415
    };
416
    let buy_core_params = if solochain {
417
        BuyCoreParams::Solochain {}
418
    } else {
419
        BuyCoreParams::Orchestrator {
420
            orchestrator_tx_pool: orchestrator_tx_pool
421
                .expect("solochain is false, orchestrator_tx_pool must be Some"),
422
            orchestrator_client: orchestrator_client
423
                .expect("solochain is false, orchestrator_client must be Some"),
424
        }
425
    };
426

            
427
    let params = LookaheadTanssiAuraParams {
428
        max_pov_percentage,
429
        get_current_slot_duration: move |block_hash| {
430
            // Default to 12s if runtime API does not exist
431
            let slot_duration_ms = client_for_slot_duration
432
                .runtime_api()
433
                .slot_duration(block_hash)
434
                .unwrap_or(12_000);
435

            
436
            SlotDuration::from_millis(slot_duration_ms)
437
        },
438
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
439
            let relay_chain_interface = relay_chain_interace_for_cidp.clone();
440
            let orchestrator_chain_interface = orchestrator_chain_interface.clone();
441
            let client = client_for_cidp.clone();
442

            
443
            async move {
444
                let authorities_noting_inherent = if solochain {
445
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at_solochain(
446
                        relay_parent,
447
                        &relay_chain_interface,
448
                    )
449
                        .await
450
                } else {
451
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at(
452
                        relay_parent,
453
                        &relay_chain_interface,
454
                        &orchestrator_chain_interface,
455
                        orchestrator_para_id,
456
                    )
457
                        .await
458
                };
459

            
460
                let slot_duration = {
461
                    // Default to 12s if runtime API does not exist
462
                    let slot_duration_ms = client
463
                        .runtime_api()
464
                        .slot_duration(block_hash)
465
                        .unwrap_or(12_000);
466

            
467
                    SlotDuration::from_millis(slot_duration_ms)
468
                };
469

            
470
                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
471

            
472
                let slot =
473
						sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
474
							*timestamp,
475
							slot_duration,
476
						);
477

            
478
                let authorities_noting_inherent = authorities_noting_inherent.ok_or_else(|| {
479
                    Box::<dyn std::error::Error + Send + Sync>::from(
480
                        "Failed to create authoritiesnoting inherent",
481
                    )
482
                })?;
483

            
484
                Ok((slot, timestamp, authorities_noting_inherent))
485
            }
486
        },
487
        get_orchestrator_aux_data: move |_block_hash, (relay_parent, _validation_data)| {
488
            let relay_chain_interace_for_orch = relay_chain_interace_for_orch.clone();
489
            let orchestrator_client_for_cidp = orchestrator_client_for_cidp.clone();
490

            
491
            async move {
492
                if solochain {
493
                    let authorities: Option<Vec<NimbusId>> = call_runtime_api(
494
                        &relay_chain_interace_for_orch,
495
                        "TanssiAuthorityAssignmentApi_para_id_authorities",
496
                        relay_parent,
497
                        &para_id,
498
                    )
499
                    .await?;
500

            
501
                    let authorities = authorities.ok_or_else(|| {
502
                        Box::<dyn std::error::Error + Send + Sync>::from(
503
                            "Failed to fetch authorities with error",
504
                        )
505
                    })?;
506

            
507
                    log::info!(
508
                        "Authorities {:?} found for header {:?}",
509
                        authorities,
510
                        relay_parent
511
                    );
512

            
513
                    let slot_freq: Option<_> = call_runtime_api(
514
                        &relay_chain_interace_for_orch,
515
                        "OnDemandBlockProductionApi_parathread_slot_frequency",
516
                        relay_parent,
517
                        &para_id,
518
                    )
519
                    .await?;
520

            
521
                    let aux_data = OrchestratorAuraWorkerAuxData {
522
                        authorities,
523
                        slot_freq,
524
                    };
525

            
526
                    Ok(aux_data)
527
                } else {
528
                    let latest_header =
529
                        ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::get_latest_orchestrator_head_info(
530
                            relay_parent,
531
                            &relay_chain_interace_for_orch,
532
                            orchestrator_para_id,
533
                        )
534
                            .await;
535

            
536
                    let latest_header = latest_header.ok_or_else(|| {
537
                        Box::<dyn std::error::Error + Send + Sync>::from(
538
                            "Failed to fetch latest header",
539
                        )
540
                    })?;
541

            
542
                    let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
543
                        orchestrator_client_for_cidp
544
                            .as_ref()
545
                            .expect("solochain is false, orchestrator_client must be Some"),
546
                        &latest_header.hash(),
547
                        para_id,
548
                    );
549

            
550
                    let authorities = authorities.ok_or_else(|| {
551
                        Box::<dyn std::error::Error + Send + Sync>::from(
552
                            "Failed to fetch authorities with error",
553
                        )
554
                    })?;
555

            
556
                    log::info!(
557
                        "Authorities {:?} found for header {:?}",
558
                        authorities,
559
                        latest_header
560
                    );
561

            
562
                    let slot_freq = tc_consensus::min_slot_freq::<Block, ParachainClient, NimbusPair>(
563
                        orchestrator_client_for_cidp
564
                            .as_ref()
565
                            .expect("solochain is false, orchestrator_client must be Some"),
566
                        &latest_header.hash(),
567
                        para_id,
568
                    );
569

            
570
                    let aux_data = OrchestratorAuraWorkerAuxData {
571
                        authorities,
572
                        slot_freq,
573
                    };
574

            
575
                    Ok(aux_data)
576
                }
577
            }
578
        },
579
        block_import,
580
        para_client: client,
581
        relay_client: relay_chain_interface,
582
        sync_oracle,
583
        keystore,
584
        collator_key,
585
        para_id,
586
        overseer_handle,
587
        orchestrator_slot_duration: slot_duration,
588
        force_authoring,
589
        relay_chain_slot_duration,
590
        proposer,
591
        collator_service,
592
        authoring_duration: Duration::from_millis(2000),
593
        para_backend: backend,
594
        code_hash_provider,
595
        // This cancellation token is no-op as it is not shared outside.
596
        cancellation_token: CancellationToken::new(),
597
        buy_core_params,
598
    };
599

            
600
    let (fut, _exit_notification_receiver) =
601
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
602
            params,
603
        );
604
    spawner.spawn("tanssi-aura-container", None, fut);
605
}
606

            
607
// Log string that will be shown for the container chain: `[Container-2000]`.
608
// This needs to be a separate function because the `prefix_logs_with` macro
609
// has trouble parsing expressions.
610
fn container_log_str(para_id: ParaId) -> String {
611
    format!("Container-{}", para_id)
612
}