1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
use {
18
    crate::rpc::generate_rpc_builder::{GenerateRpcBuilder, GenerateRpcBuilderParams},
19
    cumulus_client_consensus_common::{
20
        ParachainBlockImport as TParachainBlockImport, ParachainBlockImportMarker,
21
    },
22
    cumulus_client_service::{
23
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, ParachainHostFunctions,
24
        StartRelayChainTasksParams,
25
    },
26
    cumulus_primitives_core::ParaId,
27
    cumulus_relay_chain_interface::{call_runtime_api, OverseerHandle, RelayChainInterface},
28
    dancebox_runtime::{
29
        opaque::{Block, Hash},
30
        RuntimeApi,
31
    },
32
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
33
    dp_slot_duration_runtime_api::TanssiSlotDurationApi,
34
    nimbus_primitives::{NimbusId, NimbusPair},
35
    node_common::service::{MinimalCumulusRuntimeApi, NodeBuilder, NodeBuilderConfig},
36
    sc_basic_authorship::ProposerFactory,
37
    sc_consensus::{BasicQueue, BlockImport},
38
    sc_executor::WasmExecutor,
39
    sc_network::NetworkBlock,
40
    sc_network_sync::SyncingService,
41
    sc_service::{
42
        Configuration, ImportQueue, SpawnTaskHandle, TFullBackend, TFullClient, TaskManager,
43
    },
44
    sc_telemetry::TelemetryHandle,
45
    sc_tracing::tracing::Instrument,
46
    sc_transaction_pool::TransactionPoolHandle,
47
    sp_api::ProvideRuntimeApi,
48
    sp_consensus::EnableProofRecording,
49
    sp_consensus_aura::SlotDuration,
50
    sp_keystore::KeystorePtr,
51
    std::{marker::PhantomData, sync::Arc, time::Duration},
52
    substrate_prometheus_endpoint::Registry,
53
    tc_consensus::{
54
        collators::lookahead::{
55
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
56
        },
57
        OrchestratorAuraWorkerAuxData,
58
    },
59
    tokio_util::sync::CancellationToken,
60
};
61

            
62
#[allow(deprecated)]
63
use sc_executor::NativeElseWasmExecutor;
64

            
65
type FullBackend = TFullBackend<Block>;
66

            
67
/// Native executor type.
68
pub struct ParachainNativeExecutor;
69

            
70
impl sc_executor::NativeExecutionDispatch for ParachainNativeExecutor {
71
    type ExtendHostFunctions = ParachainHostFunctions;
72

            
73
277872
    fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
74
277872
        dancebox_runtime::api::dispatch(method, data)
75
277872
    }
76

            
77
2760
    fn native_version() -> sc_executor::NativeVersion {
78
2760
        dancebox_runtime::native_version()
79
2760
    }
80
}
81

            
82
#[derive(Default, Copy, Clone)]
83
pub struct ContainerChainNodeConfig<RuntimeApi>(PhantomData<RuntimeApi>);
84
impl<RuntimeApi> NodeBuilderConfig for ContainerChainNodeConfig<RuntimeApi> {
85
    type Block = Block;
86
    /// RuntimeApi is customizable to allow supporting more features than the common subset of
87
    /// runtime api features.
88
    type RuntimeApi = RuntimeApi;
89
    type ParachainExecutor = ContainerChainExecutor;
90
}
91

            
92
impl<RuntimeApi> ContainerChainNodeConfig<RuntimeApi> {
93
    pub fn new() -> Self {
94
        Self(PhantomData)
95
    }
96
}
97

            
98
/// Orchestrator Parachain Block import. We cannot use the one in cumulus as it overrides the best
99
/// chain selection rule
100
#[derive(Clone)]
101
pub struct OrchestratorParachainBlockImport<BI> {
102
    inner: BI,
103
}
104

            
105
impl<BI> OrchestratorParachainBlockImport<BI> {
106
    /// Create a new instance.
107
182
    pub fn new(inner: BI) -> Self {
108
182
        Self { inner }
109
182
    }
110
}
111

            
112
/// We simply rely on the inner
113
#[async_trait::async_trait]
114
impl<BI> BlockImport<Block> for OrchestratorParachainBlockImport<BI>
115
where
116
    BI: BlockImport<Block> + Send + Sync,
