// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use {
    crate::command::solochain::{
        build_solochain_config_dir, copy_zombienet_keystore, dummy_config, keystore_config,
    },
    core::marker::PhantomData,
    cumulus_client_cli::CollatorOptions,
    cumulus_client_collator::service::CollatorService,
    cumulus_client_consensus_proposer::Proposer,
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
    cumulus_client_service::{
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, StartRelayChainTasksParams,
    },
    cumulus_primitives_core::{
        relay_chain::{well_known_keys as RelayWellKnownKeys, CollatorPair},
        ParaId,
    },
    cumulus_relay_chain_interface::{
        call_remote_runtime_function, OverseerHandle, RelayChainInterface,
    },
    dancebox_runtime::{
        opaque::{Block, Hash},
        AccountId, RuntimeApi,
    },
    dc_orchestrator_chain_interface::{
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
        OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash,
        PHeader,
    },
    futures::{Stream, StreamExt},
    nimbus_primitives::{NimbusId, NimbusPair},
    node_common::service::{ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing},
    pallet_author_noting_runtime_api::AuthorNotingApi,
    pallet_data_preservers_runtime_api::DataPreserversApi,
    pallet_registrar_runtime_api::RegistrarApi,
    parity_scale_codec::Encode,
    polkadot_cli::ProvideRuntimeApi,
    polkadot_parachain_primitives::primitives::HeadData,
    polkadot_service::Handle,
    sc_cli::CliConfiguration,
    sc_client_api::{
        AuxStore, Backend as BackendT, BlockchainEvents, HeaderBackend, UsageProvider,
    },
    sc_consensus::BasicQueue,
    sc_network::NetworkBlock,
    sc_network_common::role::Role,
    sc_network_sync::SyncingService,
    sc_service::{Configuration, KeystoreContainer, SpawnTaskHandle, TFullBackend, TaskManager},
    sc_telemetry::TelemetryHandle,
    sc_transaction_pool::FullPool,
    sp_api::StorageProof,
    sp_consensus::SyncOracle,
    sp_consensus_slots::Slot,
    sp_core::{traits::SpawnEssentialNamed, H256},
    sp_keystore::KeystorePtr,
    sp_state_machine::{Backend as StateBackend, StorageValue},
    std::{pin::Pin, sync::Arc, time::Duration},
    tc_consensus::{
        collators::lookahead::{
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
        },
        OnDemandBlockProductionApi, OrchestratorAuraWorkerAuxData, TanssiAuthorityAssignmentApi,
    },
    tc_service_container_chain::{
        cli::ContainerChainCli,
        monitor,
        service::{
            DevParachainBlockImport, ParachainBlockImport, ParachainClient, ParachainExecutor,
            ParachainProposerFactory,
        },
        spawner::{self, CcSpawnMsg, ContainerChainSpawnParams, ContainerChainSpawner},
    },
    tokio::sync::mpsc::{unbounded_channel, UnboundedSender},
    tokio_util::sync::CancellationToken,
};

mod mocked_relay_keys;

type FullBackend = TFullBackend<Block>;

pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    type Block = Block;
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ParachainExecutor;
}

thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });

/// Provides a mock timestamp for the timestamp inherent, in milliseconds, starting at 0.
/// Each call increments the timestamp by `slot_duration`, making Aura think time has passed.
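///
/// A rough sketch of the resulting sequence (illustrative only; the value added on
/// each call is the runtime's `SLOT_DURATION`):
/// ```ignore
/// // 1st call: inherent timestamp = 1 * SLOT_DURATION
/// // 2nd call: inherent timestamp = 2 * SLOT_DURATION
/// // n-th call: inherent timestamp = n * SLOT_DURATION
/// ```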
struct MockTimestampInherentDataProvider;
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
    async fn provide_inherent_data(
        &self,
        inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        TIMESTAMP.with(|x| {
            *x.borrow_mut() += dancebox_runtime::SLOT_DURATION;
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
        })
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // The pallet never reports error.
        None
    }
}

/// Background task used to detect changes to container chain assignment,
/// and start/stop container chains on demand. The check runs on every new block.
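///
/// A minimal usage sketch (mirroring how the node wires this up when running as a
/// collator; the variable names are illustrative):
/// ```ignore
/// build_check_assigned_para_id(
///     orchestrator_chain_interface.clone(),
///     sync_keystore.clone(),
///     cc_spawn_tx.clone(),
///     task_manager.spawn_essential_handle(),
/// );
/// ```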
pub fn build_check_assigned_para_id(
    client: Arc<dyn OrchestratorChainInterface>,
    sync_keystore: KeystorePtr,
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    spawner: impl SpawnEssentialNamed,
) {
    let check_assigned_para_id_task = async move {
        // Subscribe to new blocks in order to react to para id assignment
        // This must be the stream of finalized blocks, otherwise the collators may rotate to a
        // different chain before the block is finalized, and that could lead to a stalled chain
        let mut import_notifications = client.finality_notification_stream().await.unwrap();

        while let Some(msg) = import_notifications.next().await {
            let block_hash = msg.hash();
            let client_set_aside_for_cidp = client.clone();
            let sync_keystore = sync_keystore.clone();
            let cc_spawn_tx = cc_spawn_tx.clone();

            check_assigned_para_id(
                cc_spawn_tx,
                sync_keystore,
                client_set_aside_for_cidp,
                block_hash,
            )
            .await
            .unwrap();
        }
    };

    spawner.spawn_essential(
        "check-assigned-para-id",
        None,
        Box::pin(check_assigned_para_id_task),
    );
}

/// Check the parachain assignment using the orchestrator chain client, and send a `CcSpawnMsg` to
/// start or stop the required container chains.
///
/// Checks the assignment for the next block, so if there is a session change on block 15, this will
/// detect the assignment change after importing block 14.
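///
/// Illustrative outcome (hypothetical para ids): if the collator is assigned to container
/// chain 2000 now and to 2001 in the next session, this sends
/// `CcSpawnMsg::UpdateAssignment { current: Some(2000.into()), next: Some(2001.into()) }`,
/// letting the spawner start the new chain before the rotation takes effect.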
async fn check_assigned_para_id(
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    sync_keystore: KeystorePtr,
    client_set_aside_for_cidp: Arc<dyn OrchestratorChainInterface>,
    block_hash: H256,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Check current assignment
    let current_container_chain_para_id =
        tc_consensus::first_eligible_key::<dyn OrchestratorChainInterface, NimbusPair>(
            client_set_aside_for_cidp.as_ref(),
            &block_hash,
            sync_keystore.clone(),
        )
        .await
        .map(|(_nimbus_key, para_id)| para_id);

    // Check assignment in the next session
    let next_container_chain_para_id = tc_consensus::first_eligible_key_next_session::<
        dyn OrchestratorChainInterface,
        NimbusPair,
    >(
        client_set_aside_for_cidp.as_ref(),
        &block_hash,
        sync_keystore,
    )
    .await
    .map(|(_nimbus_key, para_id)| para_id);

    cc_spawn_tx.send(CcSpawnMsg::UpdateAssignment {
        current: current_container_chain_para_id,
        next: next_container_chain_para_id,
    })?;

    Ok(())
}

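/// Build the nimbus import queue and the parachain block import used by the orchestrator
/// chain node. Note that the returned import queue only verifies block author signatures;
/// author eligibility is enforced by the runtime.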
pub fn import_queue(
    parachain_config: &Configuration,
    node_builder: &NodeBuilder<NodeConfig>,
) -> (ParachainBlockImport, BasicQueue<Block>) {
    // The nimbus import queue ONLY checks signature correctness.
    // Any other author-correctness checks must be done in the runtime.
    let block_import =
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());

    let import_queue = nimbus_consensus::import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        move |_, _| async move {
            let time = sp_timestamp::InherentDataProvider::from_system_time();

            Ok((time,))
        },
        &node_builder.task_manager.spawn_essential_handle(),
        parachain_config.prometheus_registry(),
        false,
    )
    .expect("function never fails");

    (block_import, import_queue)
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
#[sc_tracing::logging::prefix_logs_with("Orchestrator")]
async fn start_node_impl(
    orchestrator_config: Configuration,
    polkadot_config: Configuration,
    container_chain_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    let parachain_config = prepare_node_config(orchestrator_config);
    let chain_type: sc_chain_spec::ChainType = parachain_config.chain_spec.chain_type();
    let relay_chain = crate::chain_spec::Extensions::try_get(&*parachain_config.chain_spec)
        .map(|e| e.relay_chain.clone())
        .ok_or("Could not find relay_chain extension in chain-spec.")?;

    // Channel to send messages to start/stop container chains
    let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel();

    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;

    let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);

    let (relay_chain_interface, collator_key) = node_builder
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
        .await?;

    let validator = parachain_config.role.is_authority();
    let force_authoring = parachain_config.force_authoring;

    let node_builder = node_builder
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |deny_unsafe, _| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                deny_unsafe,
                command_sink: None,
                xcm_senders: None,
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    let relay_chain_slot_duration = Duration::from_secs(6);
    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let sync_keystore = node_builder.keystore_container.keystore();
    let mut collate_on_tanssi: Arc<
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
    > = Arc::new(move || {
        if validator {
            panic!("Called uninitialized collate_on_tanssi");
        } else {
            panic!("Called collate_on_tanssi when node is not running as a validator");
        }
    });

    let announce_block = {
        let sync_service = node_builder.network.sync_service.clone();
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
    };

    let (mut node_builder, import_queue_service) = node_builder.extract_import_queue_service();

    start_relay_chain_tasks(StartRelayChainTasksParams {
        client: node_builder.client.clone(),
        announce_block: announce_block.clone(),
        para_id,
        relay_chain_interface: relay_chain_interface.clone(),
        task_manager: &mut node_builder.task_manager,
        da_recovery_profile: if validator {
            DARecoveryProfile::Collator
        } else {
            DARecoveryProfile::FullNode
        },
        import_queue: import_queue_service,
        relay_chain_slot_duration,
        recovery_handle: Box::new(overseer_handle.clone()),
        sync_service: node_builder.network.sync_service.clone(),
    })?;

    let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder {
        client: node_builder.client.clone(),
        backend: node_builder.backend.clone(),
        sync_oracle: node_builder.network.sync_service.clone(),
        overseer_handle: overseer_handle.clone(),
    };
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();

    if validator {
        let collator_key = collator_key
            .clone()
            .expect("Command line arguments do not allow this. qed");

        // Start task which detects para id assignment, and starts/stops container chains.
        // Note that if this node was started without a `container_chain_config`, we don't
        // support collation on container chains, so there is no need to detect changes to assignment
        if container_chain_config.is_some() {
            build_check_assigned_para_id(
                orchestrator_chain_interface.clone(),
                sync_keystore.clone(),
                cc_spawn_tx.clone(),
                node_builder.task_manager.spawn_essential_handle(),
            );
        }

        let start_collation = {
            // Params for collate_on_tanssi closure
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
            let node_keystore = node_builder.keystore_container.keystore().clone();
            let node_telemetry_handle = node_builder.telemetry.as_ref().map(|t| t.handle()).clone();
            let node_client = node_builder.client.clone();
            let node_backend = node_builder.backend.clone();
            let relay_interface = relay_chain_interface.clone();
            let node_sync_service = node_builder.network.sync_service.clone();
            let orchestrator_tx_pool = node_builder.transaction_pool.clone();
            let overseer = overseer_handle.clone();
            let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
                node_spawn_handle.clone(),
                node_client.clone(),
                node_builder.transaction_pool.clone(),
                node_builder.prometheus_registry.as_ref(),
                node_telemetry_handle.clone(),
            );

            move || {
                start_consensus_orchestrator(
                    node_client.clone(),
                    node_backend.clone(),
                    block_import.clone(),
                    node_spawn_handle.clone(),
                    relay_interface.clone(),
                    node_sync_service.clone(),
                    node_keystore.clone(),
                    force_authoring,
                    relay_chain_slot_duration,
                    para_id,
                    collator_key.clone(),
                    overseer.clone(),
                    announce_block.clone(),
                    proposer_factory.clone(),
                    orchestrator_tx_pool.clone(),
                )
            }
        };
        // Save callback for later, used when collator rotates from container chain back to orchestrator chain
        collate_on_tanssi = Arc::new(start_collation);
    }

    node_builder.network.start_network.start_network();

    let sync_keystore = node_builder.keystore_container.keystore();

    if let Some((container_chain_cli, tokio_handle)) = container_chain_config {
        // If the orchestrator chain is running as a full node, we start a full node for the
        // container chain immediately, because only collator nodes detect their container chain
        // assignment; otherwise it would never start.
        if !validator {
            if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
                // Spawn new container chain node
                cc_spawn_tx
                    .send(CcSpawnMsg::UpdateAssignment {
                        current: Some(container_chain_para_id.into()),
                        next: Some(container_chain_para_id.into()),
                    })
                    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
            }
        }

        // Start container chain spawner task. This will start and stop container chains on demand.
        let orchestrator_client = node_builder.client.clone();
        let orchestrator_tx_pool = node_builder.transaction_pool.clone();
        let spawn_handle = node_builder.task_manager.spawn_handle();

        // This assumes that the container chains have the same APIs as dancebox, which
        // is not the case. However, the spawner doesn't call any APIs that are not part of
        // the expected common APIs for a container chain.
        // TODO: Depend on the simple container chain runtime which should be the minimal api?
        let container_chain_spawner = ContainerChainSpawner {
            params: ContainerChainSpawnParams {
                orchestrator_chain_interface,
                container_chain_cli,
                tokio_handle,
                chain_type,
                relay_chain,
                relay_chain_interface,
                sync_keystore,
                orchestrator_para_id: para_id,
                data_preserver: false,
                collation_params: if validator {
                    Some(spawner::CollationParams {
                        orchestrator_client: Some(orchestrator_client.clone()),
                        orchestrator_tx_pool: Some(orchestrator_tx_pool),
                        orchestrator_para_id: para_id,
                        collator_key: collator_key
                            .expect("there should be a collator key if we're a validator"),
                        solochain: false,
                    })
                } else {
                    None
                },
                spawn_handle,
                generate_rpc_builder: tc_service_container_chain::rpc::GenerateSubstrateRpcBuilder::<
                    dancebox_runtime::RuntimeApi,
                >::new(),
                phantom: PhantomData,
            },
            state: Default::default(),
            db_folder_cleanup_done: false,
            collate_on_tanssi,
            collation_cancellation_constructs: None,
        };
        let state = container_chain_spawner.state.clone();

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-rx-loop",
            None,
            container_chain_spawner.rx_loop(cc_spawn_rx, validator, false),
        );

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-debug-state",
            None,
            monitor::monitor_task(state),
        )
    }

    Ok((node_builder.task_manager, node_builder.client))
}

/// Build the import queue for the parachain runtime (manual seal).
fn build_manual_seal_import_queue(
    _client: Arc<ParachainClient>,
    block_import: DevParachainBlockImport,
    config: &Configuration,
    _telemetry: Option<TelemetryHandle>,
    task_manager: &TaskManager,
) -> Result<sc_consensus::DefaultImportQueue<Block>, sc_service::Error> {
    Ok(sc_consensus_manual_seal::import_queue(
        Box::new(block_import),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    ))
}

/// Start collator task for orchestrator chain.
/// Returns a `CancellationToken` that can be used to cancel the collator task,
/// and a `oneshot::Receiver<()>` that can be used to wait until the task has ended.
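///
/// A minimal sketch of how the returned handles can be used (hypothetical caller code;
/// the real node stores them to support collator rotation):
/// ```ignore
/// let (cancellation_token, exit_receiver) = start_consensus_orchestrator(/* ... */);
/// // Later, to stop collating and wait for the task to finish:
/// cancellation_token.cancel();
/// let _ = exit_receiver.await;
/// ```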
fn start_consensus_orchestrator(
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    block_import: ParachainBlockImport,
    spawner: SpawnTaskHandle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
    sync_oracle: Arc<SyncingService<Block>>,
    keystore: KeystorePtr,
    force_authoring: bool,
    relay_chain_slot_duration: Duration,
    para_id: ParaId,
    collator_key: CollatorPair,
    overseer_handle: OverseerHandle,
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
    proposer_factory: ParachainProposerFactory,
    orchestrator_tx_pool: Arc<FullPool<Block, ParachainClient>>,
) -> (CancellationToken, futures::channel::oneshot::Receiver<()>) {
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)
        .expect("start_consensus_orchestrator: slot duration should exist");

    let proposer = Proposer::new(proposer_factory);

    let collator_service = CollatorService::new(
        client.clone(),
        Arc::new(spawner.clone()),
        announce_block,
        client.clone(),
    );

    let relay_chain_interface_for_cidp = relay_chain_interface.clone();
    let client_set_aside_for_cidp = client.clone();
    let client_set_aside_for_orch = client.clone();
    let client_for_hash_provider = client.clone();
    let client_for_slot_duration_provider = client.clone();

    let code_hash_provider = move |block_hash| {
        client_for_hash_provider
            .code_at(block_hash)
            .ok()
            .map(polkadot_primitives::ValidationCode)
            .map(|c| c.hash())
    };

    let cancellation_token = CancellationToken::new();
    let buy_core_params = BuyCoreParams::Orchestrator {
        orchestrator_tx_pool,
        orchestrator_client: client.clone(),
    };

    let params = LookaheadTanssiAuraParams {
        get_current_slot_duration: move |block_hash| {
            sc_consensus_aura::standalone::slot_duration_at(
                &*client_for_slot_duration_provider,
                block_hash,
            )
            .expect("Slot duration should be set")
        },
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
            let relay_chain_interface = relay_chain_interface_for_cidp.clone();
            let client_set_aside_for_cidp = client_set_aside_for_cidp.clone();
            async move {
                let para_ids = client_set_aside_for_cidp
                    .runtime_api()
                    .registered_paras(block_hash)?;
                let para_ids: Vec<_> = para_ids.into_iter().collect();
                let author_noting_inherent =
                    tp_author_noting_inherent::OwnParachainInherentData::create_at(
                        relay_parent,
                        &relay_chain_interface,
                        &para_ids,
                    )
                    .await;

                // Fetch the slot duration on every block to avoid downtime when it changes from 12s to 6s
                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
                    &*client_set_aside_for_cidp.clone(),
                    block_hash,
                )
                .expect("Slot duration should be set");

                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                let slot =
                    sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                        *timestamp,
                        slot_duration,
                    );

                let author_noting_inherent = author_noting_inherent.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to create author noting inherent",
                    )
                })?;

                Ok((slot, timestamp, author_noting_inherent))
            }
        },
        get_orchestrator_aux_data: move |block_hash: H256, (_relay_parent, _validation_data)| {
            let client_set_aside_for_orch = client_set_aside_for_orch.clone();

            async move {
                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
                    client_set_aside_for_orch.as_ref(),
                    &block_hash,
                    para_id,
                );

                let authorities = authorities.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to fetch authorities",
                    )
                })?;

                log::info!(
                    "Authorities {:?} found for header {:?}",
                    authorities,
                    block_hash
                );

                let aux_data = OrchestratorAuraWorkerAuxData {
                    authorities,
                    // This is the orchestrator consensus, it does not have a slot frequency
                    slot_freq: None,
                };

                Ok(aux_data)
            }
        },
        block_import,
        para_client: client,
        relay_client: relay_chain_interface,
        sync_oracle,
        keystore,
        collator_key,
        para_id,
        overseer_handle,
        orchestrator_slot_duration: slot_duration,
        relay_chain_slot_duration,
        force_authoring,
        proposer,
        collator_service,
        authoring_duration: Duration::from_millis(2000),
        code_hash_provider,
        para_backend: backend,
        cancellation_token: cancellation_token.clone(),
        buy_core_params,
    };

    let (fut, exit_notification_receiver) =
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
            params,
        );
    spawner.spawn("tanssi-aura", None, fut);

    (cancellation_token, exit_notification_receiver)
}

/// Start a parachain node.
pub async fn start_parachain_node(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    container_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    start_node_impl(
        parachain_config,
        polkadot_config,
        container_config,
        collator_options,
        para_id,
        hwbench,
    )
    .await
}

/// Start a solochain node.
pub async fn start_solochain_node(
    polkadot_config: Configuration,
    container_chain_cli: ContainerChainCli,
    collator_options: CollatorOptions,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<TaskManager> {
    let tokio_handle = polkadot_config.tokio_handle.clone();
    let orchestrator_para_id = Default::default();

    let chain_type = polkadot_config.chain_spec.chain_type().clone();
    let relay_chain = polkadot_config.chain_spec.id().to_string();

    let base_path = container_chain_cli
        .base
        .base
        .shared_params
        .base_path
        .as_ref()
        .expect("base_path is always set");
    let config_dir = build_solochain_config_dir(base_path);
    let keystore = keystore_config(container_chain_cli.keystore_params(), &config_dir)
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

    // Instead of putting the keystore in
    // Collator1000-01/data/chains/simple_container_2000/keystore
    // we put it in
    // Collator1000-01/data/config/keystore
    // and the same for the "network" folder.
    // But zombienet will put the keys in the old path, so we need to manually copy them if we
    // are running under zombienet.
    copy_zombienet_keystore(&keystore)?;

    let keystore_container = KeystoreContainer::new(&keystore)?;

    // No metrics so no prometheus registry
    let prometheus_registry = None;
    let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;

    // Each container chain will spawn its own telemetry
    let telemetry_worker_handle = None;

    // Dummy parachain config only needed because `build_relay_chain_interface` needs to know if we
    // are collators or not
    let validator = container_chain_cli.base.collator;
    let mut dummy_parachain_config = dummy_config(
        polkadot_config.tokio_handle.clone(),
        polkadot_config.base_path.clone(),
    );
    dummy_parachain_config.role = if validator {
        Role::Authority
    } else {
        Role::Full
    };
    let (relay_chain_interface, collator_key) =
        cumulus_client_service::build_relay_chain_interface(
            polkadot_config,
            &dummy_parachain_config,
            telemetry_worker_handle.clone(),
            &mut task_manager,
            collator_options.clone(),
            hwbench.clone(),
        )
        .await
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

    log::info!("start_solochain_node: is validator? {}", validator);

    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let sync_keystore = keystore_container.keystore();
    let collate_on_tanssi: Arc<
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
    > = Arc::new(move || {
        // collate_on_tanssi will not be called in solochains because solochains use a different consensus
        // mechanism and need validators instead of collators.
        // The runtime enforces this because the orchestrator_chain is never assigned any collators.
        panic!("Called collate_on_tanssi on solochain collator. This is unsupported and the runtime shouldn't allow this, it is a bug")
    });

    let orchestrator_chain_interface_builder = OrchestratorChainSolochainInterfaceBuilder {
        overseer_handle: overseer_handle.clone(),
        relay_chain_interface: relay_chain_interface.clone(),
    };
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();
    // Channel to send messages to start/stop container chains
    let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel();

    if validator {
        // Start task which detects para id assignment, and starts/stops container chains.
        build_check_assigned_para_id(
            orchestrator_chain_interface.clone(),
            sync_keystore.clone(),
            cc_spawn_tx.clone(),
            task_manager.spawn_essential_handle(),
        );
    }

    // If the orchestrator chain is running as a full node, we start a full node for the
    // container chain immediately, because only collator nodes detect their container chain
    // assignment; otherwise it would never start.
    if !validator {
        if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
            // Spawn new container chain node
            cc_spawn_tx
                .send(CcSpawnMsg::UpdateAssignment {
                    current: Some(container_chain_para_id.into()),
                    next: Some(container_chain_para_id.into()),
                })
                .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
        }
    }

    // Start container chain spawner task. This will start and stop container chains on demand.
    let spawn_handle = task_manager.spawn_handle();

    let container_chain_spawner = ContainerChainSpawner {
        params: ContainerChainSpawnParams {
            orchestrator_chain_interface,
            container_chain_cli,
            tokio_handle,
            chain_type,
            relay_chain,
            relay_chain_interface,
            sync_keystore,
            orchestrator_para_id,
            collation_params: if validator {
                Some(spawner::CollationParams {
                    // TODO: all these args must be solochain instead of orchestrator
                    orchestrator_client: None,
                    orchestrator_tx_pool: None,
                    orchestrator_para_id,
                    collator_key: collator_key
                        .expect("there should be a collator key if we're a validator"),
                    solochain: true,
                })
            } else {
                None
            },
            spawn_handle,
            data_preserver: false,
            generate_rpc_builder: tc_service_container_chain::rpc::GenerateSubstrateRpcBuilder::<
                dancebox_runtime::RuntimeApi,
            >::new(),
            phantom: PhantomData,
        },
        state: Default::default(),
        db_folder_cleanup_done: false,
        collate_on_tanssi,
        collation_cancellation_constructs: None,
    };
    let state = container_chain_spawner.state.clone();

    task_manager.spawn_essential_handle().spawn(
        "container-chain-spawner-rx-loop",
        None,
        container_chain_spawner.rx_loop(cc_spawn_rx, validator, true),
    );

    task_manager.spawn_essential_handle().spawn(
        "container-chain-spawner-debug-state",
        None,
        monitor::monitor_task(state),
    );

    Ok(task_manager)
}

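/// Soft deadline percent used by the manual seal proposer in the dev node: at 100 percent
/// the proposer keeps trying to include transactions for the whole proposal duration
/// instead of giving up early (see the `soft_deadline` field of `ManualSealConfiguration`).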
pub const SOFT_DEADLINE_PERCENT: sp_runtime::Percent = sp_runtime::Percent::from_percent(100);

/// Start a dev node with the given orchestrator chain `Configuration`, producing blocks
/// through manual seal instead of connecting to a relay chain.
#[sc_tracing::logging::prefix_logs_with("Orchestrator Dev Node")]
pub fn start_dev_node(
    orchestrator_config: Configuration,
    sealing: Sealing,
    hwbench: Option<sc_sysinfo::HwBench>,
    para_id: ParaId,
) -> sc_service::error::Result<TaskManager> {
    let parachain_config = prepare_node_config(orchestrator_config);

    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench)?;

    // This node's block import.
    let block_import = DevParachainBlockImport::new(node_builder.client.clone());
    let import_queue = build_manual_seal_import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        &parachain_config,
        node_builder
            .telemetry
            .as_ref()
            .map(|telemetry| telemetry.handle()),
        &node_builder.task_manager,
    )?;

    // Build a Substrate network (not a Cumulus one, since this is a dev node that
    // mocks the relay chain).
    let mut node_builder = node_builder
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            import_queue,
        )?;

    // If we're running a collator dev node we must install manual seal block
    // production.
    let mut command_sink = None;
    let mut xcm_senders = None;
    if parachain_config.role.is_authority() {
        let client = node_builder.client.clone();
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));

        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
            block_import,
            sealing,
            soft_deadline: Some(SOFT_DEADLINE_PERCENT),
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
            consensus_data_provider: Some(Box::new(
                tc_consensus::OrchestratorManualSealAuraConsensusDataProvider::new(
                    node_builder.client.clone(),
                    node_builder.keystore_container.keystore(),
                    para_id,
                ),
            )),
            create_inherent_data_providers: move |block: H256, ()| {
                let current_para_block = client
                    .number(block)
                    .expect("Header lookup should succeed")
                    .expect("Header passed in as parent should be present in backend.");

                let para_ids = client
                    .runtime_api()
                    .registered_paras(block)
                    .expect("registered_paras runtime API should exist")
                    .into_iter()
                    .collect();

                let hash = client
                    .hash(current_para_block.saturating_sub(1))
                    .expect("Hash of the desired block must be present")
                    .expect("Hash of the desired block should exist");

                let para_header = client
                    .expect_header(hash)
                    .expect("Expected parachain header should exist")
                    .encode();

                let para_head_data = HeadData(para_header).encode();
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();

                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
                    &*client.clone(),
                    block,
                ).expect("Slot duration should be set");

                let mut timestamp = 0u64;
                TIMESTAMP.with(|x| {
                    timestamp = x.clone().take();
                });

                timestamp += dancebox_runtime::SLOT_DURATION;
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                    timestamp.into(),
                    slot_duration,
                );
                let relay_slot = u64::from(*relay_slot);

                let downward_xcm_receiver = downward_xcm_receiver.clone();
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

                let client_for_xcm = client.clone();
                async move {
                    let mocked_author_noting =
                        tp_author_noting_inherent::MockAuthorNotingInherentDataProvider {
                            current_para_block,
                            relay_offset: 1000,
                            relay_blocks_per_para_block: 2,
                            para_ids,
                            slots_per_para_block: 1,
                        };
                    let mut additional_keys = mocked_author_noting.get_key_values();
                    // Mock only chain 2002 in relay.
                    // This will allow any signed origin to deregister chains 2000 and 2001, and register 2002.
                    let (registrar_paras_key_2002, para_info_2002) = mocked_relay_keys::get_mocked_registrar_paras(2002.into());
                    additional_keys.extend([(para_head_key, para_head_data), (relay_slot_key, Slot::from(relay_slot).encode()), (registrar_paras_key_2002, para_info_2002)]);

                    let time = MockTimestampInherentDataProvider;
                    let mocked_parachain = MockValidationDataInherentDataProvider {
                        current_para_block,
                        current_para_block_head: None,
                        relay_offset: 1000,
                        relay_blocks_per_para_block: 2,
                        // TODO: Recheck
                        para_blocks_per_relay_epoch: 10,
                        relay_randomness_config: (),
                        xcm_config: MockXcmConfig::new(
                            &*client_for_xcm,
                            block,
                            Default::default(),
                        ),
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
                        additional_key_values: Some(additional_keys),
                        para_id,
                    };

                    Ok((time, mocked_parachain, mocked_author_noting))
                }
            },
        })?;
    }

    // This node's RPC builder.
    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |deny_unsafe, _| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                deny_unsafe,
                command_sink: command_sink.clone(),
                xcm_senders: xcm_senders.clone(),
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    // We spawn all the common substrate tasks to properly run a node.
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    log::info!("Development Service Ready");

    // We start the networking part.
    node_builder.network.start_network.start_network();

    Ok(node_builder.task_manager)
}

/// Can be called on a chain spec to check which variant of the orchestrator
/// network it is.
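///
/// A minimal sketch of the intended call site (hypothetical):
/// ```ignore
/// if config.chain_spec.is_dev() {
///     // run the dev service with manual seal
/// }
/// ```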
pub trait IdentifyVariant {
    /// Returns `true` if this is a configuration for a dev network.
    fn is_dev(&self) -> bool;
}

impl IdentifyVariant for Box<dyn sc_service::ChainSpec> {
    fn is_dev(&self) -> bool {
        self.chain_type() == sc_chain_spec::ChainType::Development
    }
}

/// Builder for a concrete orchestrator chain interface, created from a full node. Builds
/// an [`OrchestratorChainInProcessInterface`] to access orchestrator chain data necessary
/// for container chain operation.
struct OrchestratorChainInProcessInterfaceBuilder {
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    overseer_handle: Handle,
}
impl OrchestratorChainInProcessInterfaceBuilder {
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
        Arc::new(OrchestratorChainInProcessInterface::new(
            self.client,
            self.backend,
            self.sync_oracle,
            self.overseer_handle,
        ))
    }
}

/// Builder for an orchestrator chain interface that is backed by a relay chain node,
/// used when Tanssi runs as a solochain. Builds an
/// [`OrchestratorChainSolochainInterface`] that forwards all queries to the relay chain.
struct OrchestratorChainSolochainInterfaceBuilder {
    overseer_handle: Handle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
}
impl OrchestratorChainSolochainInterfaceBuilder {
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
        Arc::new(OrchestratorChainSolochainInterface::new(
            self.overseer_handle,
            self.relay_chain_interface,
        ))
    }
}

/// Provides an implementation of the [`OrchestratorChainInterface`] using a local
/// in-process orchestrator chain node.
pub struct OrchestratorChainInProcessInterface<Client> {
    pub full_client: Arc<Client>,
    pub backend: Arc<FullBackend>,
    pub sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    pub overseer_handle: Handle,
}
impl<Client> OrchestratorChainInProcessInterface<Client> {
    /// Create a new instance of [`OrchestratorChainInProcessInterface`]
    pub fn new(
        full_client: Arc<Client>,
        backend: Arc<FullBackend>,
        sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
        overseer_handle: Handle,
    ) -> Self {
        Self {
            full_client,
            backend,
            sync_oracle,
            overseer_handle,
        }
    }
}
impl<T> Clone for OrchestratorChainInProcessInterface<T> {
    fn clone(&self) -> Self {
        Self {
            full_client: self.full_client.clone(),
            backend: self.backend.clone(),
            sync_oracle: self.sync_oracle.clone(),
            overseer_handle: self.overseer_handle.clone(),
        }
    }
}
#[async_trait::async_trait]
impl<Client> OrchestratorChainInterface for OrchestratorChainInProcessInterface<Client>
where
    Client: ProvideRuntimeApi<Block>
        + BlockchainEvents<Block>
        + AuxStore
        + UsageProvider<Block>
        + Sync
        + Send,
    Client::Api: TanssiAuthorityAssignmentApi<Block, NimbusId>
        + OnDemandBlockProductionApi<Block, ParaId, Slot>
        + RegistrarApi<Block, ParaId>
        + AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>
        + DataPreserversApi<Block, DataPreserverProfileId, ParaId>,
{
    async fn get_storage_by_key(
        &self,
        orchestrator_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
        let state = self.backend.state_at(orchestrator_parent)?;
        state
            .storage(key)
            .map_err(OrchestratorChainError::GenericError)
    }
    async fn prove_read(
        &self,
        orchestrator_parent: PHash,
        relevant_keys: &Vec<Vec<u8>>,
    ) -> OrchestratorChainResult<StorageProof> {
        let state_backend = self.backend.state_at(orchestrator_parent)?;
        sp_state_machine::prove_read(state_backend, relevant_keys)
            .map_err(OrchestratorChainError::StateMachineError)
    }
    fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
        Ok(self.overseer_handle.clone())
    }
    /// Get a stream of import block notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .import_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }
    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notifications_stream =
            self.full_client
                .import_notification_stream()
                .filter_map(|notification| async move {
                    notification.is_new_best.then_some(notification.header)
                });
        Ok(Box::pin(notifications_stream))
    }
    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .finality_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }
    async fn genesis_data(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.genesis_data(orchestrator_parent, para_id)?)
    }
    async fn boot_nodes(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.boot_nodes(orchestrator_parent, para_id)?)
    }
    async fn latest_block_number(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.latest_block_number(orchestrator_parent, para_id)?)
    }
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().best_hash)
    }
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().finalized_hash)
    }
    async fn data_preserver_active_assignment(
        &self,
        orchestrator_parent: PHash,
        profile_id: DataPreserverProfileId,
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        use {
            dc_orchestrator_chain_interface::DataPreserverAssignment as InterfaceAssignment,
            pallet_data_preservers_runtime_api::Assignment as RuntimeAssignment,
        };
        Ok(
            match runtime_api.get_active_assignment(orchestrator_parent, profile_id)? {
                RuntimeAssignment::NotAssigned => InterfaceAssignment::NotAssigned,
                RuntimeAssignment::Active(para_id) => InterfaceAssignment::Active(para_id),
                RuntimeAssignment::Inactive(para_id) => InterfaceAssignment::Inactive(para_id),
            },
        )
    }
    async fn check_para_id_assignment(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.check_para_id_assignment(orchestrator_parent, authority)?)
    }
    async fn check_para_id_assignment_next_session(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.check_para_id_assignment_next_session(orchestrator_parent, authority)?)
    }
}

/// Provides an implementation of the [`OrchestratorChainInterface`] backed by a relay
/// chain node, for use when Tanssi runs as a solochain.
pub struct OrchestratorChainSolochainInterface {
    pub overseer_handle: Handle,
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
}
impl OrchestratorChainSolochainInterface {
    /// Create a new instance of [`OrchestratorChainSolochainInterface`]
    pub fn new(
        overseer_handle: Handle,
        relay_chain_interface: Arc<dyn RelayChainInterface>,
    ) -> Self {
        Self {
            overseer_handle,
            relay_chain_interface,
        }
    }
}
#[async_trait::async_trait]
impl OrchestratorChainInterface for OrchestratorChainSolochainInterface {
    async fn get_storage_by_key(
        &self,
        relay_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
        self.relay_chain_interface
            .get_storage_by_key(relay_parent, key)
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn prove_read(
        &self,
        relay_parent: PHash,
        relevant_keys: &Vec<Vec<u8>>,
    ) -> OrchestratorChainResult<StorageProof> {
        self.relay_chain_interface
            .prove_read(relay_parent, relevant_keys)
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
        Ok(self.overseer_handle.clone())
    }
    /// Get a stream of import block notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .import_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .new_best_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .finality_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn genesis_data(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
        let res: Option<ContainerChainGenesisData> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "RegistrarApi_genesis_data",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn boot_nodes(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
        let res: Vec<Vec<u8>> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "RegistrarApi_boot_nodes",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn latest_block_number(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
        let res: Option<BlockNumber> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "AuthorNotingApi_latest_block_number",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
        self.relay_chain_interface
            .best_block_hash()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
        self.relay_chain_interface
            .finalized_block_hash()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn data_preserver_active_assignment(
        &self,
        _orchestrator_parent: PHash,
        _profile_id: DataPreserverProfileId,
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
        unimplemented!("Data preserver node does not support Dancelight yet")
    }
    async fn check_para_id_assignment(
        &self,
        relay_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let res: Option<ParaId> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "TanssiAuthorityAssignmentApi_check_para_id_assignment",
            relay_parent,
            &authority,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn check_para_id_assignment_next_session(
        &self,
        relay_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let res: Option<ParaId> = call_remote_runtime_function(
            &self.relay_chain_interface,
            "TanssiAuthorityAssignmentApi_check_para_id_assignment_next_session",
            relay_parent,
            &authority,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
}