1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
pub mod rpc;
18

            
19
use cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams};
20
use node_common::service::node_builder::StartBootnodeParams;
21
use sc_client_api::TrieCacheContext;
22
use {
23
    cumulus_client_cli::CollatorOptions,
24
    cumulus_client_collator::service::CollatorService,
25
    cumulus_client_consensus_proposer::Proposer,
26
    cumulus_client_service::{
27
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, StartRelayChainTasksParams,
28
    },
29
    cumulus_primitives_core::{relay_chain::CollatorPair, ParaId},
30
    cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface},
31
    dancebox_runtime::{
32
        opaque::{Block, Hash},
33
        AccountId, RuntimeApi,
34
    },
35
    dc_orchestrator_chain_interface::{
36
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
37
        OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash,
38
        PHeader,
39
    },
40
    futures::{Stream, StreamExt},
41
    nimbus_primitives::{NimbusId, NimbusPair},
42
    node_common::{service::node_builder::NodeBuilder, service::node_builder::NodeBuilderConfig},
43
    pallet_author_noting_runtime_api::AuthorNotingApi,
44
    pallet_collator_assignment_runtime_api::CollatorAssignmentApi,
45
    pallet_data_preservers_runtime_api::DataPreserversApi,
46
    pallet_registrar_runtime_api::RegistrarApi,
47
    polkadot_cli::ProvideRuntimeApi,
48
    sc_client_api::{
49
        AuxStore, Backend as BackendT, BlockchainEvents, HeaderBackend, UsageProvider,
50
    },
51
    sc_consensus::BasicQueue,
52
    sc_network::{NetworkBackend, NetworkBlock},
53
    sc_network_sync::SyncingService,
54
    sc_service::{Configuration, SpawnTaskHandle, TaskManager},
55
    sc_tracing::tracing::Instrument,
56
    sc_transaction_pool::TransactionPoolHandle,
57
    sp_api::{ApiExt, StorageProof},
58
    sp_consensus::SyncOracle,
59
    sp_consensus_slots::Slot,
60
    sp_core::H256,
61
    sp_keystore::KeystorePtr,
62
    sp_state_machine::{Backend as StateBackend, StorageValue},
63
    std::{marker::PhantomData, pin::Pin, sync::Arc, time::Duration},
64
    tc_consensus::{
65
        collators::lookahead::{
66
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
67
        },
68
        OnDemandBlockProductionApi, OrchestratorAuraWorkerAuxData, TanssiAuthorityAssignmentApi,
69
    },
70
    tc_service_container_chain_spawner::{
71
        cli::ContainerChainCli,
72
        monitor,
73
        service::{
74
            ParachainBlockImport, ParachainClient, ParachainExecutor, ParachainProposerFactory,
75
        },
76
        spawner::{self, CcSpawnMsg, ContainerChainSpawnParams, ContainerChainSpawner},
77
    },
78
    tokio::sync::mpsc,
79
    tokio_util::sync::CancellationToken,
80
};
81

            
82
type FullBackend = sc_service::TFullBackend<Block>;
83

            
84
pub struct NodeConfig;
85
impl NodeBuilderConfig for NodeConfig {
86
    type Block = Block;
87
    type RuntimeApi = RuntimeApi;
88
    type ParachainExecutor = ParachainExecutor;
89
}
90

            
91
pub struct ParachainNodeStarted {
92
    pub task_manager: TaskManager,
93
    pub client: Arc<ParachainClient>,
94
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
95
    pub orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
96
    pub keystore: KeystorePtr,
97
    pub start_bootnode_params: StartBootnodeParams,
98
}
99

            
100
/// Start a parachain node.
101
pub async fn start_parachain_node<Net>(
102
    parachain_config: Configuration,
103
    polkadot_config: Configuration,
104
    container_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
105
    collator_options: CollatorOptions,
106
    para_id: ParaId,
107
    hwbench: Option<sc_sysinfo::HwBench>,
108
    max_pov_percentage: Option<u32>,
109
) -> sc_service::error::Result<ParachainNodeStarted>
110
where
111
    Net: NetworkBackend<Block, Hash>,