117
{
118
    type Error = BI::Error;
119

            
120
    async fn check_block(
121
        &self,
122
        block: sc_consensus::BlockCheckParams<Block>,
123
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
124
        self.inner.check_block(block).await
125
    }
126

            
127
    async fn import_block(
128
        &self,
129
        params: sc_consensus::BlockImportParams<Block>,
130
7274
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
131
7274
        let res = self.inner.import_block(params).await?;
132

            
133
7274
        Ok(res)
134
14548
    }
135
}
136

            
137
/// But we need to implement the ParachainBlockImportMarker trait to fullfil
138
impl<BI> ParachainBlockImportMarker for OrchestratorParachainBlockImport<BI> {}
139

            
140
// Orchestrator chain types
141
#[allow(deprecated)]
142
pub type ParachainExecutor = NativeElseWasmExecutor<ParachainNativeExecutor>;
143
pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
144
pub type ParachainBackend = TFullBackend<Block>;
145
pub type DevParachainBlockImport = OrchestratorParachainBlockImport<Arc<ParachainClient>>;
146
pub type ParachainBlockImport =
147
    TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
148
pub type ParachainProposerFactory = ProposerFactory<
149
    TransactionPoolHandle<Block, ParachainClient>,
150
    ParachainClient,
151
    EnableProofRecording,
152
>;
153

            
154
// Container chains types
155
type ContainerChainExecutor = WasmExecutor<ParachainHostFunctions>;
156
pub type ContainerChainClient<RuntimeApi> = TFullClient<Block, RuntimeApi, ContainerChainExecutor>;
157
pub type ContainerChainBackend = TFullBackend<Block>;
158
type ContainerChainBlockImport<RuntimeApi> =
159
    TParachainBlockImport<Block, Arc<ContainerChainClient<RuntimeApi>>, ContainerChainBackend>;
160

            
161
tp_traits::alias!(
162
    pub trait MinimalContainerRuntimeApi:
163
        MinimalCumulusRuntimeApi<Block, ContainerChainClient<Self>>
164
        + sp_api::ConstructRuntimeApi<
165
            Block,
166
            ContainerChainClient<Self>,
167
            RuntimeApi:
168
                TanssiSlotDurationApi<Block>
169
                + async_backing_primitives::UnincludedSegmentApi<Block>,
170
        >
171
        + Sized
172
);
173

            
174
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
175
///
176
/// This is the actual implementation that is abstract over the executor and the runtime api.
177
pub fn start_node_impl_container<
178
    RuntimeApi: MinimalContainerRuntimeApi,
179
    TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi>,
180
>(
181
    parachain_config: Configuration,
182
    relay_chain_interface: Arc<dyn RelayChainInterface>,
183
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
184
    keystore: KeystorePtr,
185
    para_id: ParaId,
186
    collation_params: Option<crate::spawner::CollationParams>,
187
    generate_rpc_builder: TGenerateRpcBuilder,
188
) -> impl std::future::Future<
189
    Output = sc_service::error::Result<(
190
        TaskManager,
191
        Arc<ContainerChainClient<RuntimeApi>>,
192
        Arc<ParachainBackend>,
193
    )>,
