1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
use {
18
    cumulus_client_consensus_common::{
19
        ParachainBlockImport as TParachainBlockImport, ParachainBlockImportMarker,
20
    },
21
    cumulus_client_service::{
22
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, ParachainHostFunctions,
23
        StartRelayChainTasksParams,
24
    },
25
    cumulus_primitives_core::ParaId,
26
    cumulus_relay_chain_interface::{
27
        call_remote_runtime_function, OverseerHandle, RelayChainInterface,
28
    },
29
    dancebox_runtime::{
30
        opaque::{Block, Hash},
31
        RuntimeApi,
32
    },
33
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
34
    dp_slot_duration_runtime_api::TanssiSlotDurationApi,
35
    nimbus_primitives::{NimbusId, NimbusPair},
36
    node_common::service::{NodeBuilder, NodeBuilderConfig},
37
    sc_basic_authorship::ProposerFactory,
38
    sc_consensus::{BasicQueue, BlockImport},
39
    sc_executor::WasmExecutor,
40
    sc_network::NetworkBlock,
41
    sc_network_sync::SyncingService,
42
    sc_service::{
43
        Configuration, ImportQueue, SpawnTaskHandle, TFullBackend, TFullClient, TaskManager,
44
    },
45
    sc_telemetry::TelemetryHandle,
46
    sc_transaction_pool::FullPool,
47
    sp_api::ProvideRuntimeApi,
48
    sp_consensus::EnableProofRecording,
49
    sp_consensus_aura::SlotDuration,
50
    sp_keystore::KeystorePtr,
51
    std::{sync::Arc, time::Duration},
52
    substrate_prometheus_endpoint::Registry,
53
    tc_consensus::{
54
        collators::lookahead::{
55
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
56
        },
57
        OrchestratorAuraWorkerAuxData,
58
    },
59
    tokio_util::sync::CancellationToken,
60
};
61

            
62
#[allow(deprecated)]
63
use sc_executor::NativeElseWasmExecutor;
64

            
65
type FullBackend = TFullBackend<Block>;
66

            
67
/// Native executor type.
68
pub struct ParachainNativeExecutor;
69

            
70
impl sc_executor::NativeExecutionDispatch for ParachainNativeExecutor {
71
    type ExtendHostFunctions = ParachainHostFunctions;
72

            
73
163616
    fn dispatch(method: &str, data: &[u8]) -> Option<Vec<u8>> {
74
163616
        dancebox_runtime::api::dispatch(method, data)
75
163616
    }
76

            
77
1800
    fn native_version() -> sc_executor::NativeVersion {
78
1800
        dancebox_runtime::native_version()
79
1800
    }
80
}
81

            
82
pub struct ContainerChainNodeConfig;
83
impl NodeBuilderConfig for ContainerChainNodeConfig {
84
    type Block = Block;
85
    // TODO: RuntimeApi here should be the subset of runtime apis available for all containers
86
    // Currently we are using the orchestrator runtime apis
87
    type RuntimeApi = RuntimeApi;
88
    type ParachainExecutor = ContainerChainExecutor;
89
}
90

            
91
/// Orchestrator Parachain Block import. We cannot use the one in cumulus as it overrides the best
92
/// chain selection rule
93
#[derive(Clone)]
94
pub struct OrchestratorParachainBlockImport<BI> {
95
    inner: BI,
96
}
97

            
98
impl<BI> OrchestratorParachainBlockImport<BI> {
99
    /// Create a new instance.
100
178
    pub fn new(inner: BI) -> Self {
101
178
        Self { inner }
102
178
    }
103
}
104

            
105
/// We simply rely on the inner
106
#[async_trait::async_trait]
107
impl<BI> BlockImport<Block> for OrchestratorParachainBlockImport<BI>
108
where
109
    BI: BlockImport<Block> + Send + Sync,