112
{
113
    start_node_impl::<Net>(
114
        parachain_config,
115
        polkadot_config,
116
        container_config,
117
        collator_options,
118
        para_id,
119
        hwbench,
120
        max_pov_percentage,
121
    )
122
    .instrument(sc_tracing::tracing::info_span!(
123
        sc_tracing::logging::PREFIX_LOG_SPAN,
124
        name = "Orchestrator",
125
    ))
126
    .await
127
}
128

            
129
/// Start collator task for orchestrator chain.
130
/// Returns a `CancellationToken` that can be used to cancel the collator task,
131
/// and a `oneshot::Receiver<()>` that can be used to wait until the task has ended.
132
fn start_consensus_orchestrator(
133
    client: Arc<ParachainClient>,
134
    backend: Arc<FullBackend>,
135
    block_import: ParachainBlockImport,
136
    spawner: SpawnTaskHandle,
137
    relay_chain_interface: Arc<dyn RelayChainInterface>,
138
    sync_oracle: Arc<SyncingService<Block>>,
139
    keystore: KeystorePtr,
140
    force_authoring: bool,
141
    relay_chain_slot_duration: Duration,
142
    para_id: ParaId,
143
    collator_key: CollatorPair,
144
    overseer_handle: OverseerHandle,
145
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
146
    proposer_factory: ParachainProposerFactory,
147
    orchestrator_tx_pool: Arc<TransactionPoolHandle<Block, ParachainClient>>,
148
    max_pov_percentage: Option<u32>,
149
) -> (CancellationToken, futures::channel::oneshot::Receiver<()>) {
150
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)
151
        .expect("start_consensus_orchestrator: slot duration should exist");
152

            
153
    let proposer = Proposer::new(proposer_factory);
154

            
155
    let collator_service = CollatorService::new(
156
        client.clone(),
157
        Arc::new(spawner.clone()),
158
        announce_block,
159
        client.clone(),
160
    );
161

            
162
    let relay_chain_interace_for_cidp = relay_chain_interface.clone();
163
    let client_set_aside_for_cidp = client.clone();
164
    let client_set_aside_for_orch = client.clone();
165
    let client_for_hash_provider = client.clone();
166
    let client_for_slot_duration_provider = client.clone();
167

            
168
    let code_hash_provider = move |block_hash| {
169
        client_for_hash_provider
170
            .code_at(block_hash)
171
            .ok()
172
            .map(polkadot_primitives::ValidationCode)
173
            .map(|c| c.hash())
174
    };
175

            
176
    let cancellation_token = CancellationToken::new();
177
    let buy_core_params = BuyCoreParams::Orchestrator {
178
        orchestrator_tx_pool,
179
        orchestrator_client: client.clone(),
180
    };