194
> {
195
    async move {
196
        let parachain_config = prepare_node_config(parachain_config);
197

            
198
        // Create a `NodeBuilder` which helps setup parachain nodes common systems.
199
        let node_builder = ContainerChainNodeConfig::new_builder(&parachain_config, None)?;
200

            
201
        let (block_import, import_queue) =
202
            container_chain_import_queue(&parachain_config, &node_builder);
203
        let import_queue_service = import_queue.service();
204

            
205
        let node_builder = node_builder
206
            .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
207
                &parachain_config,
208
                para_id,
209
                import_queue,
210
                relay_chain_interface.clone(),
211
            )
212
            .await?;
213

            
214
        let force_authoring = parachain_config.force_authoring;
215

            
216
        let prometheus_registry = parachain_config.prometheus_registry().cloned();
217

            
218
        let rpc_builder = generate_rpc_builder.generate(GenerateRpcBuilderParams {
219
            task_manager: &node_builder.task_manager,
220
            container_chain_config: &parachain_config,
221
            client: node_builder.client.clone(),
222
            backend: node_builder.backend.clone(),
223
            sync_service: node_builder.network.sync_service.clone(),
224
            transaction_pool: node_builder.transaction_pool.clone(),
225
            prometheus_registry: node_builder.prometheus_registry.clone(),
226
            command_sink: None,
227
            xcm_senders: None,
228
            network: node_builder.network.network.clone(),
229
        })?;
230

            
231
        let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
232

            
233
        let announce_block = {
234
            let sync_service = node_builder.network.sync_service.clone();
235
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
236
        };
237

            
238
        let relay_chain_slot_duration = Duration::from_secs(6);
239

            
240
        let overseer_handle = relay_chain_interface
241
            .overseer_handle()
242
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
243
        let (mut node_builder, _) = node_builder.extract_import_queue_service();
244

            
245
        start_relay_chain_tasks(StartRelayChainTasksParams {
246
            client: node_builder.client.clone(),
247
            announce_block: announce_block.clone(),
248
            para_id,
249
            relay_chain_interface: relay_chain_interface.clone(),
250
            task_manager: &mut node_builder.task_manager,
251
            da_recovery_profile: if collation_params.is_some() {
252
                DARecoveryProfile::Collator
253
            } else {
254
                DARecoveryProfile::FullNode
255
            },
256
            import_queue: import_queue_service,
257
            relay_chain_slot_duration,
258
            recovery_handle: Box::new(overseer_handle.clone()),
259
            sync_service: node_builder.network.sync_service.clone(),
260
        })?;
261

            
262
        if let Some(collation_params) = collation_params {
263
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
264
            let node_client = node_builder.client.clone();
265
            let node_backend = node_builder.backend.clone();
266

            
267
            start_consensus_container(
268
                node_client.clone(),
269
                node_backend.clone(),
270
                collation_params,
271
                block_import.clone(),
272
                prometheus_registry.clone(),
273
                node_builder.telemetry.as_ref().map(|t| t.handle()).clone(),
274
                node_spawn_handle.clone(),
275
                relay_chain_interface.clone(),
276
                orchestrator_chain_interface.clone(),
277
                node_builder.transaction_pool.clone(),
278
                node_builder.network.sync_service.clone(),
279
                keystore.clone(),
280
                force_authoring,
281
                relay_chain_slot_duration,
282
                para_id,
283
                overseer_handle.clone(),
284
                announce_block.clone(),
285
            );
286
        }
287

            
288
        node_builder.network.start_network.start_network();
289
        Ok((
290
            node_builder.task_manager,
291
            node_builder.client,
292
            node_builder.backend,
293
        ))
294
    }
295
    .instrument(sc_tracing::tracing::info_span!(
296
        sc_tracing::logging::PREFIX_LOG_SPAN,
297
        name = container_log_str(para_id),
298
    ))
299
}
300

            
301
pub fn container_chain_import_queue<RuntimeApi: MinimalContainerRuntimeApi>(
302
    parachain_config: &Configuration,
303
    node_builder: &NodeBuilder<ContainerChainNodeConfig<RuntimeApi>>,
304
) -> (ContainerChainBlockImport<RuntimeApi>, BasicQueue<Block>) {
305
    // The nimbus import queue ONLY checks the signature correctness
306
    // Any other checks corresponding to the author-correctness should be done
307
    // in the runtime
308
    let block_import =
309
        ContainerChainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
310

            
311
    let import_queue = nimbus_consensus::import_queue(
312
        node_builder.client.clone(),
313
        block_import.clone(),
314
        move |_, _| async move {
315
            let time = sp_timestamp::InherentDataProvider::from_system_time();
316

            
317
            Ok((time,))
318
        },
319
        &node_builder.task_manager.spawn_essential_handle(),
320
        parachain_config.prometheus_registry(),
321
        false,
322
    )
323
    .expect("function never fails");
324

            
325
    (block_import, import_queue)
326
}
327

            
328
fn start_consensus_container<RuntimeApi: MinimalContainerRuntimeApi>(
329
    client: Arc<ContainerChainClient<RuntimeApi>>,
330
    backend: Arc<FullBackend>,
331
    collation_params: crate::spawner::CollationParams,
332
    block_import: ContainerChainBlockImport<RuntimeApi>,
333
    prometheus_registry: Option<Registry>,
334
    telemetry: Option<TelemetryHandle>,
335
    spawner: SpawnTaskHandle,
336
    relay_chain_interface: Arc<dyn RelayChainInterface>,
337
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
338
    transaction_pool: Arc<
339
        sc_transaction_pool::TransactionPoolHandle<Block, ContainerChainClient<RuntimeApi>>,
340
    >,
341
    sync_oracle: Arc<SyncingService<Block>>,
342
    keystore: KeystorePtr,
343
    force_authoring: bool,
344
    relay_chain_slot_duration: Duration,
345
    para_id: ParaId,
346
    overseer_handle: OverseerHandle,
347
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
348
) {
349
    let crate::spawner::CollationParams {
350
        collator_key,
351
        orchestrator_tx_pool,
352
        orchestrator_client,
353
        orchestrator_para_id,
354
        solochain,
355
    } = collation_params;
356
    let slot_duration = if solochain {
357
        // Solochains use Babe instead of Aura, which has 6s slot duration
358
        let relay_slot_ms = relay_chain_slot_duration.as_millis();
359
        SlotDuration::from_millis(
360
            u64::try_from(relay_slot_ms).expect("relay chain slot duration overflows u64"),
361
        )
362
    } else {
363
        cumulus_client_consensus_aura::slot_duration(
364
            orchestrator_client
365
                .as_deref()
366
                .expect("solochain is false, orchestrator_client must be Some"),
367
        )
368
        .expect("start_consensus_container: slot duration should exist")
369
    };
370

            
371
    let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
372
        spawner.clone(),
373
        client.clone(),
374
        transaction_pool,
375
        prometheus_registry.as_ref(),
376
        telemetry.clone(),
377
    );