110
{
111
    type Error = BI::Error;
112

            
113
    async fn check_block(
114
        &self,
115
        block: sc_consensus::BlockCheckParams<Block>,
116
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
117
        self.inner.check_block(block).await
118
    }
119

            
120
    async fn import_block(
121
        &mut self,
122
        params: sc_consensus::BlockImportParams<Block>,
123
6992
    ) -> Result<sc_consensus::ImportResult, Self::Error> {
124
6992
        let res = self.inner.import_block(params).await?;
125

            
126
6992
        Ok(res)
127
13984
    }
128
}
129

            
130
/// But we need to implement the ParachainBlockImportMarker trait to fullfil
131
impl<BI> ParachainBlockImportMarker for OrchestratorParachainBlockImport<BI> {}
132

            
133
// Orchestrator chain types
134
#[allow(deprecated)]
135
pub type ParachainExecutor = NativeElseWasmExecutor<ParachainNativeExecutor>;
136
pub type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
137
pub type ParachainBackend = TFullBackend<Block>;
138
pub type DevParachainBlockImport = OrchestratorParachainBlockImport<Arc<ParachainClient>>;
139
pub type ParachainBlockImport =
140
    TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
141
pub type ParachainProposerFactory =
142
    ProposerFactory<FullPool<Block, ParachainClient>, ParachainClient, EnableProofRecording>;
143

            
144
// Container chains types
145
type ContainerChainExecutor = WasmExecutor<ParachainHostFunctions>;
146
pub type ContainerChainClient = TFullClient<Block, RuntimeApi, ContainerChainExecutor>;
147
pub type ContainerChainBackend = TFullBackend<Block>;
148
type ContainerChainBlockImport =
149
    TParachainBlockImport<Block, Arc<ContainerChainClient>, ContainerChainBackend>;
150

            
151
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
152
///
153
/// This is the actual implementation that is abstract over the executor and the runtime api.
154
#[sc_tracing::logging::prefix_logs_with(container_log_str(para_id))]
155
pub async fn start_node_impl_container(
156
    parachain_config: Configuration,
157
    relay_chain_interface: Arc<dyn RelayChainInterface>,
158
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
159
    keystore: KeystorePtr,
160
    para_id: ParaId,
161
    collation_params: Option<crate::spawner::CollationParams>,
162
) -> sc_service::error::Result<(
163
    TaskManager,
164
    Arc<ContainerChainClient>,
165
    Arc<ParachainBackend>,
166
)> {
167
    let parachain_config = prepare_node_config(parachain_config);
168

            
169
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
170
    let node_builder = ContainerChainNodeConfig::new_builder(&parachain_config, None)?;
171

            
172
    let (block_import, import_queue) =
173
        container_chain_import_queue(&parachain_config, &node_builder);
174
    let import_queue_service = import_queue.service();
175

            
176
    log::info!("are we collators? {:?}", collation_params.is_some());
177
    let node_builder = node_builder
178
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
179
            &parachain_config,
180
            para_id,
181
            import_queue,
182
            relay_chain_interface.clone(),
183
        )
184
        .await?;
185

            
186
    let force_authoring = parachain_config.force_authoring;
187
    let prometheus_registry = parachain_config.prometheus_registry().cloned();
188

            
189
    let rpc_builder = {
190
        let client = node_builder.client.clone();
191
        let transaction_pool = node_builder.transaction_pool.clone();
192

            
193
        Box::new(move |deny_unsafe, _| {
194
            let deps = crate::rpc::FullDeps {
195
                client: client.clone(),
196
                pool: transaction_pool.clone(),
197
                deny_unsafe,
198
                command_sink: None,
199
                xcm_senders: None,
200
            };
201

            
202
            crate::rpc::create_full(deps).map_err(Into::into)
203
        })
204
    };
205

            
206
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
207

            
208
    let announce_block = {
209
        let sync_service = node_builder.network.sync_service.clone();
210
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
211
    };
212

            
213
    let relay_chain_slot_duration = Duration::from_secs(6);
214

            
215
    let overseer_handle = relay_chain_interface
216
        .overseer_handle()
217
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
218
    let (mut node_builder, _) = node_builder.extract_import_queue_service();