181

            
182
    let params = LookaheadTanssiAuraParams {
183
        max_pov_percentage,
184
        get_current_slot_duration: move |block_hash| {
185
            sc_consensus_aura::standalone::slot_duration_at(
186
                &*client_for_slot_duration_provider,
187
                block_hash,
188
            )
189
            .expect("Slot duration should be set")
190
        },
191
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
192
            let relay_chain_interface = relay_chain_interace_for_cidp.clone();
193
            let client_set_aside_for_cidp = client_set_aside_for_cidp.clone();
194
            async move {
195
                // We added a new runtime api that allows to know which parachains have
196
                // some collators assigned to them. We'll now only include those. For older
197
                // runtimes we continue to write all of them.
198
                let para_ids = match client_set_aside_for_cidp
199
                    .runtime_api()
200
                    .api_version::<dyn CollatorAssignmentApi<Block, AccountId, ParaId>>(
201
                    block_hash,
202
                )? {
203
                    Some(version) if version >= 2 => client_set_aside_for_cidp
204
                        .runtime_api()
205
                        .parachains_with_some_collators(block_hash)?,
206
                    _ => client_set_aside_for_cidp
207
                        .runtime_api()
208
                        .registered_paras(block_hash)?,
209
                };
210
                let para_ids: Vec<_> = para_ids.into_iter().collect();
211
                let author_noting_inherent =
212
                    tp_author_noting_inherent::OwnParachainInherentData::create_at(
213
                        relay_parent,
214
                        &relay_chain_interface,
215
                        &para_ids,
216
                    )
217
                    .await;
218

            
219
                // Fetch duration every block to avoid downtime when passing from 12 to 6s
220
                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
221
                    &*client_set_aside_for_cidp.clone(),
222
                    block_hash,
223
                )
224
                .expect("Slot duration should be set");
225

            
226
                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
227

            
228
                let slot =
229
						sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
230
							*timestamp,
231
							slot_duration,
232
						);
233

            
234
                let author_noting_inherent = author_noting_inherent.ok_or_else(|| {
235
                    Box::<dyn std::error::Error + Send + Sync>::from(
236
                        "Failed to create author noting inherent",
237
                    )
238
                })?;
239

            
240
                Ok((slot, timestamp, author_noting_inherent))
241
            }
242
        },
243
        get_orchestrator_aux_data: move |block_hash: H256, (_relay_parent, _validation_data)| {
244
            let client_set_aside_for_orch = client_set_aside_for_orch.clone();
245

            
246
            async move {
247
                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
248
                    client_set_aside_for_orch.as_ref(),
249
                    &block_hash,
250
                    para_id,
251
                );
252

            
253
                let authorities = authorities.ok_or_else(|| {
254
                    Box::<dyn std::error::Error + Send + Sync>::from(
255
                        "Failed to fetch authorities with error",
256
                    )
257
                })?;
258

            
259
                log::info!(
260
                    "Authorities {:?} found for header {:?}",
261
                    authorities,
262
                    block_hash
263
                );
264

            
265
                let aux_data = OrchestratorAuraWorkerAuxData {
266
                    authorities,
267
                    // This is the orchestrator consensus, it does not have a slot frequency
268
                    slot_freq: None,
269
                };
270

            
271
                Ok(aux_data)
272
            }
273
        },
274
        block_import,
275
        para_client: client,
276
        relay_client: relay_chain_interface,
277
        sync_oracle,
278
        keystore,
279
        collator_key,
280
        para_id,
281
        overseer_handle,
282
        orchestrator_slot_duration: slot_duration,
283
        relay_chain_slot_duration,
284
        force_authoring,
285
        proposer,
286
        collator_service,
287
        authoring_duration: Duration::from_millis(2000),
288
        code_hash_provider,
289
        para_backend: backend,
290
        cancellation_token: cancellation_token.clone(),
291
        buy_core_params,
292
    };
293

            
294
    let (fut, exit_notification_receiver) =
295
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
296
            params,
297
        );
298
    spawner.spawn("tanssi-aura", None, fut);
299

            
300
    (cancellation_token, exit_notification_receiver)
301
}
302

            
303
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
304
///
305
/// This is the actual implementation that is abstract over the executor and the runtime api.
306
async fn start_node_impl<Net>(
307
    orchestrator_config: Configuration,
308
    polkadot_config: Configuration,
309
    container_chain_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
310
    collator_options: CollatorOptions,
311
    para_id: ParaId,
312
    hwbench: Option<sc_sysinfo::HwBench>,
313
    max_pov_percentage: Option<u32>,
314
) -> sc_service::error::Result<ParachainNodeStarted>
315
where
316
    Net: NetworkBackend<Block, Hash>,