378

            
379
    let proposer = cumulus_client_consensus_proposer::Proposer::new(proposer_factory);
380

            
381
    let collator_service = cumulus_client_collator::service::CollatorService::new(
382
        client.clone(),
383
        Arc::new(spawner.clone()),
384
        announce_block,
385
        client.clone(),
386
    );
387

            
388
    let relay_chain_interace_for_cidp = relay_chain_interface.clone();
389
    let relay_chain_interace_for_orch = relay_chain_interface.clone();
390
    let orchestrator_client_for_cidp = orchestrator_client.clone();
391
    let client_for_cidp = client.clone();
392
    let client_for_hash_provider = client.clone();
393
    let client_for_slot_duration = client.clone();
394

            
395
    let code_hash_provider = move |block_hash| {
396
        client_for_hash_provider
397
            .code_at(block_hash)
398
            .ok()
399
            .map(polkadot_primitives::ValidationCode)
400
            .map(|c| c.hash())
401
    };
402
    let buy_core_params = if solochain {
403
        BuyCoreParams::Solochain {}
404
    } else {
405
        BuyCoreParams::Orchestrator {
406
            orchestrator_tx_pool: orchestrator_tx_pool
407
                .expect("solochain is false, orchestrator_tx_pool must be Some"),
408
            orchestrator_client: orchestrator_client
409
                .expect("solochain is false, orchestrator_client must be Some"),
410
        }
411
    };
412

            
413
    let params = LookaheadTanssiAuraParams {
414
        get_current_slot_duration: move |block_hash| {
415
            // Default to 12s if runtime API does not exist
416
            let slot_duration_ms = client_for_slot_duration
417
                .runtime_api()
418
                .slot_duration(block_hash)
419
                .unwrap_or(12_000);
420

            
421
            SlotDuration::from_millis(slot_duration_ms)
422
        },
423
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
424
            let relay_chain_interface = relay_chain_interace_for_cidp.clone();
425
            let orchestrator_chain_interface = orchestrator_chain_interface.clone();
426
            let client = client_for_cidp.clone();
427

            
428
            async move {
429
                let authorities_noting_inherent = if solochain {
430
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at_solochain(
431
                        relay_parent,
432
                        &relay_chain_interface,
433
                    )
434
                        .await
435
                } else {
436
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at(
437
                        relay_parent,
438
                        &relay_chain_interface,
439
                        &orchestrator_chain_interface,
440
                        orchestrator_para_id,
441
                    )
442
                        .await
443
                };
444

            
445
                let slot_duration = {
446
                    // Default to 12s if runtime API does not exist
447
                    let slot_duration_ms = client
448
                        .runtime_api()
449
                        .slot_duration(block_hash)
450
                        .unwrap_or(12_000);
451

            
452
                    SlotDuration::from_millis(slot_duration_ms)
453
                };
454

            
455
                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
456

            
457
                let slot =
458
						sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
459
							*timestamp,
460
							slot_duration,
461
						);
462

            
463
                let authorities_noting_inherent = authorities_noting_inherent.ok_or_else(|| {
464
                    Box::<dyn std::error::Error + Send + Sync>::from(
465
                        "Failed to create authoritiesnoting inherent",
466
                    )
467
                })?;
468

            
469
                Ok((slot, timestamp, authorities_noting_inherent))
470
            }