219

            
220
    start_relay_chain_tasks(StartRelayChainTasksParams {
221
        client: node_builder.client.clone(),
222
        announce_block: announce_block.clone(),
223
        para_id,
224
        relay_chain_interface: relay_chain_interface.clone(),
225
        task_manager: &mut node_builder.task_manager,
226
        da_recovery_profile: if collation_params.is_some() {
227
            DARecoveryProfile::Collator
228
        } else {
229
            DARecoveryProfile::FullNode
230
        },
231
        import_queue: import_queue_service,
232
        relay_chain_slot_duration,
233
        recovery_handle: Box::new(overseer_handle.clone()),
234
        sync_service: node_builder.network.sync_service.clone(),
235
    })?;
236

            
237
    if let Some(collation_params) = collation_params {
238
        let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
239
        let node_client = node_builder.client.clone();
240
        let node_backend = node_builder.backend.clone();
241

            
242
        start_consensus_container(
243
            node_client.clone(),
244
            node_backend.clone(),
245
            collation_params,
246
            block_import.clone(),
247
            prometheus_registry.clone(),
248
            node_builder.telemetry.as_ref().map(|t| t.handle()).clone(),
249
            node_spawn_handle.clone(),
250
            relay_chain_interface.clone(),
251
            orchestrator_chain_interface.clone(),
252
            node_builder.transaction_pool.clone(),
253
            node_builder.network.sync_service.clone(),
254
            keystore.clone(),
255
            force_authoring,
256
            relay_chain_slot_duration,
257
            para_id,
258
            overseer_handle.clone(),
259
            announce_block.clone(),
260
        );
261
    }
262

            
263
    node_builder.network.start_network.start_network();
264

            
265
    Ok((
266
        node_builder.task_manager,
267
        node_builder.client,
268
        node_builder.backend,
269
    ))
270
}
271

            
272
pub fn container_chain_import_queue(
273
    parachain_config: &Configuration,
274
    node_builder: &NodeBuilder<ContainerChainNodeConfig>,
275
) -> (ContainerChainBlockImport, BasicQueue<Block>) {
276
    // The nimbus import queue ONLY checks the signature correctness
277
    // Any other checks corresponding to the author-correctness should be done
278
    // in the runtime
279
    let block_import =
280
        ContainerChainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
281

            
282
    let import_queue = nimbus_consensus::import_queue(
283
        node_builder.client.clone(),
284
        block_import.clone(),
285
        move |_, _| async move {
286
            let time = sp_timestamp::InherentDataProvider::from_system_time();
287

            
288
            Ok((time,))
289
        },
290
        &node_builder.task_manager.spawn_essential_handle(),
291
        parachain_config.prometheus_registry(),
292
        false,
293
    )
294
    .expect("function never fails");
295

            
296
    (block_import, import_queue)
297
}
298

            
299
#[sc_tracing::logging::prefix_logs_with(container_log_str(para_id))]
300
fn start_consensus_container(
301
    client: Arc<ContainerChainClient>,
302
    backend: Arc<FullBackend>,
303
    collation_params: crate::spawner::CollationParams,
304
    block_import: ContainerChainBlockImport,
305
    prometheus_registry: Option<Registry>,
306
    telemetry: Option<TelemetryHandle>,
307
    spawner: SpawnTaskHandle,
308
    relay_chain_interface: Arc<dyn RelayChainInterface>,
309
    orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
310
    transaction_pool: Arc<sc_transaction_pool::FullPool<Block, ContainerChainClient>>,
311
    sync_oracle: Arc<SyncingService<Block>>,
312
    keystore: KeystorePtr,
313
    force_authoring: bool,
314
    relay_chain_slot_duration: Duration,
315
    para_id: ParaId,
316
    overseer_handle: OverseerHandle,
317
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
318
) {
319
    let crate::spawner::CollationParams {
320
        collator_key,
321
        orchestrator_tx_pool,
322
        orchestrator_client,
323
        orchestrator_para_id,
324
        solochain,
325
    } = collation_params;
326
    let slot_duration = if solochain {
327
        // Solochains use Babe instead of Aura, which has 6s slot duration
328
        let relay_slot_ms = relay_chain_slot_duration.as_millis();
329
        SlotDuration::from_millis(
330
            u64::try_from(relay_slot_ms).expect("relay chain slot duration overflows u64"),
331
        )
332
    } else {
333
        cumulus_client_consensus_aura::slot_duration(
334
            orchestrator_client
335
                .as_deref()
336
                .expect("solochain is false, orchestrator_client must be Some"),
337
        )
338
        .expect("start_consensus_container: slot duration should exist")
339
    };
340

            
341
    let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
342
        spawner.clone(),
343
        client.clone(),
344
        transaction_pool,
345
        prometheus_registry.as_ref(),
346
        telemetry.clone(),
347
    );