317
{
318
    let parachain_config = prepare_node_config(orchestrator_config);
319
    let chain_type: sc_chain_spec::ChainType = parachain_config.chain_spec.chain_type();
320
    let relay_chain = node_common::chain_spec::Extensions::try_get(&*parachain_config.chain_spec)
321
        .map(|e| e.relay_chain.clone())
322
        .ok_or("Could not find relay_chain extension in chain-spec.")?;
323

            
324
    // Channel to send messages to start/stop container chains
325
    let (cc_spawn_tx, cc_spawn_rx) = mpsc::unbounded_channel();
326

            
327
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
328
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;
329

            
330
    let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);
331

            
332
    let (relay_chain_interface, collator_key, start_bootnode_params) = node_builder
333
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
334
        .await?;
335

            
336
    let validator = parachain_config.role.is_authority();
337
    let force_authoring = parachain_config.force_authoring;
338

            
339
    let node_builder = node_builder
340
        .build_cumulus_network::<_, Net>(
341
            &parachain_config,
342
            para_id,
343
            import_queue,
344
            relay_chain_interface.clone(),
345
        )
346
        .await?;
347

            
348
    let rpc_builder = {
349
        let client = node_builder.client.clone();
350
        let transaction_pool = node_builder.transaction_pool.clone();
351

            
352
        Box::new(move |_| {
353
            let deps = rpc::FullDeps {
354
                client: client.clone(),
355
                pool: transaction_pool.clone(),
356
                command_sink: None,
357
                xcm_senders: None,
358
                randomness_sender: None,
359
                container_chain_exclusion_sender: None,
360
            };
361

            
362
            rpc::create_full(deps).map_err(Into::into)
363
        })
364
    };
365

            
366
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
367

            
368
    let relay_chain_slot_duration = Duration::from_secs(6);
369
    let overseer_handle = relay_chain_interface
370
        .overseer_handle()
371
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
372
    let sync_keystore = node_builder.keystore_container.keystore();
373
    let mut collate_on_tanssi: Arc<
374
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
375
    > = Arc::new(move || {
376
        if validator {
377
            panic!("Called uninitialized collate_on_tanssi");
378
        } else {
379
            panic!("Called collate_on_tanssi when node is not running as a validator");
380
        }
381
    });
382

            
383
    let announce_block = {
384
        let sync_service = node_builder.network.sync_service.clone();
385
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
386
    };
387

            
388
    let (mut node_builder, import_queue_service) = node_builder.extract_import_queue_service();
389

            
390
    start_relay_chain_tasks(StartRelayChainTasksParams {
391
        client: node_builder.client.clone(),
392
        announce_block: announce_block.clone(),
393
        para_id,
394
        relay_chain_interface: relay_chain_interface.clone(),
395
        task_manager: &mut node_builder.task_manager,
396
        da_recovery_profile: if validator {
397
            DARecoveryProfile::Collator
398
        } else {
399
            DARecoveryProfile::FullNode
400
        },
401
        import_queue: import_queue_service,
402
        relay_chain_slot_duration,
403
        recovery_handle: Box::new(overseer_handle.clone()),
404
        sync_service: node_builder.network.sync_service.clone(),
405
        prometheus_registry: node_builder.prometheus_registry.as_ref(),
406
    })?;
407

            
408
    {
409
        let StartBootnodeParams {
410
            relay_chain_fork_id,
411
            parachain_fork_id,
412
            advertise_non_global_ips,
413
            parachain_public_addresses,
414
            relay_chain_network,
415
            paranode_rx,
416
            embedded_dht_bootnode,
417
            dht_bootnode_discovery,
418
        } = start_bootnode_params.clone();
419

            
420
        // Advertise parachain bootnode address in relay chain DHT
421
        start_bootnode_tasks(StartBootnodeTasksParams {
422
            embedded_dht_bootnode,
423
            dht_bootnode_discovery,
424
            para_id,
425
            task_manager: &mut node_builder.task_manager,
426
            relay_chain_interface: relay_chain_interface.clone(),
427
            relay_chain_fork_id,
428
            relay_chain_network,
429
            request_receiver: paranode_rx,
430
            parachain_network: node_builder.network.network.clone(),
431
            advertise_non_global_ips,
432
            parachain_genesis_hash: node_builder.client.chain_info().genesis_hash,
433
            parachain_fork_id,
434
            parachain_public_addresses,
435
        });
436
    }
