1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
use {
18
    crate::rpc::generate_rpc_builder::{GenerateRpcBuilder, GenerateRpcBuilderParams},
19
    cumulus_client_consensus_common::{
20
        ParachainBlockImport as TParachainBlockImport, ParachainBlockImportMarker,
21
    },
22
    cumulus_client_service::{
23
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, ParachainHostFunctions,
24
        StartRelayChainTasksParams,
25
    },
26
    cumulus_primitives_core::ParaId,
27
    cumulus_relay_chain_interface::{
28
        call_remote_runtime_function, OverseerHandle, RelayChainInterface,
29
    },
30
    dancebox_runtime::{
31
        opaque::{Block, Hash},
32
        RuntimeApi,
33
    },
34
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
35
    dp_slot_duration_runtime_api::TanssiSlotDurationApi,
36
    nimbus_primitives::{NimbusId, NimbusPair},
37
    node_common::service::{MinimalCumulusRuntimeApi, NodeBuilder, NodeBuilderConfig},
38
    sc_basic_authorship::ProposerFactory,
39
    sc_consensus::{BasicQueue, BlockImport},
40
    sc_executor::WasmExecutor,
41
    sc_network::NetworkBlock,
42
    sc_network_sync::SyncingService,
43
    sc_service::{
44
        Configuration, ImportQueue, SpawnTaskHandle, TFullBackend, TFullClient, TaskManager,
45
    },
46
    sc_telemetry::TelemetryHandle,
47
    sc_transaction_pool::FullPool,
48
    sp_api::ProvideRuntimeApi,
49
    sp_consensus::EnableProofRecording,
50
    sp_consensus_aura::SlotDuration,
51
    sp_keystore::KeystorePtr,
52
    std::{marker::PhantomData, sync::Arc, time::Duration},
53
    substrate_prometheus_endpoint::Registry,
54
    tc_consensus::{
55
        collators::lookahead::{
56
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
57
        },
58
        OrchestratorAuraWorkerAuxData,
59
    },
60
    tokio_util::sync::CancellationToken,
61
};
62

            
63
#[allow(deprecated)]
64
use sc_executor::NativeElseWasmExecutor;
65

            
66
type FullBackend = TFullBackend<Block>;
67

            
68
/// Native executor type.
69
pub struct ParachainNativeExecutor;
70

            
71
impl sc_executor::NativeExecutionDispatch for ParachainNativeExecutor {
72
    type ExtendHostFunctions = ParachainHostFunctions;
73

            
74
245886
    fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
75
245886
        dancebox_runtime::api::dispatch(method, data)
76
245886
    }
77

            
78
2700
    fn native_version() -> sc_executor::NativeVersion {
79
2700
        dancebox_runtime::native_version()
80
2700
    }
81
}
82

            
83
#[derive(Default, Copy, Clone)]
84
pub struct ContainerChainNodeConfig<RuntimeApi>(PhantomData<RuntimeApi>);
85
impl<RuntimeApi> NodeBuilderConfig for ContainerChainNodeConfig<RuntimeApi> {
86
    type Block = Block;
87
    /// RuntimeApi is customizable to allow supporting more features than the common subset of
88
    /// runtime api features.
89
    type RuntimeApi = RuntimeApi;
90
    type ParachainExecutor = ContainerChainExecutor;
91
}
92

            
93
impl<RuntimeApi> ContainerChainNodeConfig<RuntimeApi> {
94
    pub fn new() -> Self {
95
        Self(PhantomData)
96
    }
97
}
98

            
99
/// Orchestrator Parachain Block import. We cannot use the one in cumulus as it overrides the best
100
/// chain selection rule
101
#[derive(Clone)]
102
pub struct OrchestratorParachainBlockImport<BI> {
103
    inner: BI,
104
}
105

            
106
impl<BI> OrchestratorParachainBlockImport<BI> {
107
    /// Create a new instance.
108
178
    pub fn new(inner: BI) -> Self {
109
178
        Self { inner }
110
178
    }
111
}
112

            
113
/// We simply rely on the inner
114
#[async_trait::async_trait]
115
impl<BI> BlockImport<Block> for OrchestratorParachainBlockImport<BI>
116
where
117
    BI: BlockImport<Block> + Send + Sync,