471
        },
472
        get_orchestrator_aux_data: move |_block_hash, (relay_parent, _validation_data)| {
473
            let relay_chain_interace_for_orch = relay_chain_interace_for_orch.clone();
474
            let orchestrator_client_for_cidp = orchestrator_client_for_cidp.clone();
475

            
476
            async move {
477
                if solochain {
478
                    let authorities: Option<Vec<NimbusId>> = call_runtime_api(
479
                        &relay_chain_interace_for_orch,
480
                        "TanssiAuthorityAssignmentApi_para_id_authorities",
481
                        relay_parent,
482
                        &para_id,
483
                    )
484
                    .await?;
485

            
486
                    let authorities = authorities.ok_or_else(|| {
487
                        Box::<dyn std::error::Error + Send + Sync>::from(
488
                            "Failed to fetch authorities with error",
489
                        )
490
                    })?;
491

            
492
                    log::info!(
493
                        "Authorities {:?} found for header {:?}",
494
                        authorities,
495
                        relay_parent
496
                    );
497

            
498
                    let slot_freq: Option<_> = call_runtime_api(
499
                        &relay_chain_interace_for_orch,
500
                        "OnDemandBlockProductionApi_parathread_slot_frequency",
501
                        relay_parent,
502
                        &para_id,
503
                    )
504
                    .await?;
505

            
506
                    let aux_data = OrchestratorAuraWorkerAuxData {
507
                        authorities,
508
                        slot_freq,
509
                    };
510

            
511
                    Ok(aux_data)
512
                } else {
513
                    let latest_header =
514
                        ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::get_latest_orchestrator_head_info(
515
                            relay_parent,
516
                            &relay_chain_interace_for_orch,
517
                            orchestrator_para_id,
518
                        )
519
                            .await;
520

            
521
                    let latest_header = latest_header.ok_or_else(|| {
522
                        Box::<dyn std::error::Error + Send + Sync>::from(
523
                            "Failed to fetch latest header",
524
                        )
525
                    })?;
526

            
527
                    let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
528
                        orchestrator_client_for_cidp
529
                            .as_ref()
530
                            .expect("solochain is false, orchestrator_client must be Some"),
531
                        &latest_header.hash(),
532
                        para_id,
533
                    );
534

            
535
                    let authorities = authorities.ok_or_else(|| {
536
                        Box::<dyn std::error::Error + Send + Sync>::from(
537
                            "Failed to fetch authorities with error",
538
                        )
539
                    })?;
540

            
541
                    log::info!(
542
                        "Authorities {:?} found for header {:?}",
543
                        authorities,
544
                        latest_header
545
                    );
546

            
547
                    let slot_freq = tc_consensus::min_slot_freq::<Block, ParachainClient, NimbusPair>(
548
                        orchestrator_client_for_cidp
549
                            .as_ref()
550
                            .expect("solochain is false, orchestrator_client must be Some"),
551
                        &latest_header.hash(),
552
                        para_id,
553
                    );
554

            
555
                    let aux_data = OrchestratorAuraWorkerAuxData {
556
                        authorities,
557
                        slot_freq,
558
                    };
559

            
560
                    Ok(aux_data)
561
                }
562
            }
563
        },
564
        block_import,
565
        para_client: client,
566
        relay_client: relay_chain_interface,
567
        sync_oracle,
568
        keystore,
569
        collator_key,
570
        para_id,
571
        overseer_handle,
572
        orchestrator_slot_duration: slot_duration,
573
        force_authoring,
574
        relay_chain_slot_duration,
575
        proposer,
576
        collator_service,
577
        authoring_duration: Duration::from_millis(2000),
578
        para_backend: backend,
579
        code_hash_provider,
580
        // This cancellation token is no-op as it is not shared outside.
581
        cancellation_token: CancellationToken::new(),
582
        buy_core_params,
583
    };
584

            
585
    let (fut, _exit_notification_receiver) =
586
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
587
            params,
588
        );
589
    spawner.spawn("tanssi-aura-container", None, fut);
590
}
591

            
592
// Log string that will be shown for the container chain: `[Container-2000]`.
593
// This needs to be a separate function because the `prefix_logs_with` macro
594
// has trouble parsing expressions.
595
fn container_log_str(para_id: ParaId) -> String {
596
    format!("Container-{}", para_id)
597
}