437

            
438
    let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder {
439
        client: node_builder.client.clone(),
440
        backend: node_builder.backend.clone(),
441
        sync_oracle: node_builder.network.sync_service.clone(),
442
        overseer_handle: overseer_handle.clone(),
443
    };
444
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();
445

            
446
    if validator {
447
        let collator_key = collator_key
448
            .clone()
449
            .expect("Command line arguments do not allow this. qed");
450

            
451
        // Start task which detects para id assignment, and starts/stops container chains.
452
        // Note that if this node was started without a `container_chain_config`, we don't
453
        // support collation on container chains, so there is no need to detect changes to assignment
454
        if container_chain_config.is_some() {
455
            crate::build_check_assigned_para_id(
456
                orchestrator_chain_interface.clone(),
457
                sync_keystore.clone(),
458
                cc_spawn_tx.clone(),
459
                node_builder.task_manager.spawn_essential_handle(),
460
            );
461
        }
462

            
463
        let start_collation = {
464
            // Params for collate_on_tanssi closure
465
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
466
            let node_keystore = node_builder.keystore_container.keystore().clone();
467
            let node_telemetry_handle = node_builder.telemetry.as_ref().map(|t| t.handle()).clone();
468
            let node_client = node_builder.client.clone();
469
            let node_backend = node_builder.backend.clone();
470
            let relay_interface = relay_chain_interface.clone();
471
            let node_sync_service = node_builder.network.sync_service.clone();
472
            let orchestrator_tx_pool = node_builder.transaction_pool.clone();
473
            let overseer = overseer_handle.clone();
474
            let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
475
                node_spawn_handle.clone(),
476
                node_client.clone(),
477
                node_builder.transaction_pool.clone(),
478
                node_builder.prometheus_registry.as_ref(),
479
                node_telemetry_handle.clone(),
480
            );
481

            
482
            move || {
483
                start_consensus_orchestrator(
484
                    node_client.clone(),
485
                    node_backend.clone(),
486
                    block_import.clone(),
487
                    node_spawn_handle.clone(),
488
                    relay_interface.clone(),
489
                    node_sync_service.clone(),
490
                    node_keystore.clone(),
491
                    force_authoring,
492
                    relay_chain_slot_duration,
493
                    para_id,
494
                    collator_key.clone(),
495
                    overseer.clone(),
496
                    announce_block.clone(),
497
                    proposer_factory.clone(),
498
                    orchestrator_tx_pool.clone(),
499
                    max_pov_percentage,
500
                )
501
            }
502
        };
503
        // Save callback for later, used when collator rotates from container chain back to orchestrator chain
504
        collate_on_tanssi = Arc::new(start_collation);
505
    }
506

            
507
    let sync_keystore = node_builder.keystore_container.keystore();
508

            
509
    if let Some((container_chain_cli, tokio_handle)) = container_chain_config {
510
        // If the orchestrator chain is running as a full-node, we start a full node for the
511
        // container chain immediately, because only collator nodes detect their container chain
512
        // assignment so otherwise it will never start.
513
        if !validator {
514
            if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
515
                // Spawn new container chain node
516
                cc_spawn_tx
517
                    .send(CcSpawnMsg::UpdateAssignment {
518
                        current: Some(container_chain_para_id.into()),
519
                        next: Some(container_chain_para_id.into()),
520
                    })
521
                    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
522
            }
523
        }
524

            
525
        // Start container chain spawner task. This will start and stop container chains on demand.
526
        let orchestrator_client = node_builder.client.clone();