118
{
119
    type Error = BI::Error;
120

            
121
    async fn check_block(
122
        &self,
123
        block: sc_consensus::BlockCheckParams<Block>,
124
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
125
        self.inner.check_block(block).await
126
    }
127

            
128
    async fn import_block(
129
        &mut self,
130
        params: sc_consensus::BlockImportParams<Block>,
131
6992
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
132
6992
        let res = self.inner.import_block(params).await?;
133

            
134
6992
        Ok(res)
135
13984
    }
136
}
137

            
138
/// But we need to implement the ParachainBlockImportMarker trait to fullfil
139
impl<BI> ParachainBlockImportMarker for OrchestratorParachainBlockImport<BI> {}
140

            
141
// Orchestrator chain types
142
#[allow(deprecated)]
143
pub type ParachainExecutor = NativeElseWasmExecutor<ParachainNativeExecutor>;
144
pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
145
pub type ParachainBackend = TFullBackend<Block>;
146
pub type DevParachainBlockImport = OrchestratorParachainBlockImport<Arc<ParachainClient>>;
147
pub type ParachainBlockImport =
148
    TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
149
pub type ParachainProposerFactory =
150
    ProposerFactory<FullPool<Block, ParachainClient>, ParachainClient, EnableProofRecording>;
151

            
152
// Container chains types
153
type ContainerChainExecutor = WasmExecutor<ParachainHostFunctions>;
154
pub type ContainerChainClient<RuntimeApi> = TFullClient<Block, RuntimeApi, ContainerChainExecutor>;
155
pub type ContainerChainBackend = TFullBackend<Block>;
156
type ContainerChainBlockImport<RuntimeApi> =
157
    TParachainBlockImport<Block, Arc<ContainerChainClient<RuntimeApi>>, ContainerChainBackend>;
158

            
159
tp_traits::alias!(
160
    pub trait MinimalContainerRuntimeApi:
161
        MinimalCumulusRuntimeApi<Block, ContainerChainClient<Self>>
162
        + sp_api::ConstructRuntimeApi<
163
            Block,
164
            ContainerChainClient<Self>,
165
            RuntimeApi:
166
                TanssiSlotDurationApi<Block>
167
                + async_backing_primitives::UnincludedSegmentApi<Block>,
168
        >
169
        + Sized
170
);
171

            
172
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
173
///
174
/// This is the actual implementation that is abstract over the executor and the runtime api.
175
#[sc_tracing::logging::prefix_logs_with(container_log_str(para_id))]
176
pub async fn start_node_impl_container<
177
    RuntimeApi: MinimalContainerRuntimeApi,
178
    TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi>,
179
>(
180
    parachain_config: Configuration,
181
    relay_chain_interface: Arc<dyn RelayChainInterface>,
182
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
183
    keystore: KeystorePtr,
184
    para_id: ParaId,
185
    collation_params: Option<crate::spawner::CollationParams>,
186
    generate_rpc_builder: TGenerateRpcBuilder,
187
) -> sc_service::error::Result<(
188
    TaskManager,
189
    Arc<ContainerChainClient<RuntimeApi>>,
190
    Arc<ParachainBackend>,
191
)> {
192
    let parachain_config = prepare_node_config(parachain_config);
193

            
194
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
195
    let node_builder = ContainerChainNodeConfig::new_builder(&parachain_config, None)?;
196

            
197
    let (block_import, import_queue) =
198
        container_chain_import_queue(&parachain_config, &node_builder);
199
    let import_queue_service = import_queue.service();
200

            
201
    log::info!("are we collators? {:?}", collation_params.is_some());
202
    let node_builder = node_builder
203
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
204
            &parachain_config,
205
            para_id,
206
            import_queue,
207
            relay_chain_interface.clone(),
208
        )
209
        .await?;
210

            
211
    let force_authoring = parachain_config.force_authoring;
212
    let prometheus_registry = parachain_config.prometheus_registry().cloned();
213

            
214
    let rpc_builder = generate_rpc_builder.generate(GenerateRpcBuilderParams {
215
        task_manager: &node_builder.task_manager,
216
        container_chain_config: &parachain_config,
217
        client: node_builder.client.clone(),
218
        backend: node_builder.backend.clone(),
219
        sync_service: node_builder.network.sync_service.clone(),
220
        transaction_pool: node_builder.transaction_pool.clone(),
221
        prometheus_registry: node_builder.prometheus_registry.clone(),
222
        command_sink: None,
223
        xcm_senders: None,
224
        network: node_builder.network.network.clone(),
225
    })?;
