// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use {
    crate::command::solochain::{
        build_solochain_config_dir, copy_zombienet_keystore, dummy_config, keystore_config,
    },
    cumulus_client_cli::CollatorOptions,
    cumulus_client_collator::service::CollatorService,
    cumulus_client_consensus_proposer::Proposer,
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
    cumulus_client_service::{
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, StartRelayChainTasksParams,
    },
    cumulus_primitives_core::{
        relay_chain::{well_known_keys as RelayWellKnownKeys, CollatorPair},
        ParaId,
    },
    cumulus_relay_chain_interface::{
        call_remote_runtime_function, OverseerHandle, RelayChainInterface,
    },
    dancebox_runtime::{
        opaque::{Block, Hash},
        AccountId, RuntimeApi,
    },
    dc_orchestrator_chain_interface::{
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
        OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash,
        PHeader,
    },
    futures::{Stream, StreamExt},
    nimbus_primitives::{NimbusId, NimbusPair},
    node_common::service::{ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing},
    pallet_author_noting_runtime_api::AuthorNotingApi,
    pallet_data_preservers_runtime_api::DataPreserversApi,
    pallet_registrar_runtime_api::RegistrarApi,
    parity_scale_codec::Encode,
    polkadot_cli::ProvideRuntimeApi,
    polkadot_parachain_primitives::primitives::HeadData,
    polkadot_service::Handle,
    sc_cli::CliConfiguration,
    sc_client_api::{
        AuxStore, Backend as BackendT, BlockchainEvents, HeaderBackend, UsageProvider,
    },
    sc_consensus::BasicQueue,
    sc_network::NetworkBlock,
    sc_network_common::role::Role,
    sc_network_sync::SyncingService,
    sc_service::{Configuration, KeystoreContainer, SpawnTaskHandle, TFullBackend, TaskManager},
    sc_telemetry::TelemetryHandle,
    sc_transaction_pool::FullPool,
    sp_api::StorageProof,
    sp_consensus::SyncOracle,
    sp_consensus_slots::Slot,
    sp_core::{traits::SpawnEssentialNamed, H256},
    sp_keystore::KeystorePtr,
    sp_state_machine::{Backend as StateBackend, StorageValue},
    std::{pin::Pin, sync::Arc, time::Duration},
    tc_consensus::{
        collators::lookahead::{
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
        },
        OnDemandBlockProductionApi, OrchestratorAuraWorkerAuxData, TanssiAuthorityAssignmentApi,
    },
    tc_service_container_chain::{
        cli::ContainerChainCli,
        monitor,
        service::{
            DevParachainBlockImport, ParachainBlockImport, ParachainClient, ParachainExecutor,
            ParachainProposerFactory,
        },
        spawner::{self, CcSpawnMsg, ContainerChainSpawnParams, ContainerChainSpawner},
    },
    tokio::sync::mpsc::{unbounded_channel, UnboundedSender},
    tokio_util::sync::CancellationToken,
};

mod mocked_relay_keys;

type FullBackend = TFullBackend<Block>;

pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    type Block = Block;
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ParachainExecutor;
}

thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });

/// Provide a mock duration starting at 0 in milliseconds for the timestamp inherent.
/// Each call increments the timestamp by `SLOT_DURATION`, making Aura think time has passed.
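///
/// A minimal behaviour sketch (hypothetical test code, not part of the node):
///
/// ```ignore
/// // Every `provide_inherent_data` call advances the thread-local TIMESTAMP by one
/// // slot, so consecutive dev blocks get timestamps one SLOT_DURATION apart.
/// let mut data = sp_inherents::InherentData::new();
/// futures::executor::block_on(
///     MockTimestampInherentDataProvider.provide_inherent_data(&mut data),
/// )
/// .unwrap(); // TIMESTAMP is now SLOT_DURATION
/// ```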
struct MockTimestampInherentDataProvider;
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
    async fn provide_inherent_data(
        &self,
        inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        TIMESTAMP.with(|x| {
            *x.borrow_mut() += dancebox_runtime::SLOT_DURATION;
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
        })
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // The pallet never reports an error.
        None
    }
}

/// Background task used to detect changes to container chain assignment,
/// and start/stop container chains on demand. The check runs on every finalized block.
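///
/// A hedged usage sketch (the argument names mirror how `start_node_impl` wires it up):
///
/// ```ignore
/// build_check_assigned_para_id(
///     orchestrator_chain_interface.clone(),
///     sync_keystore.clone(),
///     cc_spawn_tx.clone(),
///     task_manager.spawn_essential_handle(),
/// );
/// ```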
pub fn build_check_assigned_para_id(
    client: Arc<dyn OrchestratorChainInterface>,
    sync_keystore: KeystorePtr,
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    spawner: impl SpawnEssentialNamed,
) {
    let check_assigned_para_id_task = async move {
        // Subscribe to new blocks in order to react to para id assignment.
        // This must be the stream of finalized blocks, otherwise the collators may rotate to a
        // different chain before the block is finalized, and that could lead to a stalled chain
        let mut import_notifications = client.finality_notification_stream().await.unwrap();

        while let Some(msg) = import_notifications.next().await {
            let block_hash = msg.hash();
            let client_set_aside_for_cidp = client.clone();
            let sync_keystore = sync_keystore.clone();
            let cc_spawn_tx = cc_spawn_tx.clone();

            check_assigned_para_id(
                cc_spawn_tx,
                sync_keystore,
                client_set_aside_for_cidp,
                block_hash,
            )
            .await
            .unwrap();
        }
    };

    spawner.spawn_essential(
        "check-assigned-para-id",
        None,
        Box::pin(check_assigned_para_id_task),
    );
}

/// Check the parachain assignment using the orchestrator chain client, and send a `CcSpawnMsg` to
/// start or stop the required container chains.
///
/// Checks the assignment for the next block, so if there is a session change on block 15, this will
/// detect the assignment change after importing block 14.
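///
/// Sketch of the message this sends (the two para id values are purely illustrative):
///
/// ```ignore
/// cc_spawn_tx.send(CcSpawnMsg::UpdateAssignment {
///     current: Some(2000.into()),
///     next: Some(2001.into()),
/// })?;
/// ```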
async fn check_assigned_para_id(
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    sync_keystore: KeystorePtr,
    client_set_aside_for_cidp: Arc<dyn OrchestratorChainInterface>,
    block_hash: H256,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Check current assignment
    let current_container_chain_para_id =
        tc_consensus::first_eligible_key::<dyn OrchestratorChainInterface, NimbusPair>(
            client_set_aside_for_cidp.as_ref(),
            &block_hash,
            sync_keystore.clone(),
        )
        .await
        .map(|(_nimbus_key, para_id)| para_id);

    // Check assignment in the next session
    let next_container_chain_para_id = tc_consensus::first_eligible_key_next_session::<
        dyn OrchestratorChainInterface,
        NimbusPair,
    >(
        client_set_aside_for_cidp.as_ref(),
        &block_hash,
        sync_keystore,
    )
    .await
    .map(|(_nimbus_key, para_id)| para_id);

    cc_spawn_tx.send(CcSpawnMsg::UpdateAssignment {
        current: current_container_chain_para_id,
        next: next_container_chain_para_id,
    })?;

    Ok(())
}

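/// Build the nimbus block import and import queue for the orchestrator chain.
///
/// Usage sketch, as done in `start_node_impl` below:
///
/// ```ignore
/// let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);
/// ```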
pub fn import_queue(
    parachain_config: &Configuration,
    node_builder: &NodeBuilder<NodeConfig>,
) -> (ParachainBlockImport, BasicQueue<Block>) {
    // The nimbus import queue ONLY checks signature correctness.
    // Any other author-correctness checks should be done in the runtime.
    let block_import =
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());

    let import_queue = nimbus_consensus::import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        move |_, _| async move {
            let time = sp_timestamp::InherentDataProvider::from_system_time();

            Ok((time,))
        },
        &node_builder.task_manager.spawn_essential_handle(),
        parachain_config.prometheus_registry(),
        false,
    )
    .expect("function never fails");

    (block_import, import_queue)
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with("Orchestrator")]
async fn start_node_impl(
    orchestrator_config: Configuration,
    polkadot_config: Configuration,
    container_chain_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    let parachain_config = prepare_node_config(orchestrator_config);
    let chain_type: sc_chain_spec::ChainType = parachain_config.chain_spec.chain_type();
    let relay_chain = crate::chain_spec::Extensions::try_get(&*parachain_config.chain_spec)
        .map(|e| e.relay_chain.clone())
        .ok_or("Could not find relay_chain extension in chain-spec.")?;

    // Channel to send messages to start/stop container chains
    let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel();

    // Create a `NodeBuilder` which helps set up the common systems of a parachain node.
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;

    let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);

    let (relay_chain_interface, collator_key) = node_builder
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
        .await?;

    let validator = parachain_config.role.is_authority();
    let force_authoring = parachain_config.force_authoring;

    let node_builder = node_builder
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |deny_unsafe, _| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                deny_unsafe,
                command_sink: None,
                xcm_senders: None,
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    let relay_chain_slot_duration = Duration::from_secs(6);
    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let sync_keystore = node_builder.keystore_container.keystore();
    let mut collate_on_tanssi: Arc<
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
    > = Arc::new(move || {
        if validator {
            panic!("Called uninitialized collate_on_tanssi");
        } else {
            panic!("Called collate_on_tanssi when node is not running as a validator");
        }
    });

    let announce_block = {
        let sync_service = node_builder.network.sync_service.clone();
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
    };

    let (mut node_builder, import_queue_service) = node_builder.extract_import_queue_service();

    start_relay_chain_tasks(StartRelayChainTasksParams {
        client: node_builder.client.clone(),
        announce_block: announce_block.clone(),
        para_id,
        relay_chain_interface: relay_chain_interface.clone(),
        task_manager: &mut node_builder.task_manager,
        da_recovery_profile: if validator {
            DARecoveryProfile::Collator
        } else {
            DARecoveryProfile::FullNode
        },
        import_queue: import_queue_service,
        relay_chain_slot_duration,
        recovery_handle: Box::new(overseer_handle.clone()),
        sync_service: node_builder.network.sync_service.clone(),
    })?;

    let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder {
        client: node_builder.client.clone(),
        backend: node_builder.backend.clone(),
        sync_oracle: node_builder.network.sync_service.clone(),
        overseer_handle: overseer_handle.clone(),
    };
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();

    if validator {
        let collator_key = collator_key
            .clone()
            .expect("Command line arguments do not allow this. qed");

        // Start task which detects para id assignment, and starts/stops container chains.
        // Note that if this node was started without a `container_chain_config`, we don't
        // support collation on container chains, so there is no need to detect changes to the
        // assignment.
        if container_chain_config.is_some() {
            build_check_assigned_para_id(
                orchestrator_chain_interface.clone(),
                sync_keystore.clone(),
                cc_spawn_tx.clone(),
                node_builder.task_manager.spawn_essential_handle(),
            );
        }

        let start_collation = {
            // Params for the collate_on_tanssi closure
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
            let node_keystore = node_builder.keystore_container.keystore().clone();
            let node_telemetry_handle = node_builder.telemetry.as_ref().map(|t| t.handle());
            let node_client = node_builder.client.clone();
            let node_backend = node_builder.backend.clone();
            let relay_interface = relay_chain_interface.clone();
            let node_sync_service = node_builder.network.sync_service.clone();
            let orchestrator_tx_pool = node_builder.transaction_pool.clone();
            let overseer = overseer_handle.clone();
            let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
                node_spawn_handle.clone(),
                node_client.clone(),
                node_builder.transaction_pool.clone(),
                node_builder.prometheus_registry.as_ref(),
                node_telemetry_handle.clone(),
            );

            move || {
                start_consensus_orchestrator(
                    node_client.clone(),
                    node_backend.clone(),
                    block_import.clone(),
                    node_spawn_handle.clone(),
                    relay_interface.clone(),
                    node_sync_service.clone(),
                    node_keystore.clone(),
                    force_authoring,
                    relay_chain_slot_duration,
                    para_id,
                    collator_key.clone(),
                    overseer.clone(),
                    announce_block.clone(),
                    proposer_factory.clone(),
                    orchestrator_tx_pool.clone(),
                )
            }
        };
        // Save the callback for later; it is used when a collator rotates from a container chain
        // back to the orchestrator chain.
        collate_on_tanssi = Arc::new(start_collation);
    }

    node_builder.network.start_network.start_network();

    let sync_keystore = node_builder.keystore_container.keystore();

    if let Some((container_chain_cli, tokio_handle)) = container_chain_config {
        // If the orchestrator chain is running as a full node, we start a full node for the
        // container chain immediately, because only collator nodes detect their container chain
        // assignment; otherwise it would never start.
        if !validator {
            if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
                // Spawn new container chain node
                cc_spawn_tx
                    .send(CcSpawnMsg::UpdateAssignment {
                        current: Some(container_chain_para_id.into()),
                        next: Some(container_chain_para_id.into()),
                    })
                    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
            }
        }

        // Start container chain spawner task. This will start and stop container chains on demand.
        let orchestrator_client = node_builder.client.clone();
        let orchestrator_tx_pool = node_builder.transaction_pool.clone();
        let spawn_handle = node_builder.task_manager.spawn_handle();

        let container_chain_spawner = ContainerChainSpawner {
            params: ContainerChainSpawnParams {
                orchestrator_chain_interface,
                container_chain_cli,
                tokio_handle,
                chain_type,
                relay_chain,
                relay_chain_interface,
                sync_keystore,
                orchestrator_para_id: para_id,
                data_preserver: false,
                collation_params: if validator {
                    Some(spawner::CollationParams {
                        orchestrator_client: Some(orchestrator_client.clone()),
                        orchestrator_tx_pool: Some(orchestrator_tx_pool),
                        orchestrator_para_id: para_id,
                        collator_key: collator_key
                            .expect("there should be a collator key if we're a validator"),
                        solochain: false,
                    })
                } else {
                    None
                },
                spawn_handle,
            },
            state: Default::default(),
            collate_on_tanssi,
            collation_cancellation_constructs: None,
        };
        let state = container_chain_spawner.state.clone();

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-rx-loop",
            None,
            container_chain_spawner.rx_loop(cc_spawn_rx, validator, false),
        );

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-debug-state",
            None,
            monitor::monitor_task(state),
        )
    }

    Ok((node_builder.task_manager, node_builder.client))
}

/// Build the import queue for the parachain runtime (manual seal).
fn build_manual_seal_import_queue(
    _client: Arc<ParachainClient>,
    block_import: DevParachainBlockImport,
    config: &Configuration,
    _telemetry: Option<TelemetryHandle>,
    task_manager: &TaskManager,
) -> Result<sc_consensus::DefaultImportQueue<Block>, sc_service::Error> {
    Ok(sc_consensus_manual_seal::import_queue(
        Box::new(block_import),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    ))
}

/// Start collator task for orchestrator chain.
/// Returns a `CancellationToken` that can be used to cancel the collator task,
/// and a `oneshot::Receiver<()>` that can be used to wait until the task has ended.
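///
/// Cancellation sketch (this mirrors how the `collate_on_tanssi` callback is consumed):
///
/// ```ignore
/// let (cancellation_token, exit_receiver) = collate_on_tanssi();
/// // Later, when rotating away from the orchestrator chain:
/// cancellation_token.cancel();
/// let _ = exit_receiver.await; // resolves once the collation task has ended
/// ```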
fn start_consensus_orchestrator(
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    block_import: ParachainBlockImport,
    spawner: SpawnTaskHandle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
    sync_oracle: Arc<SyncingService<Block>>,
    keystore: KeystorePtr,
    force_authoring: bool,
    relay_chain_slot_duration: Duration,
    para_id: ParaId,
    collator_key: CollatorPair,
    overseer_handle: OverseerHandle,
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
    proposer_factory: ParachainProposerFactory,
    orchestrator_tx_pool: Arc<FullPool<Block, ParachainClient>>,
) -> (CancellationToken, futures::channel::oneshot::Receiver<()>) {
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)
        .expect("start_consensus_orchestrator: slot duration should exist");

    let proposer = Proposer::new(proposer_factory);

    let collator_service = CollatorService::new(
        client.clone(),
        Arc::new(spawner.clone()),
        announce_block,
        client.clone(),
    );

    let relay_chain_interface_for_cidp = relay_chain_interface.clone();
    let client_set_aside_for_cidp = client.clone();
    let client_set_aside_for_orch = client.clone();
    let client_for_hash_provider = client.clone();
    let client_for_slot_duration_provider = client.clone();

    let code_hash_provider = move |block_hash| {
        client_for_hash_provider
            .code_at(block_hash)
            .ok()
            .map(polkadot_primitives::ValidationCode)
            .map(|c| c.hash())
    };

    let cancellation_token = CancellationToken::new();
    let buy_core_params = BuyCoreParams::Orchestrator {
        orchestrator_tx_pool,
        orchestrator_client: client.clone(),
    };

    let params = LookaheadTanssiAuraParams {
        get_current_slot_duration: move |block_hash| {
            sc_consensus_aura::standalone::slot_duration_at(
                &*client_for_slot_duration_provider,
                block_hash,
            )
            .expect("Slot duration should be set")
        },
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
            let relay_chain_interface = relay_chain_interface_for_cidp.clone();
            let client_set_aside_for_cidp = client_set_aside_for_cidp.clone();
            async move {
                let para_ids = client_set_aside_for_cidp
                    .runtime_api()
                    .registered_paras(block_hash)?;
                let para_ids: Vec<_> = para_ids.into_iter().collect();
                let author_noting_inherent =
                    tp_author_noting_inherent::OwnParachainInherentData::create_at(
                        relay_parent,
                        &relay_chain_interface,
                        &para_ids,
                    )
                    .await;

                // Fetch the slot duration at every block to avoid downtime when switching from
                // 12s to 6s slots
                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
                    &*client_set_aside_for_cidp,
                    block_hash,
                )
                .expect("Slot duration should be set");

                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                let slot =
                    sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                        *timestamp,
                        slot_duration,
                    );

                let author_noting_inherent = author_noting_inherent.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to create author noting inherent",
                    )
                })?;

                Ok((slot, timestamp, author_noting_inherent))
            }
        },
        get_orchestrator_aux_data: move |block_hash: H256, (_relay_parent, _validation_data)| {
            let client_set_aside_for_orch = client_set_aside_for_orch.clone();

            async move {
                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
                    client_set_aside_for_orch.as_ref(),
                    &block_hash,
                    para_id,
                );

                let authorities = authorities.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to fetch authorities",
                    )
                })?;

                log::info!(
                    "Authorities {:?} found for header {:?}",
                    authorities,
                    block_hash
                );

                let aux_data = OrchestratorAuraWorkerAuxData {
                    authorities,
                    // This is the orchestrator consensus; it does not have a slot frequency
                    slot_freq: None,
                };

                Ok(aux_data)
            }
        },
        block_import,
        para_client: client,
        relay_client: relay_chain_interface,
        sync_oracle,
        keystore,
        collator_key,
        para_id,
        overseer_handle,
        orchestrator_slot_duration: slot_duration,
        relay_chain_slot_duration,
        force_authoring,
        proposer,
        collator_service,
        authoring_duration: Duration::from_millis(2000),
        code_hash_provider,
        para_backend: backend,
        cancellation_token: cancellation_token.clone(),
        buy_core_params,
    };

    let (fut, exit_notification_receiver) =
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
            params,
        );
    spawner.spawn("tanssi-aura", None, fut);

    (cancellation_token, exit_notification_receiver)
}

/// Start a parachain node.
pub async fn start_parachain_node(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    container_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    start_node_impl(
        parachain_config,
        polkadot_config,
        container_config,
        collator_options,
        para_id,
        hwbench,
    )
    .await
}

/// Start a solochain node.
pub async fn start_solochain_node(
    polkadot_config: Configuration,
    container_chain_cli: ContainerChainCli,
    collator_options: CollatorOptions,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<TaskManager> {
    let tokio_handle = polkadot_config.tokio_handle.clone();
    let orchestrator_para_id = Default::default();

    let chain_type = polkadot_config.chain_spec.chain_type().clone();
    let relay_chain = polkadot_config.chain_spec.id().to_string();

    let base_path = container_chain_cli
        .base
        .base
        .shared_params
        .base_path
        .as_ref()
        .expect("base_path is always set");
    let config_dir = build_solochain_config_dir(base_path);
    let keystore = keystore_config(container_chain_cli.keystore_params(), &config_dir)
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

    // Instead of putting the keystore in
    // Collator1000-01/data/chains/simple_container_2000/keystore
    // we put it in
    // Collator1000-01/data/config/keystore
    // and the same applies to the "network" folder.
    // But zombienet will put the keys in the old path, so we need to copy them manually if we
    // are running under zombienet
    copy_zombienet_keystore(&keystore)?;

    let keystore_container = KeystoreContainer::new(&keystore)?;

    // No metrics, so no prometheus registry
    let prometheus_registry = None;
    let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;

    // Each container chain will spawn its own telemetry
    let telemetry_worker_handle = None;

    // Dummy parachain config, only needed because `build_relay_chain_interface` needs to know
    // if we are collators or not
    let validator = container_chain_cli.base.collator;
    let mut dummy_parachain_config = dummy_config(
        polkadot_config.tokio_handle.clone(),
        polkadot_config.base_path.clone(),
    );
    dummy_parachain_config.role = if validator {
        Role::Authority
    } else {
        Role::Full
    };
    let (relay_chain_interface, collator_key) =
        cumulus_client_service::build_relay_chain_interface(
            polkadot_config,
            &dummy_parachain_config,
            telemetry_worker_handle.clone(),
            &mut task_manager,
            collator_options.clone(),
            hwbench.clone(),
        )
        .await
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

    log::info!("start_solochain_node: is validator? {}", validator);

    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let sync_keystore = keystore_container.keystore();
    let collate_on_tanssi: Arc<
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
    > = Arc::new(move || {
        // collate_on_tanssi will not be called in solochains because solochains use a different
        // consensus mechanism and need validators instead of collators.
        // The runtime enforces this because the orchestrator chain is never assigned any collators.
        panic!("Called collate_on_tanssi on solochain collator. This is unsupported and the runtime shouldn't allow this, it is a bug")
    });

    let orchestrator_chain_interface_builder = OrchestratorChainSolochainInterfaceBuilder {
        overseer_handle: overseer_handle.clone(),
        relay_chain_interface: relay_chain_interface.clone(),
    };
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();
    // Channel to send messages to start/stop container chains
    let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel();

    if validator {
        // Start task which detects para id assignment, and starts/stops container chains.
        build_check_assigned_para_id(
            orchestrator_chain_interface.clone(),
            sync_keystore.clone(),
            cc_spawn_tx.clone(),
            task_manager.spawn_essential_handle(),
        );
    }

    // If the orchestrator chain is running as a full node, we start a full node for the
    // container chain immediately, because only collator nodes detect their container chain
    // assignment; otherwise it would never start.
    if !validator {
        if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
            // Spawn new container chain node
            cc_spawn_tx
                .send(CcSpawnMsg::UpdateAssignment {
                    current: Some(container_chain_para_id.into()),
                    next: Some(container_chain_para_id.into()),
                })
                .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
        }
    }

    // Start container chain spawner task. This will start and stop container chains on demand.
    let spawn_handle = task_manager.spawn_handle();

    let container_chain_spawner = ContainerChainSpawner {
        params: ContainerChainSpawnParams {
            orchestrator_chain_interface,
            container_chain_cli,
            tokio_handle,
            chain_type,
            relay_chain,
            relay_chain_interface,
            sync_keystore,
            orchestrator_para_id,
            collation_params: if validator {
                Some(spawner::CollationParams {
                    // TODO: all these args must be solochain instead of orchestrator
                    orchestrator_client: None,
                    orchestrator_tx_pool: None,
                    orchestrator_para_id,
                    collator_key: collator_key
                        .expect("there should be a collator key if we're a validator"),
                    solochain: true,
                })
            } else {
                None
            },
            spawn_handle,
            data_preserver: false,
        },
        state: Default::default(),
        collate_on_tanssi,
        collation_cancellation_constructs: None,
    };
    let state = container_chain_spawner.state.clone();

    task_manager.spawn_essential_handle().spawn(
        "container-chain-spawner-rx-loop",
        None,
        container_chain_spawner.rx_loop(cc_spawn_rx, validator, true),
    );

    task_manager.spawn_essential_handle().spawn(
        "container-chain-spawner-debug-state",
        None,
        monitor::monitor_task(state),
    );

    Ok(task_manager)
}

/// Soft deadline percent used by the dev node's manual seal block proposer.
pub const SOFT_DEADLINE_PERCENT: sp_runtime::Percent = sp_runtime::Percent::from_percent(100);

/// Start a dev node with the given parachain `Configuration`. Dev nodes use manual seal
/// block production and mock the relay chain instead of connecting to one.
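///
/// Calling sketch (`config`, `sealing` and `para_id` are assumed to come from the CLI layer):
///
/// ```ignore
/// let task_manager = start_dev_node(config, sealing, None, para_id)?;
/// ```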
#[sc_tracing::logging::prefix_logs_with("Orchestrator Dev Node")]
pub fn start_dev_node(
    orchestrator_config: Configuration,
    sealing: Sealing,
    hwbench: Option<sc_sysinfo::HwBench>,
    para_id: ParaId,
) -> sc_service::error::Result<TaskManager> {
    let parachain_config = prepare_node_config(orchestrator_config);

    // Create a `NodeBuilder` which helps set up the common systems of a parachain node.
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench)?;

    // This node's block import.
    let block_import = DevParachainBlockImport::new(node_builder.client.clone());
    let import_queue = build_manual_seal_import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        &parachain_config,
        node_builder
            .telemetry
            .as_ref()
            .map(|telemetry| telemetry.handle()),
        &node_builder.task_manager,
    )?;

    // Build a Substrate network (not Cumulus, since this is a dev node that mocks the
    // relay chain).
    let mut node_builder = node_builder
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            import_queue,
        )?;

    // If we're running a collator dev node, we must install manual seal block
    // production.
    let mut command_sink = None;
    let mut xcm_senders = None;
    if parachain_config.role.is_authority() {
        let client = node_builder.client.clone();
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));

        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
            block_import,
            sealing,
            soft_deadline: Some(SOFT_DEADLINE_PERCENT),
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
            consensus_data_provider: Some(Box::new(
                tc_consensus::OrchestratorManualSealAuraConsensusDataProvider::new(
                    node_builder.client.clone(),
                    node_builder.keystore_container.keystore(),
                    para_id,
                ),
            )),
            create_inherent_data_providers: move |block: H256, ()| {
                let current_para_block = client
                    .number(block)
                    .expect("Header lookup should succeed")
                    .expect("Header passed in as parent should be present in backend.");

                let para_ids = client
                    .runtime_api()
                    .registered_paras(block)
                    .expect("registered_paras runtime API should exist")
                    .into_iter()
                    .collect();

                let hash = client
                    .hash(current_para_block.saturating_sub(1))
                    .expect("Hash of the desired block must be present")
                    .expect("Hash of the desired block should exist");

                let para_header = client
                    .expect_header(hash)
                    .expect("Expected parachain header should exist")
                    .encode();

                let para_head_data = HeadData(para_header).encode();
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();

                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
                    &*client,
                    block,
                ).expect("Slot duration should be set");

                // Read the current mock timestamp without advancing it; the
                // MockTimestampInherentDataProvider below is what advances it.
                let mut timestamp = TIMESTAMP.with(|x| *x.borrow());

                timestamp += dancebox_runtime::SLOT_DURATION;
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                    timestamp.into(),
                    slot_duration,
                );
                let relay_slot = u64::from(*relay_slot);

                let downward_xcm_receiver = downward_xcm_receiver.clone();
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

                let client_for_xcm = client.clone();
                async move {
                    let mocked_author_noting =
                        tp_author_noting_inherent::MockAuthorNotingInherentDataProvider {
                            current_para_block,
                            relay_offset: 1000,
                            relay_blocks_per_para_block: 2,
                            para_ids,
                            slots_per_para_block: 1,
                        };
                    let mut additional_keys = mocked_author_noting.get_key_values();
                    // Mock only chain 2002 in the relay.
                    // This will allow any signed origin to deregister chains 2000 and 2001, and register 2002.
                    let (registrar_paras_key_2002, para_info_2002) = mocked_relay_keys::get_mocked_registrar_paras(2002.into());
                    additional_keys.extend([(para_head_key, para_head_data), (relay_slot_key, Slot::from(relay_slot).encode()), (registrar_paras_key_2002, para_info_2002)]);

                    let time = MockTimestampInherentDataProvider;
                    let mocked_parachain = MockValidationDataInherentDataProvider {
                        current_para_block,
                        current_para_block_head: None,
                        relay_offset: 1000,
                        relay_blocks_per_para_block: 2,
                        // TODO: Recheck
                        para_blocks_per_relay_epoch: 10,
                        relay_randomness_config: (),
                        xcm_config: MockXcmConfig::new(
                            &*client_for_xcm,
                            block,
                            Default::default(),
                        ),
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
                        additional_key_values: Some(additional_keys),
                        para_id,
                    };

                    Ok((time, mocked_parachain, mocked_author_noting))
                }
            },
        })?;
    }

    // This node's RPC builder.
    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |deny_unsafe, _| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                deny_unsafe,
                command_sink: command_sink.clone(),
                xcm_senders: xcm_senders.clone(),
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    // We spawn all the common substrate tasks to properly run a node.
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    log::info!("Development Service Ready");

    // We start the networking part.
    node_builder.network.start_network.start_network();

    Ok(node_builder.task_manager)
}

/// Can be called for a `Configuration` to check if it is a configuration for
/// the orchestrator network.
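///
/// A small usage sketch:
///
/// ```ignore
/// if config.chain_spec.is_dev() {
///     // run the manual seal dev service
/// }
/// ```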
pub trait IdentifyVariant {
    /// Returns `true` if this is a configuration for a dev network.
    fn is_dev(&self) -> bool;
}

impl IdentifyVariant for Box<dyn sc_service::ChainSpec> {
    fn is_dev(&self) -> bool {
        self.chain_type() == sc_chain_spec::ChainType::Development
    }
}

/// Builder for an in-process orchestrator chain interface, created from a full orchestrator
/// node. Builds an [`OrchestratorChainInProcessInterface`] to access orchestrator chain data
/// necessary for container chain operation.
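///
/// Construction sketch, mirroring the wiring in `start_node_impl` above:
///
/// ```ignore
/// let orchestrator_chain_interface = OrchestratorChainInProcessInterfaceBuilder {
///     client: node_builder.client.clone(),
///     backend: node_builder.backend.clone(),
///     sync_oracle: node_builder.network.sync_service.clone(),
///     overseer_handle: overseer_handle.clone(),
/// }
/// .build();
/// ```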
struct OrchestratorChainInProcessInterfaceBuilder {
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    overseer_handle: Handle,
}

impl OrchestratorChainInProcessInterfaceBuilder {
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
        Arc::new(OrchestratorChainInProcessInterface::new(
            self.client,
            self.backend,
            self.sync_oracle,
            self.overseer_handle,
        ))
    }
}

/// Builder for an orchestrator chain interface that is backed by the relay chain itself
/// (solochain mode). Builds an [`OrchestratorChainSolochainInterface`] that forwards all
/// queries to a [`RelayChainInterface`].
struct OrchestratorChainSolochainInterfaceBuilder {
    overseer_handle: Handle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
}

impl OrchestratorChainSolochainInterfaceBuilder {
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
        Arc::new(OrchestratorChainSolochainInterface::new(
            self.overseer_handle,
            self.relay_chain_interface,
        ))
    }
}

/// Provides an implementation of the [`OrchestratorChainInterface`] using a local in-process node.
pub struct OrchestratorChainInProcessInterface<Client> {
    pub full_client: Arc<Client>,
    pub backend: Arc<FullBackend>,
    pub sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    pub overseer_handle: Handle,
}

impl<Client> OrchestratorChainInProcessInterface<Client> {
    /// Create a new instance of [`OrchestratorChainInProcessInterface`].
    pub fn new(
        full_client: Arc<Client>,
        backend: Arc<FullBackend>,
        sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
        overseer_handle: Handle,
    ) -> Self {
        Self {
            full_client,
            backend,
            sync_oracle,
            overseer_handle,
        }
    }
}

impl<T> Clone for OrchestratorChainInProcessInterface<T> {
    fn clone(&self) -> Self {
        Self {
            full_client: self.full_client.clone(),
            backend: self.backend.clone(),
            sync_oracle: self.sync_oracle.clone(),
            overseer_handle: self.overseer_handle.clone(),
        }
    }
}

#[async_trait::async_trait]
impl<Client> OrchestratorChainInterface for OrchestratorChainInProcessInterface<Client>
where
    Client: ProvideRuntimeApi<Block>
        + BlockchainEvents<Block>
        + AuxStore
        + UsageProvider<Block>
        + Sync
        + Send,
    Client::Api: TanssiAuthorityAssignmentApi<Block, NimbusId>
        + OnDemandBlockProductionApi<Block, ParaId, Slot>
        + RegistrarApi<Block, ParaId>
        + AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>
        + DataPreserversApi<Block, DataPreserverProfileId, ParaId>,
{
    async fn get_storage_by_key(
        &self,
        orchestrator_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
        let state = self.backend.state_at(orchestrator_parent)?;
        state
            .storage(key)
            .map_err(OrchestratorChainError::GenericError)
    }
    async fn prove_read(
        &self,
        orchestrator_parent: PHash,
        relevant_keys: &Vec<Vec<u8>>,
    ) -> OrchestratorChainResult<StorageProof> {
        let state_backend = self.backend.state_at(orchestrator_parent)?;
        sp_state_machine::prove_read(state_backend, relevant_keys)
            .map_err(OrchestratorChainError::StateMachineError)
    }
    fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
        Ok(self.overseer_handle.clone())
    }
    /// Get a stream of import block notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .import_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }
    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notifications_stream =
            self.full_client
                .import_notification_stream()
                .filter_map(|notification| async move {
                    notification.is_new_best.then_some(notification.header)
                });
        Ok(Box::pin(notifications_stream))
    }
    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .finality_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }
    async fn genesis_data(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.genesis_data(orchestrator_parent, para_id)?)
    }
    async fn boot_nodes(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.boot_nodes(orchestrator_parent, para_id)?)
    }
    async fn latest_block_number(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.latest_block_number(orchestrator_parent, para_id)?)
    }
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().best_hash)
    }
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().finalized_hash)
    }
    async fn data_preserver_active_assignment(
        &self,
        orchestrator_parent: PHash,
        profile_id: DataPreserverProfileId,
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        use {
            dc_orchestrator_chain_interface::DataPreserverAssignment as InterfaceAssignment,
            pallet_data_preservers_runtime_api::Assignment as RuntimeAssignment,
        };
        Ok(
            match runtime_api.get_active_assignment(orchestrator_parent, profile_id)? {
                RuntimeAssignment::NotAssigned => InterfaceAssignment::NotAssigned,
                RuntimeAssignment::Active(para_id) => InterfaceAssignment::Active(para_id),
                RuntimeAssignment::Inactive(para_id) => InterfaceAssignment::Inactive(para_id),
            },
        )
    }
    async fn check_para_id_assignment(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.check_para_id_assignment(orchestrator_parent, authority)?)
    }
    async fn check_para_id_assignment_next_session(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.check_para_id_assignment_next_session(orchestrator_parent, authority)?)
    }
}

/// Provides an implementation of the [`OrchestratorChainInterface`] backed by the relay chain
/// (solochain mode).
pub struct OrchestratorChainSolochainInterface {
    pub overseer_handle: Handle,
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
}

impl OrchestratorChainSolochainInterface {
    /// Create a new instance of [`OrchestratorChainSolochainInterface`].
    pub fn new(
        overseer_handle: Handle,
        relay_chain_interface: Arc<dyn RelayChainInterface>,
    ) -> Self {
        Self {
            overseer_handle,
            relay_chain_interface,
        }
    }
}

#[async_trait::async_trait]
impl OrchestratorChainInterface for OrchestratorChainSolochainInterface {
    async fn get_storage_by_key(
        &self,
        relay_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
        self.relay_chain_interface
            .get_storage_by_key(relay_parent, key)
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn prove_read(
        &self,
        relay_parent: PHash,
        relevant_keys: &Vec<Vec<u8>>,
    ) -> OrchestratorChainResult<StorageProof> {
        self.relay_chain_interface
            .prove_read(relay_parent, relevant_keys)
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
        Ok(self.overseer_handle.clone())
    }
    /// Get a stream of import block notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .import_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .new_best_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .finality_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn genesis_data(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
        let res: Option<ContainerChainGenesisData> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "RegistrarApi_genesis_data",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn boot_nodes(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
        let res: Vec<Vec<u8>> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "RegistrarApi_boot_nodes",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn latest_block_number(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
        let res: Option<BlockNumber> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "AuthorNotingApi_latest_block_number",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
        self.relay_chain_interface
            .best_block_hash()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
        self.relay_chain_interface
            .finalized_block_hash()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn data_preserver_active_assignment(
        &self,
        _orchestrator_parent: PHash,
        _profile_id: DataPreserverProfileId,
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
        unimplemented!("Data preserver node does not support Dancelight yet")
    }
    async fn check_para_id_assignment(
        &self,
        relay_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let res: Option<ParaId> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "TanssiAuthorityAssignmentApi_check_para_id_assignment",
            relay_parent,
            &authority,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn check_para_id_assignment_next_session(
        &self,
        relay_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let res: Option<ParaId> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "TanssiAuthorityAssignmentApi_check_para_id_assignment_next_session",
            relay_parent,
            &authority,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
}