527
        let orchestrator_tx_pool = node_builder.transaction_pool.clone();
528
        let spawn_handle = node_builder.task_manager.spawn_handle();
529
        let relay_chain_interface = relay_chain_interface.clone();
530
        let orchestrator_chain_interface = orchestrator_chain_interface.clone();
531

            
532
        // This considers that the container chains have the same APIs as dancebox, which
533
        // is not the case. However the spawner don't call APIs that are not part of the expected
534
        // common APIs for a container chain.
535
        // TODO: Depend on the simple container chain runtime which should be the minimal api?
536
        let container_chain_spawner = ContainerChainSpawner {
537
            params: ContainerChainSpawnParams {
538
                orchestrator_chain_interface,
539
                container_chain_cli,
540
                tokio_handle,
541
                chain_type,
542
                relay_chain,
543
                relay_chain_interface,
544
                sync_keystore,
545
                data_preserver: false,
546
                collation_params: if validator {
547
                    Some(spawner::CollationParams {
548
                        orchestrator_client: Some(orchestrator_client.clone()),
549
                        orchestrator_tx_pool: Some(orchestrator_tx_pool),
550
                        orchestrator_para_id: para_id,
551
                        collator_key: collator_key
552
                            .expect("there should be a collator key if we're a validator"),
553
                        solochain: false,
554
                    })
555
                } else {
556
                    None
557
                },
558
                spawn_handle,
559
                generate_rpc_builder:
560
                    tc_service_container_chain_spawner::rpc::GenerateSubstrateRpcBuilder::<
561
                        dancebox_runtime::RuntimeApi,
562
                    >::new(),
563
                override_sync_mode: Some(sc_cli::SyncMode::Warp),
564
                start_bootnode_params: start_bootnode_params.clone(),
565
                phantom: PhantomData,
566
            },
567
            state: Default::default(),
568
            db_folder_cleanup_done: false,
569
            collate_on_tanssi,
570
            collation_cancellation_constructs: None,
571
        };
572
        let state = container_chain_spawner.state.clone();
573

            
574
        node_builder.task_manager.spawn_essential_handle().spawn(
575
            "container-chain-spawner-rx-loop",
576
            None,
577
            container_chain_spawner.rx_loop(cc_spawn_rx, validator, false),
578
        );
579

            
580
        node_builder.task_manager.spawn_essential_handle().spawn(
581
            "container-chain-spawner-debug-state",
582
            None,
583
            monitor::monitor_task(state),
584
        )
585
    }
586

            
587
    Ok(ParachainNodeStarted {
588
        task_manager: node_builder.task_manager,
589
        client: node_builder.client,
590
        relay_chain_interface,
591
        orchestrator_chain_interface,
592
        keystore: node_builder.keystore_container.keystore(),
593
        start_bootnode_params,
594
    })
595
}
596

            
597
pub fn import_queue(
598
    parachain_config: &Configuration,
599
    node_builder: &NodeBuilder<NodeConfig>,
600
) -> (ParachainBlockImport, BasicQueue<Block>) {
601
    // The nimbus import queue ONLY checks the signature correctness
602
    // Any other checks corresponding to the author-correctness should be done
603
    // in the runtime
604
    let block_import =
605
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
606

            
607
    let import_queue = nimbus_consensus::import_queue(
608
        node_builder.client.clone(),
609
        block_import.clone(),
610
        move |_, _| async move {
611
            let time = sp_timestamp::InherentDataProvider::from_system_time();
612

            
613
            Ok((time,))
614
        },
615
        &node_builder.task_manager.spawn_essential_handle(),
616
        parachain_config.prometheus_registry(),
617
        false,
618
        false,
619
    )
620
    .expect("function never fails");
621

            
622
    (block_import, import_queue)
623
}
624

            
625
/// Builder for a concrete relay chain interface, created from a full node. Builds
626
/// a [`RelayChainInProcessInterface`] to access relay chain data necessary for parachain operation.
627
///
628
/// The builder takes a [`polkadot_client::Client`]
629
/// that wraps a concrete instance. By using [`polkadot_client::ExecuteWithClient`]
630
/// the builder gets access to this concrete instance and instantiates a [`RelayChainInProcessInterface`] with it.
631
struct OrchestratorChainInProcessInterfaceBuilder {
632
    client: Arc<ParachainClient>,
633
    backend: Arc<FullBackend>,
634
    sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
635
    overseer_handle: OverseerHandle,
636
}
637

            
638
impl OrchestratorChainInProcessInterfaceBuilder {
639
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
640
        Arc::new(OrchestratorChainInProcessInterface::new(
641
            self.client,
642
            self.backend,
643
            self.sync_oracle,
644
            self.overseer_handle,
645
        ))
646
    }