226

            
227
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
228

            
229
    let announce_block = {
230
        let sync_service = node_builder.network.sync_service.clone();
231
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
232
    };
233

            
234
    let relay_chain_slot_duration = Duration::from_secs(6);
235

            
236
    let overseer_handle = relay_chain_interface
237
        .overseer_handle()
238
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
239
    let (mut node_builder, _) = node_builder.extract_import_queue_service();
240

            
241
    start_relay_chain_tasks(StartRelayChainTasksParams {
242
        client: node_builder.client.clone(),
243
        announce_block: announce_block.clone(),
244
        para_id,
245
        relay_chain_interface: relay_chain_interface.clone(),
246
        task_manager: &mut node_builder.task_manager,
247
        da_recovery_profile: if collation_params.is_some() {
248
            DARecoveryProfile::Collator
249
        } else {
250
            DARecoveryProfile::FullNode
251
        },
252
        import_queue: import_queue_service,
253
        relay_chain_slot_duration,
254
        recovery_handle: Box::new(overseer_handle.clone()),
255
        sync_service: node_builder.network.sync_service.clone(),
256
    })?;
257

            
258
    if let Some(collation_params) = collation_params {
259
        let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
260
        let node_client = node_builder.client.clone();
261
        let node_backend = node_builder.backend.clone();
262

            
263
        start_consensus_container(
264
            node_client.clone(),
265
            node_backend.clone(),
266
            collation_params,
267
            block_import.clone(),
268
            prometheus_registry.clone(),
269
            node_builder.telemetry.as_ref().map(|t| t.handle()).clone(),
270
            node_spawn_handle.clone(),
271
            relay_chain_interface.clone(),
272
            orchestrator_chain_interface.clone(),
273
            node_builder.transaction_pool.clone(),
274
            node_builder.network.sync_service.clone(),
275
            keystore.clone(),
276
            force_authoring,
277
            relay_chain_slot_duration,
278
            para_id,
279
            overseer_handle.clone(),
280
            announce_block.clone(),
281
        );
282
    }
283

            
284
    node_builder.network.start_network.start_network();
285

            
286
    Ok((
287
        node_builder.task_manager,
288
        node_builder.client,
289
        node_builder.backend,
290
    ))
291
}
292

            
293
pub fn container_chain_import_queue<RuntimeApi: MinimalContainerRuntimeApi>(
294
    parachain_config: &Configuration,
295
    node_builder: &NodeBuilder<ContainerChainNodeConfig<RuntimeApi>>,
296
) -> (ContainerChainBlockImport<RuntimeApi>, BasicQueue<Block>) {
297
    // The nimbus import queue ONLY checks the signature correctness
298
    // Any other checks corresponding to the author-correctness should be done
299
    // in the runtime
300
    let block_import =
301
        ContainerChainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
302

            
303
    let import_queue = nimbus_consensus::import_queue(
304
        node_builder.client.clone(),
305
        block_import.clone(),
306
        move |_, _| async move {
307
            let time = sp_timestamp::InherentDataProvider::from_system_time();
308

            
309
            Ok((time,))
310
        },
311
        &node_builder.task_manager.spawn_essential_handle(),
312
        parachain_config.prometheus_registry(),
313
        false,
314
    )
315
    .expect("function never fails");
316

            
317
    (block_import, import_queue)
318
}
319

            
320
#[sc_tracing::logging::prefix_logs_with(container_log_str(para_id))]
321
fn start_consensus_container<RuntimeApi: MinimalContainerRuntimeApi>(
322
    client: Arc<ContainerChainClient<RuntimeApi>>,
323
    backend: Arc<FullBackend>,
324
    collation_params: crate::spawner::CollationParams,
325
    block_import: ContainerChainBlockImport<RuntimeApi>,
326
    prometheus_registry: Option<Registry>,
327
    telemetry: Option<TelemetryHandle>,
328
    spawner: SpawnTaskHandle,
329
    relay_chain_interface: Arc<dyn RelayChainInterface>,
330
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
331
    transaction_pool: Arc<sc_transaction_pool::FullPool<Block, ContainerChainClient<RuntimeApi>>>,
332
    sync_oracle: Arc<SyncingService<Block>>,
333
    keystore: KeystorePtr,
334
    force_authoring: bool,
335
    relay_chain_slot_duration: Duration,
336
    para_id: ParaId,
337
    overseer_handle: OverseerHandle,
338
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
339
) {
340
    let crate::spawner::CollationParams {
341
        collator_key,
342
        orchestrator_tx_pool,
343
        orchestrator_client,
344
        orchestrator_para_id,
345
        solochain,
346
    } = collation_params;
347
    let slot_duration = if solochain {
348
        // Solochains use Babe instead of Aura, which has 6s slot duration
349
        let relay_slot_ms = relay_chain_slot_duration.as_millis();
350
        SlotDuration::from_millis(
351
            u64::try_from(relay_slot_ms).expect("relay chain slot duration overflows u64"),
352
        )
353
    } else {
354
        cumulus_client_consensus_aura::slot_duration(
355
            orchestrator_client
356
                .as_deref()
357
                .expect("solochain is false, orchestrator_client must be Some"),
358
        )
359
        .expect("start_consensus_container: slot duration should exist")
360
    };
361

            
362
    let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
363
        spawner.clone(),
364
        client.clone(),
365
        transaction_pool,
366
        prometheus_registry.as_ref(),
367
        telemetry.clone(),
368
    );