348

            
349
    let proposer = cumulus_client_consensus_proposer::Proposer::new(proposer_factory);
350

            
351
    let collator_service = cumulus_client_collator::service::CollatorService::new(
352
        client.clone(),
353
        Arc::new(spawner.clone()),
354
        announce_block,
355
        client.clone(),
356
    );
357

            
358
    let relay_chain_interace_for_cidp = relay_chain_interface.clone();
359
    let relay_chain_interace_for_orch = relay_chain_interface.clone();
360
    let orchestrator_client_for_cidp = orchestrator_client.clone();
361
    let client_for_cidp = client.clone();
362
    let client_for_hash_provider = client.clone();
363
    let client_for_slot_duration = client.clone();
364

            
365
    let code_hash_provider = move |block_hash| {
366
        client_for_hash_provider
367
            .code_at(block_hash)
368
            .ok()
369
            .map(polkadot_primitives::ValidationCode)
370
            .map(|c| c.hash())
371
    };
372
    let buy_core_params = if solochain {
373
        BuyCoreParams::Solochain {}
374
    } else {
375
        BuyCoreParams::Orchestrator {
376
            orchestrator_tx_pool: orchestrator_tx_pool
377
                .expect("solochain is false, orchestrator_tx_pool must be Some"),
378
            orchestrator_client: orchestrator_client
379
                .expect("solochain is false, orchestrator_client must be Some"),
380
        }
381
    };
382

            
383
    let params = LookaheadTanssiAuraParams {
384
        get_current_slot_duration: move |block_hash| {
385
            // Default to 12s if runtime API does not exist
386
            let slot_duration_ms = client_for_slot_duration
387
                .runtime_api()
388
                .slot_duration(block_hash)
389
                .unwrap_or(12_000);
390

            
391
            SlotDuration::from_millis(slot_duration_ms)
392
        },
393
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
394
            let relay_chain_interface = relay_chain_interace_for_cidp.clone();
395
            let orchestrator_chain_interface = orchestrator_chain_interface.clone();
396
            let client = client_for_cidp.clone();
397

            
398
            async move {
399
                let authorities_noting_inherent = if solochain {
400
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at_solochain(
401
                        relay_parent,
402
                        &relay_chain_interface,
403
                    )
404
                        .await
405
                } else {
406
                    ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::create_at(
407
                        relay_parent,
408
                        &relay_chain_interface,
409
                        &orchestrator_chain_interface,
410
                        orchestrator_para_id,
411
                    )
412
                        .await
413
                };
414

            
415
                let slot_duration = {
416
                    // Default to 12s if runtime API does not exist
417
                    let slot_duration_ms = client
418
                        .runtime_api()
419
                        .slot_duration(block_hash)
420
                        .unwrap_or(12_000);
421

            
422
                    SlotDuration::from_millis(slot_duration_ms)
423
                };
424

            
425
                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
426

            
427
                let slot =
428
						sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
429
							*timestamp,
430
							slot_duration,
431
						);
432

            
433
                let authorities_noting_inherent = authorities_noting_inherent.ok_or_else(|| {
434
                    Box::<dyn std::error::Error + Send + Sync>::from(
435
                        "Failed to create authoritiesnoting inherent",
436
                    )
437
                })?;
438

            
439
                Ok((slot, timestamp, authorities_noting_inherent))
440
            }
441
        },