647
}
648

            
649
/// Provides an implementation of the [`RelayChainInterface`] using a local in-process relay chain node.
650
pub struct OrchestratorChainInProcessInterface<Client> {
651
    pub full_client: Arc<Client>,
652
    pub backend: Arc<FullBackend>,
653
    pub sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
654
    pub overseer_handle: OverseerHandle,
655
}
656

            
657
impl<Client> OrchestratorChainInProcessInterface<Client> {
658
    /// Create a new instance of [`RelayChainInProcessInterface`]
659
    pub fn new(
660
        full_client: Arc<Client>,
661
        backend: Arc<FullBackend>,
662
        sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
663
        overseer_handle: OverseerHandle,
664
    ) -> Self {
665
        Self {
666
            full_client,
667
            backend,
668
            sync_oracle,
669
            overseer_handle,
670
        }
671
    }
672
}
673

            
674
impl<T> Clone for OrchestratorChainInProcessInterface<T> {
675
    fn clone(&self) -> Self {
676
        Self {
677
            full_client: self.full_client.clone(),
678
            backend: self.backend.clone(),
679
            sync_oracle: self.sync_oracle.clone(),
680
            overseer_handle: self.overseer_handle.clone(),
681
        }
682
    }
683
}
684

            
685
#[async_trait::async_trait]
686
impl<Client> OrchestratorChainInterface for OrchestratorChainInProcessInterface<Client>
687
where
688
    Client: ProvideRuntimeApi<Block>
689
        + BlockchainEvents<Block>
690
        + AuxStore
691
        + UsageProvider<Block>
692
        + Sync
693
        + Send,
694
    Client::Api: TanssiAuthorityAssignmentApi<Block, NimbusId>
695
        + OnDemandBlockProductionApi<Block, ParaId, Slot>
696
        + RegistrarApi<Block, ParaId>
697
        + AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>
698
        + DataPreserversApi<Block, DataPreserverProfileId, ParaId>,