369

            
370
    let proposer = cumulus_client_consensus_proposer::Proposer::new(proposer_factory);
371

            
372
    let collator_service = cumulus_client_collator::service::CollatorService::new(
373
        client.clone(),
374
        Arc::new(spawner.clone()),
375
        announce_block,
376
        client.clone(),
377
    );
378

            
379
    let relay_chain_interace_for_cidp = relay_chain_interface.clone();
380
    let relay_chain_interace_for_orch = relay_chain_interface.clone();
381
    let orchestrator_client_for_cidp = orchestrator_client.clone();
382
    let client_for_cidp = client.clone();
383
    let client_for_hash_provider = client.clone();
384
    let client_for_slot_duration = client.clone();
385

            
386
    let code_hash_provider = move |block_hash| {
387
        client_for_hash_provider
388
            .code_at(block_hash)
389
            .ok()
390
            .map(polkadot_primitives::ValidationCode)
391
            .map(|c| c.hash())
392
    };
393
    let buy_core_params = if solochain {
394
        BuyCoreParams::Solochain {}
395
    } else {
396
        BuyCoreParams::Orchestrator {
397
            orchestrator_tx_pool: orchestrator_tx_pool
398
                .expect("solochain is false, orchestrator_tx_pool must be Some"),
399
            orchestrator_client: orchestrator_client
400
                .expect("solochain is false, orchestrator_client must be Some"),
401
        }
402
    };
403

            
404
    let params = LookaheadTanssiAuraParams {
405
        get_current_slot_duration: move |block_hash| {
406
            // Default to 12s if runtime API does not exist
407
            let slot_duration_ms = client_for_slot_duration
408
                .runtime_api()
409
                .slot_duration(block_hash)
410
                .unwrap_or(12_000);
411

            
412
            SlotDuration::from_millis(slot_duration_ms)
413
        },
414
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
415
            let relay_chain_interface = relay_chain_interace_for_cidp.clone();
416
            let orchestrator_chain_interface = orchestrator_chain_interface.clone();
417
            let client = client_for_cidp.clone();
418

            
419
            async move {
420
                let authorities_noting_inherent = if solochain {
421
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at_solochain(
422
                        relay_parent,
423
                        &relay_chain_interface,
424
                    )
425
                        .await
426
                } else {
427
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at(
428
                        relay_parent,
429
                        &relay_chain_interface,
430
                        &orchestrator_chain_interface,
431
                        orchestrator_para_id,
432
                    )
433
                        .await
434
                };
435

            
436
                let slot_duration = {
437
                    // Default to 12s if runtime API does not exist
438
                    let slot_duration_ms = client
439
                        .runtime_api()
440
                        .slot_duration(block_hash)
441
                        .unwrap_or(12_000);
442

            
443
                    SlotDuration::from_millis(slot_duration_ms)
444
                };
445

            
446
                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
447

            
448
                let slot =
449
						sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
450
							*timestamp,
451
							slot_duration,
452
						);
453

            
454
                let authorities_noting_inherent = authorities_noting_inherent.ok_or_else(|| {
455
                    Box::<dyn std::error::Error + Send + Sync>::from(
456
                        "Failed to create authoritiesnoting inherent",
457
                    )
458
                })?;
459

            
460
                Ok((slot, timestamp, authorities_noting_inherent))
461
            }