442
        get_orchestrator_aux_data: move |_block_hash, (relay_parent, _validation_data)| {
443
            let relay_chain_interace_for_orch = relay_chain_interace_for_orch.clone();
444
            let orchestrator_client_for_cidp = orchestrator_client_for_cidp.clone();
445

            
446
            async move {
447
                if solochain {
448
                    let authorities: Option<Vec<NimbusId>> = call_remote_runtime_function(
449
                        &relay_chain_interace_for_orch,
450
                        "TanssiAuthorityAssignmentApi_para_id_authorities",
451
                        relay_parent,
452
                        &para_id,
453
                    )
454
                    .await?;
455

            
456
                    let authorities = authorities.ok_or_else(|| {
457
                        Box::<dyn std::error::Error + Send + Sync>::from(
458
                            "Failed to fetch authorities with error",
459
                        )
460
                    })?;
461

            
462
                    log::info!(
463
                        "Authorities {:?} found for header {:?}",
464
                        authorities,
465
                        relay_parent
466
                    );
467

            
468
                    let slot_freq: Option<_> = call_remote_runtime_function(
469
                        &relay_chain_interace_for_orch,
470
                        "OnDemandBlockProductionApi_parathread_slot_frequency",
471
                        relay_parent,
472
                        &para_id,
473
                    )
474
                    .await?;
475

            
476
                    let aux_data = OrchestratorAuraWorkerAuxData {
477
                        authorities,
478
                        slot_freq,
479
                    };
480

            
481
                    Ok(aux_data)
482
                } else {
483
                    let latest_header =
484
                        ccp_authorities_noting_inherent::ContainerChainAuthoritiesInherentData::get_latest_orchestrator_head_info(
485
                            relay_parent,
486
                            &relay_chain_interace_for_orch,
487
                            orchestrator_para_id,
488
                        )
489
                            .await;
490

            
491
                    let latest_header = latest_header.ok_or_else(|| {
492
                        Box::<dyn std::error::Error + Send + Sync>::from(
493
                            "Failed to fetch latest header",
494
                        )
495
                    })?;
496

            
497
                    let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
498
                        orchestrator_client_for_cidp
499
                            .as_ref()
500
                            .expect("solochain is false, orchestrator_client must be Some"),
501
                        &latest_header.hash(),
502
                        para_id,
503
                    );
504

            
505
                    let authorities = authorities.ok_or_else(|| {
506
                        Box::<dyn std::error::Error + Send + Sync>::from(
507
                            "Failed to fetch authorities with error",
508
                        )
509
                    })?;
510

            
511
                    log::info!(
512
                        "Authorities {:?} found for header {:?}",
513
                        authorities,
514
                        latest_header
515
                    );
516

            
517
                    let slot_freq = tc_consensus::min_slot_freq::<Block, ParachainClient, NimbusPair>(
518
                        orchestrator_client_for_cidp
519
                            .as_ref()
520
                            .expect("solochain is false, orchestrator_client must be Some"),
521
                        &latest_header.hash(),
522
                        para_id,
523
                    );
524

            
525
                    let aux_data = OrchestratorAuraWorkerAuxData {
526
                        authorities,
527
                        slot_freq,
528
                    };
529

            
530
                    Ok(aux_data)
531
                }
532
            }
533
        },
534
        block_import,
535
        para_client: client,
536
        relay_client: relay_chain_interface,
537
        sync_oracle,
538
        keystore,
539
        collator_key,
540
        para_id,
541
        overseer_handle,
542
        orchestrator_slot_duration: slot_duration,
543
        force_authoring,
544
        relay_chain_slot_duration,
545
        proposer,
546
        collator_service,
547
        authoring_duration: Duration::from_millis(2000),
548
        para_backend: backend,
549
        code_hash_provider,
550
        // This cancellation token is no-op as it is not shared outside.
551
        cancellation_token: CancellationToken::new(),
552
        buy_core_params,
553
    };
554

            
555
    let (fut, _exit_notification_receiver) =
556
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
557
            params,
558
        );
559
    spawner.spawn("tanssi-aura-container", None, fut);
560
}
561

            
562
// Log string that will be shown for the container chain: `[Container-2000]`.
563
// This needs to be a separate function because the `prefix_logs_with` macro
564
// has trouble parsing expressions.
565
fn container_log_str(para_id: ParaId) -> String {
566
    format!("Container-{}", para_id)
567
}