699
{
700
    async fn get_storage_by_key(
701
        &self,
702
        orchestrator_parent: PHash,
703
        key: &[u8],
704
    ) -> OrchestratorChainResult<Option<StorageValue>> {
705
        let state = self
706
            .backend
707
            .state_at(orchestrator_parent, TrieCacheContext::Untrusted)?;
708
        state
709
            .storage(key)
710
            .map_err(OrchestratorChainError::GenericError)
711
    }
712

            
713
    async fn prove_read(
714
        &self,
715
        orchestrator_parent: PHash,
716
        relevant_keys: &Vec<Vec<u8>>,
717
    ) -> OrchestratorChainResult<StorageProof> {
718
        let state_backend = self
719
            .backend
720
            .state_at(orchestrator_parent, TrieCacheContext::Untrusted)?;
721

            
722
        sp_state_machine::prove_read(state_backend, relevant_keys)
723
            .map_err(OrchestratorChainError::StateMachineError)
724
    }
725

            
726
    fn overseer_handle(&self) -> OrchestratorChainResult<OverseerHandle> {
727
        Ok(self.overseer_handle.clone())
728
    }
729

            
730
    /// Get a stream of import block notifications.
731
    async fn import_notification_stream(
732
        &self,
733
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
734
        let notification_stream = self
735
            .full_client
736
            .import_notification_stream()
737
            .map(|notification| notification.header);
738
        Ok(Box::pin(notification_stream))
739
    }
740

            
741
    /// Get a stream of new best block notifications.
742
    async fn new_best_notification_stream(
743
        &self,
744
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
745
        let notifications_stream =
746
            self.full_client
747
                .import_notification_stream()
748
                .filter_map(|notification| async move {
749
                    notification.is_new_best.then_some(notification.header)
750
                });
751
        Ok(Box::pin(notifications_stream))
752
    }
753

            
754
    /// Get a stream of finality notifications.
755
    async fn finality_notification_stream(
756
        &self,
757
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
758
        let notification_stream = self
759
            .full_client
760
            .finality_notification_stream()
761
            .map(|notification| notification.header);
762
        Ok(Box::pin(notification_stream))
763
    }
764

            
765
    async fn genesis_data(
766
        &self,
767
        orchestrator_parent: PHash,
768
        para_id: ParaId,
769
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
770
        let runtime_api = self.full_client.runtime_api();
771

            
772
        Ok(runtime_api.genesis_data(orchestrator_parent, para_id)?)
773
    }
774

            
775
    async fn boot_nodes(
776
        &self,
777
        orchestrator_parent: PHash,
778
        para_id: ParaId,
779
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
780
        let runtime_api = self.full_client.runtime_api();
781

            
782
        Ok(runtime_api.boot_nodes(orchestrator_parent, para_id)?)
783
    }
784

            
785
    async fn latest_block_number(
786
        &self,
787
        orchestrator_parent: PHash,
788
        para_id: ParaId,
789
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
790
        let runtime_api = self.full_client.runtime_api();
791

            
792
        Ok(runtime_api.latest_block_number(orchestrator_parent, para_id)?)
793
    }
794

            
795
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
796
        Ok(self.backend.blockchain().info().best_hash)
797
    }
798

            
799
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
800
        Ok(self.backend.blockchain().info().finalized_hash)
801
    }
802

            
803
    async fn data_preserver_active_assignment(
804
        &self,
805
        orchestrator_parent: PHash,
806
        profile_id: DataPreserverProfileId,
807
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
808
        let runtime_api = self.full_client.runtime_api();
809

            
810
        use {
811
            dc_orchestrator_chain_interface::DataPreserverAssignment as InterfaceAssignment,
812
            pallet_data_preservers_runtime_api::Assignment as RuntimeAssignment,
813
        };
814

            
815
        Ok(
816
            match runtime_api.get_active_assignment(orchestrator_parent, profile_id)? {
817
                RuntimeAssignment::NotAssigned => InterfaceAssignment::NotAssigned,
818
                RuntimeAssignment::Active(para_id) => InterfaceAssignment::Active(para_id),
819
                RuntimeAssignment::Inactive(para_id) => InterfaceAssignment::Inactive(para_id),
820
            },
821
        )
822
    }
823

            
824
    async fn check_para_id_assignment(
825
        &self,
826
        orchestrator_parent: PHash,
827
        authority: NimbusId,
828
    ) -> OrchestratorChainResult<Option<ParaId>> {
829
        let runtime_api = self.full_client.runtime_api();
830

            
831
        Ok(runtime_api.check_para_id_assignment(orchestrator_parent, authority)?)
832
    }
833

            
834
    async fn check_para_id_assignment_next_session(
835
        &self,
836
        orchestrator_parent: PHash,
837
        authority: NimbusId,
838
    ) -> OrchestratorChainResult<Option<ParaId>> {
839
        let runtime_api = self.full_client.runtime_api();
840

            
841
        Ok(runtime_api.check_para_id_assignment_next_session(orchestrator_parent, authority)?)
842
    }
843
}