462
        },
463
        get_orchestrator_aux_data: move |_block_hash, (relay_parent, _validation_data)| {
464
            let relay_chain_interace_for_orch = relay_chain_interace_for_orch.clone();
465
            let orchestrator_client_for_cidp = orchestrator_client_for_cidp.clone();
466

            
467
            async move {
468
                if solochain {
469
                    let authorities: Option<Vec<NimbusId>> = call_remote_runtime_function(
470
                        &relay_chain_interace_for_orch,
471
                        "TanssiAuthorityAssignmentApi_para_id_authorities",
472
                        relay_parent,
473
                        &para_id,
474
                    )
475
                    .await?;
476

            
477
                    let authorities = authorities.ok_or_else(|| {
478
                        Box::<dyn std::error::Error + Send + Sync>::from(
479
                            "Failed to fetch authorities with error",
480
                        )
481
                    })?;
482

            
483
                    log::info!(
484
                        "Authorities {:?} found for header {:?}",
485
                        authorities,
486
                        relay_parent
487
                    );
488

            
489
                    let slot_freq: Option<_> = call_remote_runtime_function(
490
                        &relay_chain_interace_for_orch,
491
                        "OnDemandBlockProductionApi_parathread_slot_frequency",
492
                        relay_parent,
493
                        &para_id,
494
                    )
495
                    .await?;
496

            
497
                    let aux_data = OrchestratorAuraWorkerAuxData {
498
                        authorities,
499
                        slot_freq,
500
                    };
501

            
502
                    Ok(aux_data)
503
                } else {
504
                    let latest_header =
505
                        ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::get_latest_orchestrator_head_info(
506
                            relay_parent,
507
                            &relay_chain_interace_for_orch,
508
                            orchestrator_para_id,
509
                        )
510
                            .await;
511

            
512
                    let latest_header = latest_header.ok_or_else(|| {
513
                        Box::<dyn std::error::Error + Send + Sync>::from(
514
                            "Failed to fetch latest header",
515
                        )
516
                    })?;
517

            
518
                    let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
519
                        orchestrator_client_for_cidp
520
                            .as_ref()
521
                            .expect("solochain is false, orchestrator_client must be Some"),
522
                        &latest_header.hash(),
523
                        para_id,
524
                    );
525

            
526
                    let authorities = authorities.ok_or_else(|| {
527
                        Box::<dyn std::error::Error + Send + Sync>::from(
528
                            "Failed to fetch authorities with error",
529
                        )
530
                    })?;
531

            
532
                    log::info!(
533
                        "Authorities {:?} found for header {:?}",
534
                        authorities,
535
                        latest_header
536
                    );
537

            
538
                    let slot_freq = tc_consensus::min_slot_freq::<Block, ParachainClient, NimbusPair>(
539
                        orchestrator_client_for_cidp
540
                            .as_ref()
541
                            .expect("solochain is false, orchestrator_client must be Some"),
542
                        &latest_header.hash(),
543
                        para_id,
544
                    );
545

            
546
                    let aux_data = OrchestratorAuraWorkerAuxData {
547
                        authorities,
548
                        slot_freq,
549
                    };
550

            
551
                    Ok(aux_data)
552
                }
553
            }
554
        },
555
        block_import,
556
        para_client: client,
557
        relay_client: relay_chain_interface,
558
        sync_oracle,
559
        keystore,
560
        collator_key,
561
        para_id,
562
        overseer_handle,
563
        orchestrator_slot_duration: slot_duration,
564
        force_authoring,
565
        relay_chain_slot_duration,
566
        proposer,
567
        collator_service,
568
        authoring_duration: Duration::from_millis(2000),
569
        para_backend: backend,
570
        code_hash_provider,
571
        // This cancellation token is no-op as it is not shared outside.
572
        cancellation_token: CancellationToken::new(),
573
        buy_core_params,
574
    };
575

            
576
    let (fut, _exit_notification_receiver) =
577
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
578
            params,
579
        );
580
    spawner.spawn("tanssi-aura-container", None, fut);
581
}
582

            
583
// Log string that will be shown for the container chain: `[Container-2000]`.
584
// This needs to be a separate function because the `prefix_logs_with` macro
585
// has trouble parsing expressions.
586
fn container_log_str(para_id: ParaId) -> String {
587
    format!("Container-{}", para_id)
588
}