// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
use {
    crate::command::solochain::{
        build_solochain_config_dir, copy_zombienet_keystore, dummy_config, keystore_config,
    },
    core::marker::PhantomData,
    cumulus_client_cli::CollatorOptions,
    cumulus_client_collator::service::CollatorService,
    cumulus_client_consensus_proposer::Proposer,
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
    cumulus_client_service::{
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, StartRelayChainTasksParams,
    },
    cumulus_primitives_core::{
        relay_chain::{well_known_keys as RelayWellKnownKeys, CollatorPair},
        CollectCollationInfo, ParaId,
    },
    cumulus_relay_chain_interface::{call_runtime_api, OverseerHandle, RelayChainInterface},
    dancebox_runtime::{
        opaque::{Block, Hash},
        AccountId, RuntimeApi,
    },
    dc_orchestrator_chain_interface::{
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
        OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash,
        PHeader,
    },
    frame_support::__private::sp_tracing::tracing::Instrument,
    futures::{Stream, StreamExt},
    nimbus_primitives::{NimbusId, NimbusPair},
    node_common::service::{ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing},
    pallet_author_noting_runtime_api::AuthorNotingApi,
    pallet_collator_assignment_runtime_api::CollatorAssignmentApi,
    pallet_data_preservers_runtime_api::DataPreserversApi,
    pallet_registrar_runtime_api::RegistrarApi,
    parity_scale_codec::{Decode, Encode},
    polkadot_cli::ProvideRuntimeApi,
    polkadot_parachain_primitives::primitives::HeadData,
    polkadot_primitives::UpgradeGoAhead,
    polkadot_service::Handle,
    sc_cli::CliConfiguration,
    sc_client_api::{
        AuxStore, Backend as BackendT, BlockchainEvents, HeaderBackend, UsageProvider,
    },
    sc_consensus::BasicQueue,
    sc_network::NetworkBlock,
    sc_network_common::role::Role,
    sc_network_sync::SyncingService,
    sc_service::{Configuration, KeystoreContainer, SpawnTaskHandle, TFullBackend, TaskManager},
    sc_telemetry::TelemetryHandle,
    sc_transaction_pool::TransactionPoolHandle,
    sp_api::ApiExt,
    sp_api::StorageProof,
    sp_consensus::SyncOracle,
    sp_consensus_slots::Slot,
    sp_core::{traits::SpawnEssentialNamed, H256},
    sp_keystore::KeystorePtr,
    sp_state_machine::{Backend as StateBackend, StorageValue},
    std::{pin::Pin, sync::Arc, time::Duration},
    tc_consensus::{
        collators::lookahead::{
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
        },
        OnDemandBlockProductionApi, OrchestratorAuraWorkerAuxData, TanssiAuthorityAssignmentApi,
    },
    tc_service_container_chain::{
        cli::ContainerChainCli,
        monitor,
        service::{
            DevParachainBlockImport, ParachainBlockImport, ParachainClient, ParachainExecutor,
            ParachainProposerFactory,
        },
        spawner::{self, CcSpawnMsg, ContainerChainSpawnParams, ContainerChainSpawner},
    },
    tokio::sync::mpsc::{unbounded_channel, UnboundedSender},
    tokio_util::sync::CancellationToken,
};

mod mocked_relay_keys;

// We use this to detect whether randomness is activated
const RANDOMNESS_ACTIVATED_AUX_KEY: &[u8] = b"__DEV_RANDOMNESS_ACTIVATED";

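// Aux key used in dev mode to persist the mocked list of container chains excluded from block production.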
const CONTAINER_CHAINS_EXCLUSION_AUX_KEY: &[u8] = b"__DEV_CONTAINER_CHAINS_EXCLUSION";

type FullBackend = TFullBackend<Block>;

pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    type Block = Block;
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ParachainExecutor;
}

thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });

/// Provide a mock duration starting at 0 in milliseconds for the timestamp inherent.
/// Each call increments the timestamp by the slot duration, making Aura think time has passed.
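///
/// Illustrative effect (a sketch; actual values come from `dancebox_runtime::SLOT_DURATION`):
///
/// ```ignore
/// // The first call puts SLOT_DURATION into the inherent data, the second call
/// // 2 * SLOT_DURATION, and so on, so each sealed block appears one slot later.
/// let provider = MockTimestampInherentDataProvider;
/// let mut inherent_data = sp_inherents::InherentData::new();
/// provider.provide_inherent_data(&mut inherent_data).await?;
/// ```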
struct MockTimestampInherentDataProvider;
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
    async fn provide_inherent_data(
        &self,
        inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        TIMESTAMP.with(|x| {
            *x.borrow_mut() += dancebox_runtime::SLOT_DURATION;
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
        })
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // The pallet never reports errors.
        None
    }
}

/// Background task used to detect changes to container chain assignment,
/// and start/stop container chains on demand. The check runs on every finalized block.
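///
/// Illustrative call site, mirroring how this module wires it up (argument names
/// are assumptions taken from the surrounding code):
///
/// ```ignore
/// build_check_assigned_para_id(
///     orchestrator_chain_interface.clone(),
///     sync_keystore.clone(),
///     cc_spawn_tx.clone(),
///     task_manager.spawn_essential_handle(),
/// );
/// ```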
pub fn build_check_assigned_para_id(
    client: Arc<dyn OrchestratorChainInterface>,
    sync_keystore: KeystorePtr,
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    spawner: impl SpawnEssentialNamed,
) {
    let check_assigned_para_id_task = async move {
        // Subscribe to new blocks in order to react to para id assignment.
        // This must be the stream of finalized blocks, otherwise the collators may rotate to a
        // different chain before the block is finalized, and that could lead to a stalled chain.
        let mut import_notifications = client.finality_notification_stream().await.unwrap();

        while let Some(msg) = import_notifications.next().await {
            let block_hash = msg.hash();
            let client_set_aside_for_cidp = client.clone();
            let sync_keystore = sync_keystore.clone();
            let cc_spawn_tx = cc_spawn_tx.clone();

            check_assigned_para_id(
                cc_spawn_tx,
                sync_keystore,
                client_set_aside_for_cidp,
                block_hash,
            )
            .await
            .unwrap();
        }
    };

    spawner.spawn_essential(
        "check-assigned-para-id",
        None,
        Box::pin(check_assigned_para_id_task),
    );
}

/// Check the parachain assignment using the orchestrator chain client, and send a `CcSpawnMsg` to
/// start or stop the required container chains.
///
/// Checks the assignment for the next block, so if there is a session change on block 15, this will
/// detect the assignment change after importing block 14.
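///
/// The sent message is a plain assignment update, e.g. (illustrative values):
///
/// ```ignore
/// CcSpawnMsg::UpdateAssignment { current: Some(2000.into()), next: Some(2001.into()) }
/// ```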
async fn check_assigned_para_id(
    cc_spawn_tx: UnboundedSender<CcSpawnMsg>,
    sync_keystore: KeystorePtr,
    client_set_aside_for_cidp: Arc<dyn OrchestratorChainInterface>,
    block_hash: H256,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // Check current assignment
    let current_container_chain_para_id =
        tc_consensus::first_eligible_key::<dyn OrchestratorChainInterface, NimbusPair>(
            client_set_aside_for_cidp.as_ref(),
            &block_hash,
            sync_keystore.clone(),
        )
        .await
        .map(|(_nimbus_key, para_id)| para_id);

    // Check assignment in the next session
    let next_container_chain_para_id = tc_consensus::first_eligible_key_next_session::<
        dyn OrchestratorChainInterface,
        NimbusPair,
    >(
        client_set_aside_for_cidp.as_ref(),
        &block_hash,
        sync_keystore,
    )
    .await
    .map(|(_nimbus_key, para_id)| para_id);

    cc_spawn_tx.send(CcSpawnMsg::UpdateAssignment {
        current: current_container_chain_para_id,
        next: next_container_chain_para_id,
    })?;

    Ok(())
}
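/// Build the nimbus block import and import queue for the orchestrator chain.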
pub fn import_queue(
    parachain_config: &Configuration,
    node_builder: &NodeBuilder<NodeConfig>,
) -> (ParachainBlockImport, BasicQueue<Block>) {
    // The nimbus import queue ONLY checks the signature correctness.
    // Any other checks related to author correctness should be done in the runtime.
    let block_import =
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());

    let import_queue = nimbus_consensus::import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        move |_, _| async move {
            let time = sp_timestamp::InherentDataProvider::from_system_time();

            Ok((time,))
        },
        &node_builder.task_manager.spawn_essential_handle(),
        parachain_config.prometheus_registry(),
        false,
        false,
    )
    .expect("function never fails");

    (block_import, import_queue)
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstracted over the executor and the runtime api.
async fn start_node_impl(
    orchestrator_config: Configuration,
    polkadot_config: Configuration,
    container_chain_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
    max_pov_percentage: Option<u32>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    let parachain_config = prepare_node_config(orchestrator_config);
    let chain_type: sc_chain_spec::ChainType = parachain_config.chain_spec.chain_type();
    let relay_chain = crate::chain_spec::Extensions::try_get(&*parachain_config.chain_spec)
        .map(|e| e.relay_chain.clone())
        .ok_or("Could not find relay_chain extension in chain-spec.")?;

    // Channel to send messages to start/stop container chains
    let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel();

    // Create a `NodeBuilder` which helps set up the common systems of parachain nodes.
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;

    let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);

    let (relay_chain_interface, collator_key) = node_builder
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
        .await?;

    let validator = parachain_config.role.is_authority();
    let force_authoring = parachain_config.force_authoring;

    let node_builder = node_builder
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |_| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                command_sink: None,
                xcm_senders: None,
                randomness_sender: None,
                container_chain_exclusion_sender: None,
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    let relay_chain_slot_duration = Duration::from_secs(6);
    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let sync_keystore = node_builder.keystore_container.keystore();
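    // Placeholder closure: replaced below with the real consensus starter when this node
    // runs as a validator. It panics if called before being initialized.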
    let mut collate_on_tanssi: Arc<
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
    > = Arc::new(move || {
        if validator {
            panic!("Called uninitialized collate_on_tanssi");
        } else {
            panic!("Called collate_on_tanssi when node is not running as a validator");
        }
    });

    let announce_block = {
        let sync_service = node_builder.network.sync_service.clone();
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
    };

    let (mut node_builder, import_queue_service) = node_builder.extract_import_queue_service();

    start_relay_chain_tasks(StartRelayChainTasksParams {
        client: node_builder.client.clone(),
        announce_block: announce_block.clone(),
        para_id,
        relay_chain_interface: relay_chain_interface.clone(),
        task_manager: &mut node_builder.task_manager,
        da_recovery_profile: if validator {
            DARecoveryProfile::Collator
        } else {
            DARecoveryProfile::FullNode
        },
        import_queue: import_queue_service,
        relay_chain_slot_duration,
        recovery_handle: Box::new(overseer_handle.clone()),
        sync_service: node_builder.network.sync_service.clone(),
    })?;

    let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder {
        client: node_builder.client.clone(),
        backend: node_builder.backend.clone(),
        sync_oracle: node_builder.network.sync_service.clone(),
        overseer_handle: overseer_handle.clone(),
    };
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();

    if validator {
        let collator_key = collator_key
            .clone()
            .expect("Command line arguments do not allow this. qed");

        // Start task which detects para id assignment, and starts/stops container chains.
        // Note that if this node was started without a `container_chain_config`, we don't
        // support collation on container chains, so there is no need to detect changes to assignment.
        if container_chain_config.is_some() {
            build_check_assigned_para_id(
                orchestrator_chain_interface.clone(),
                sync_keystore.clone(),
                cc_spawn_tx.clone(),
                node_builder.task_manager.spawn_essential_handle(),
            );
        }

        let start_collation = {
            // Params for collate_on_tanssi closure
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
            let node_keystore = node_builder.keystore_container.keystore().clone();
            let node_telemetry_handle = node_builder.telemetry.as_ref().map(|t| t.handle()).clone();
            let node_client = node_builder.client.clone();
            let node_backend = node_builder.backend.clone();
            let relay_interface = relay_chain_interface.clone();
            let node_sync_service = node_builder.network.sync_service.clone();
            let orchestrator_tx_pool = node_builder.transaction_pool.clone();
            let overseer = overseer_handle.clone();
            let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
                node_spawn_handle.clone(),
                node_client.clone(),
                node_builder.transaction_pool.clone(),
                node_builder.prometheus_registry.as_ref(),
                node_telemetry_handle.clone(),
            );

            move || {
                start_consensus_orchestrator(
                    node_client.clone(),
                    node_backend.clone(),
                    block_import.clone(),
                    node_spawn_handle.clone(),
                    relay_interface.clone(),
                    node_sync_service.clone(),
                    node_keystore.clone(),
                    force_authoring,
                    relay_chain_slot_duration,
                    para_id,
                    collator_key.clone(),
                    overseer.clone(),
                    announce_block.clone(),
                    proposer_factory.clone(),
                    orchestrator_tx_pool.clone(),
                    max_pov_percentage,
                )
            }
        };
        // Save callback for later, used when collator rotates from container chain back to orchestrator chain
        collate_on_tanssi = Arc::new(start_collation);
    }

    node_builder.network.start_network.start_network();

    let sync_keystore = node_builder.keystore_container.keystore();

    if let Some((container_chain_cli, tokio_handle)) = container_chain_config {
        // If the orchestrator chain is running as a full-node, we start a full node for the
        // container chain immediately, because only collator nodes detect their container chain
        // assignment; otherwise it would never start.
        if !validator {
            if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
                // Spawn new container chain node
                cc_spawn_tx
                    .send(CcSpawnMsg::UpdateAssignment {
                        current: Some(container_chain_para_id.into()),
                        next: Some(container_chain_para_id.into()),
                    })
                    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
            }
        }

        // Start container chain spawner task. This will start and stop container chains on demand.
        let orchestrator_client = node_builder.client.clone();
        let orchestrator_tx_pool = node_builder.transaction_pool.clone();
        let spawn_handle = node_builder.task_manager.spawn_handle();

        // This assumes that the container chains have the same APIs as dancebox, which
        // is not the case. However, the spawner doesn't call any APIs that are not part of the
        // expected common APIs for a container chain.
        // TODO: Depend on the simple container chain runtime which should be the minimal api?
        let container_chain_spawner = ContainerChainSpawner {
            params: ContainerChainSpawnParams {
                orchestrator_chain_interface,
                container_chain_cli,
                tokio_handle,
                chain_type,
                relay_chain,
                relay_chain_interface,
                sync_keystore,
                orchestrator_para_id: para_id,
                data_preserver: false,
                collation_params: if validator {
                    Some(spawner::CollationParams {
                        orchestrator_client: Some(orchestrator_client.clone()),
                        orchestrator_tx_pool: Some(orchestrator_tx_pool),
                        orchestrator_para_id: para_id,
                        collator_key: collator_key
                            .expect("there should be a collator key if we're a validator"),
                        solochain: false,
                    })
                } else {
                    None
                },
                spawn_handle,
                generate_rpc_builder: tc_service_container_chain::rpc::GenerateSubstrateRpcBuilder::<
                    dancebox_runtime::RuntimeApi,
                >::new(),
                phantom: PhantomData,
            },
            state: Default::default(),
            db_folder_cleanup_done: false,
            collate_on_tanssi,
            collation_cancellation_constructs: None,
        };
        let state = container_chain_spawner.state.clone();

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-rx-loop",
            None,
            container_chain_spawner.rx_loop(cc_spawn_rx, validator, false),
        );

        node_builder.task_manager.spawn_essential_handle().spawn(
            "container-chain-spawner-debug-state",
            None,
            monitor::monitor_task(state),
        )
    }

    Ok((node_builder.task_manager, node_builder.client))
}

/// Build the import queue for the parachain runtime (manual seal).
fn build_manual_seal_import_queue(
    _client: Arc<ParachainClient>,
    block_import: DevParachainBlockImport,
    config: &Configuration,
    _telemetry: Option<TelemetryHandle>,
    task_manager: &TaskManager,
) -> Result<sc_consensus::DefaultImportQueue<Block>, sc_service::Error> {
    Ok(sc_consensus_manual_seal::import_queue(
        Box::new(block_import),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    ))
}

/// Start the collator task for the orchestrator chain.
/// Returns a `CancellationToken` that can be used to cancel the collator task,
/// and a `oneshot::Receiver<()>` that can be used to wait until the task has ended.
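///
/// Illustrative shutdown sequence using the returned handles (hedged sketch):
///
/// ```ignore
/// let (cancellation_token, exit_receiver) = collate_on_tanssi();
/// // Stop collating and wait for the task to wind down:
/// cancellation_token.cancel();
/// let _ = exit_receiver.await;
/// ```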
fn start_consensus_orchestrator(
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    block_import: ParachainBlockImport,
    spawner: SpawnTaskHandle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
    sync_oracle: Arc<SyncingService<Block>>,
    keystore: KeystorePtr,
    force_authoring: bool,
    relay_chain_slot_duration: Duration,
    para_id: ParaId,
    collator_key: CollatorPair,
    overseer_handle: OverseerHandle,
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
    proposer_factory: ParachainProposerFactory,
    orchestrator_tx_pool: Arc<TransactionPoolHandle<Block, ParachainClient>>,
    max_pov_percentage: Option<u32>,
) -> (CancellationToken, futures::channel::oneshot::Receiver<()>) {
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)
        .expect("start_consensus_orchestrator: slot duration should exist");

    let proposer = Proposer::new(proposer_factory);

    let collator_service = CollatorService::new(
        client.clone(),
        Arc::new(spawner.clone()),
        announce_block,
        client.clone(),
    );

    let relay_chain_interface_for_cidp = relay_chain_interface.clone();
    let client_set_aside_for_cidp = client.clone();
    let client_set_aside_for_orch = client.clone();
    let client_for_hash_provider = client.clone();
    let client_for_slot_duration_provider = client.clone();

    let code_hash_provider = move |block_hash| {
        client_for_hash_provider
            .code_at(block_hash)
            .ok()
            .map(polkadot_primitives::ValidationCode)
            .map(|c| c.hash())
    };

    let cancellation_token = CancellationToken::new();
    let buy_core_params = BuyCoreParams::Orchestrator {
        orchestrator_tx_pool,
        orchestrator_client: client.clone(),
    };

    let params = LookaheadTanssiAuraParams {
        max_pov_percentage,
        get_current_slot_duration: move |block_hash| {
            sc_consensus_aura::standalone::slot_duration_at(
                &*client_for_slot_duration_provider,
                block_hash,
            )
            .expect("Slot duration should be set")
        },
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
            let relay_chain_interface = relay_chain_interface_for_cidp.clone();
            let client_set_aside_for_cidp = client_set_aside_for_cidp.clone();
            async move {
                // We added a new runtime API that lets us know which parachains have
                // collators assigned to them; we only include those. For older
                // runtimes we continue to write all registered parachains.
                let para_ids = match client_set_aside_for_cidp
                    .runtime_api()
                    .api_version::<dyn CollatorAssignmentApi<Block, AccountId, ParaId>>(
                    block_hash,
                )? {
                    Some(version) if version >= 2 => client_set_aside_for_cidp
                        .runtime_api()
                        .parachains_with_some_collators(block_hash)?,
                    _ => client_set_aside_for_cidp
                        .runtime_api()
                        .registered_paras(block_hash)?,
                };
                let para_ids: Vec<_> = para_ids.into_iter().collect();
                let author_noting_inherent =
                    tp_author_noting_inherent::OwnParachainInherentData::create_at(
                        relay_parent,
                        &relay_chain_interface,
                        &para_ids,
                    )
                    .await;

                // Fetch the slot duration on every block to avoid downtime when switching
                // from 12s to 6s slots.
                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
                    &*client_set_aside_for_cidp.clone(),
                    block_hash,
                )
                .expect("Slot duration should be set");

                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();

                let slot =
                    sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                        *timestamp,
                        slot_duration,
                    );

                let author_noting_inherent = author_noting_inherent.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to create author noting inherent",
                    )
                })?;

                Ok((slot, timestamp, author_noting_inherent))
            }
        },
        get_orchestrator_aux_data: move |block_hash: H256, (_relay_parent, _validation_data)| {
            let client_set_aside_for_orch = client_set_aside_for_orch.clone();

            async move {
                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
                    client_set_aside_for_orch.as_ref(),
                    &block_hash,
                    para_id,
                );

                let authorities = authorities.ok_or_else(|| {
                    Box::<dyn std::error::Error + Send + Sync>::from(
                        "Failed to fetch authorities",
                    )
                })?;

                log::info!(
                    "Authorities {:?} found for header {:?}",
                    authorities,
                    block_hash
                );

                let aux_data = OrchestratorAuraWorkerAuxData {
                    authorities,
                    // This is the orchestrator consensus; it does not have a slot frequency.
                    slot_freq: None,
                };

                Ok(aux_data)
            }
        },
        block_import,
        para_client: client,
        relay_client: relay_chain_interface,
        sync_oracle,
        keystore,
        collator_key,
        para_id,
        overseer_handle,
        orchestrator_slot_duration: slot_duration,
        relay_chain_slot_duration,
        force_authoring,
        proposer,
        collator_service,
        authoring_duration: Duration::from_millis(2000),
        code_hash_provider,
        para_backend: backend,
        cancellation_token: cancellation_token.clone(),
        buy_core_params,
    };

    let (fut, exit_notification_receiver) =
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
            params,
        );
    spawner.spawn("tanssi-aura", None, fut);

    (cancellation_token, exit_notification_receiver)
}

/// Start a parachain node.
pub async fn start_parachain_node(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    container_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
    collator_options: CollatorOptions,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
    max_pov_percentage: Option<u32>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    start_node_impl(
        parachain_config,
        polkadot_config,
        container_config,
        collator_options,
        para_id,
        hwbench,
        max_pov_percentage,
    )
    .instrument(sc_tracing::tracing::info_span!(
        sc_tracing::logging::PREFIX_LOG_SPAN,
        name = "Orchestrator",
    ))
    .await
}

/// Start a solochain node.
pub async fn start_solochain_node(
    polkadot_config: Configuration,
    container_chain_cli: ContainerChainCli,
    collator_options: CollatorOptions,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<TaskManager> {
    let tokio_handle = polkadot_config.tokio_handle.clone();
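    // Solochain mode has no orchestrator parachain, so the orchestrator para id below is
    // only a default placeholder.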
    let orchestrator_para_id = Default::default();

    let chain_type = polkadot_config.chain_spec.chain_type().clone();
    let relay_chain = polkadot_config.chain_spec.id().to_string();

    let base_path = container_chain_cli
        .base
        .base
        .shared_params
        .base_path
        .as_ref()
        .expect("base_path is always set");
    let config_dir = build_solochain_config_dir(base_path);
    let keystore = keystore_config(container_chain_cli.keystore_params(), &config_dir)
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

    // Instead of putting the keystore in
    // Collator1000-01/data/chains/simple_container_2000/keystore
    // we put it in
    // Collator1000-01/data/config/keystore
    // and the same for the "network" folder.
    // But zombienet will put the keys in the old path, so we need to manually copy them over
    // if we are running under zombienet.
    copy_zombienet_keystore(&keystore)?;

    let keystore_container = KeystoreContainer::new(&keystore)?;

    // No metrics, so no prometheus registry
    let prometheus_registry = None;
    let mut task_manager = TaskManager::new(tokio_handle.clone(), prometheus_registry)?;

    // Each container chain will spawn its own telemetry
    let telemetry_worker_handle = None;

    // Dummy parachain config, only needed because `build_relay_chain_interface` needs to know
    // if we are collators or not.
    let validator = container_chain_cli.base.collator;
    let mut dummy_parachain_config = dummy_config(
        polkadot_config.tokio_handle.clone(),
        polkadot_config.base_path.clone(),
    );
    dummy_parachain_config.role = if validator {
        Role::Authority
    } else {
        Role::Full
    };
    let (relay_chain_interface, collator_key) =
        cumulus_client_service::build_relay_chain_interface(
            polkadot_config,
            &dummy_parachain_config,
            telemetry_worker_handle.clone(),
            &mut task_manager,
            collator_options.clone(),
            hwbench.clone(),
        )
        .await
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

    log::info!("start_solochain_node: is validator? {}", validator);

    let overseer_handle = relay_chain_interface
        .overseer_handle()
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
    let sync_keystore = keystore_container.keystore();
    let collate_on_tanssi: Arc<
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
    > = Arc::new(move || {
        // collate_on_tanssi will not be called in solochains because solochains use a different
        // consensus mechanism and need validators instead of collators.
        // The runtime enforces this because the orchestrator chain is never assigned any collators.
        panic!("Called collate_on_tanssi on solochain collator. This is unsupported and the runtime shouldn't allow this, it is a bug")
    });

    let orchestrator_chain_interface_builder = OrchestratorChainSolochainInterfaceBuilder {
        overseer_handle: overseer_handle.clone(),
        relay_chain_interface: relay_chain_interface.clone(),
    };
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();
    // Channel to send messages to start/stop container chains
    let (cc_spawn_tx, cc_spawn_rx) = unbounded_channel();

    if validator {
        // Start task which detects para id assignment, and starts/stops container chains.
        build_check_assigned_para_id(
            orchestrator_chain_interface.clone(),
            sync_keystore.clone(),
            cc_spawn_tx.clone(),
            task_manager.spawn_essential_handle(),
        );
    }

    // If the orchestrator chain is running as a full-node, we start a full node for the
    // container chain immediately, because only collator nodes detect their container chain
    // assignment; otherwise it would never start.
    if !validator {
        if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
            // Spawn new container chain node
            cc_spawn_tx
                .send(CcSpawnMsg::UpdateAssignment {
                    current: Some(container_chain_para_id.into()),
                    next: Some(container_chain_para_id.into()),
                })
                .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
        }
    }

    // Start container chain spawner task. This will start and stop container chains on demand.
    let spawn_handle = task_manager.spawn_handle();

    let container_chain_spawner = ContainerChainSpawner {
        params: ContainerChainSpawnParams {
            orchestrator_chain_interface,
            container_chain_cli,
            tokio_handle,
            chain_type,
            relay_chain,
            relay_chain_interface,
            sync_keystore,
            orchestrator_para_id,
            collation_params: if validator {
                Some(spawner::CollationParams {
                    // TODO: all these args must be solochain instead of orchestrator
                    orchestrator_client: None,
                    orchestrator_tx_pool: None,
                    orchestrator_para_id,
                    collator_key: collator_key
                        .expect("there should be a collator key if we're a validator"),
                    solochain: true,
                })
            } else {
                None
            },
            spawn_handle,
            data_preserver: false,
            generate_rpc_builder: tc_service_container_chain::rpc::GenerateSubstrateRpcBuilder::<
                dancebox_runtime::RuntimeApi,
            >::new(),
            phantom: PhantomData,
        },
        state: Default::default(),
        db_folder_cleanup_done: false,
        collate_on_tanssi,
        collation_cancellation_constructs: None,
    };
    let state = container_chain_spawner.state.clone();

    task_manager.spawn_essential_handle().spawn(
        "container-chain-spawner-rx-loop",
        None,
        container_chain_spawner.rx_loop(cc_spawn_rx, validator, true),
    );

    task_manager.spawn_essential_handle().spawn(
        "container-chain-spawner-debug-state",
        None,
        monitor::monitor_task(state),
    );

    Ok(task_manager)
}

pub const SOFT_DEADLINE_PERCENT: sp_runtime::Percent = sp_runtime::Percent::from_percent(100);

/// Start a dev node with the given orchestrator `Configuration`, producing blocks via
/// manual seal and mocking the relay chain.
#[sc_tracing::logging::prefix_logs_with("Orchestrator Dev Node")]
pub fn start_dev_node(
    orchestrator_config: Configuration,
    sealing: Sealing,
    hwbench: Option<sc_sysinfo::HwBench>,
    para_id: ParaId,
) -> sc_service::error::Result<TaskManager> {
    let parachain_config = prepare_node_config(orchestrator_config);

    // Create a `NodeBuilder` which helps set up the common systems of parachain nodes.
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench)?;

    // This node's block import.
    let block_import = DevParachainBlockImport::new(node_builder.client.clone());
    let import_queue = build_manual_seal_import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        &parachain_config,
        node_builder
            .telemetry
            .as_ref()
            .map(|telemetry| telemetry.handle()),
        &node_builder.task_manager,
    )?;

    // Build a plain Substrate network (not cumulus, since a dev node mocks the relay chain).
    let mut node_builder = node_builder
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            import_queue,
        )?;

    // If we're running a collator dev node we must install manual seal block
    // production.
    let mut command_sink = None;
    let mut xcm_senders = None;
    let mut randomness_sender = None;
    let mut container_chains_exclusion_sender = None;
    if parachain_config.role.is_authority() {
        let client = node_builder.client.clone();
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
        // Create channels for mocked parachain candidates.
        let (mock_randomness_sender, mock_randomness_receiver) =
            flume::bounded::<(bool, Option<[u8; 32]>)>(100);
        // Create channels for mocked exclusion of parachains from producing blocks
        let (mock_container_chains_exclusion_sender, mock_container_chains_exclusion_receiver) =
            flume::bounded::<Vec<ParaId>>(100);

        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));
        randomness_sender = Some(mock_randomness_sender);
        container_chains_exclusion_sender = Some(mock_container_chains_exclusion_sender);

        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
            block_import,
            sealing,
            soft_deadline: Some(SOFT_DEADLINE_PERCENT),
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
            consensus_data_provider: Some(Box::new(
                tc_consensus::OrchestratorManualSealAuraConsensusDataProvider::new(
                    node_builder.client.clone(),
                    node_builder.keystore_container.keystore(),
                    para_id,
                ),
            )),
            create_inherent_data_providers: move |block: H256, ()| {
                let current_para_block = client
                    .number(block)
                    .expect("Header lookup should succeed")
                    .expect("Header passed in as parent should be present in backend.");

                let mut para_ids: Vec<ParaId> = client
                    .runtime_api()
                    .registered_paras(block)
                    .expect("registered_paras runtime API should exist")
                    .into_iter()
                    .collect();

                let hash = client
                    .hash(current_para_block.saturating_sub(1))
                    .expect("Hash of the desired block must be present")
                    .expect("Hash of the desired block should exist");

                let para_header = client
                    .expect_header(hash)
                    .expect("Expected parachain header should exist")
                    .encode();

                let para_head_data = HeadData(para_header).encode();
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();

                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
                    &*client.clone(),
                    block,
                ).expect("Slot duration should be set");

                let mut timestamp = 0u64;
                TIMESTAMP.with(|x| {
                    timestamp = x.clone().take();
                });

                timestamp += dancebox_runtime::SLOT_DURATION;
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                    timestamp.into(),
                    slot_duration,
                );
                let relay_slot = u64::from(*relay_slot);

                let downward_xcm_receiver = downward_xcm_receiver.clone();
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

                // If there is a value to be updated, we update it
7810
                if let Some((enable_randomness, new_seed)) = randomness_enabler_messages.last() {
4
                    let value = client
4
                        .get_aux(RANDOMNESS_ACTIVATED_AUX_KEY)
4
                        .expect("Should be able to query aux storage; qed").unwrap_or((false, Option::<[u8; 32]>::None).encode());
4
                    let (_mock_additional_randomness, mut mock_randomness_seed): (bool, Option<[u8; 32]>) = Decode::decode(&mut value.as_slice()).expect("Boolean non-decodable");
4
                    if let Some(new_seed) = new_seed {
2
                        mock_randomness_seed = Some(*new_seed);
2
                    }
4
                    client
4
                    .insert_aux(
4
                        &[(RANDOMNESS_ACTIVATED_AUX_KEY, (enable_randomness, mock_randomness_seed).encode().as_slice())],
4
                        &[],
4
                    )
4
                    .expect("Should be able to write to aux storage; qed");
7806
                }
                // We read the value
                // If error when reading, we simply put false
7810
                let value = client
7810
                    .get_aux(RANDOMNESS_ACTIVATED_AUX_KEY)
7810
                    .expect("Should be able to query aux storage; qed").unwrap_or((false, Option::<[u8; 32]>::None).encode());
7810
                let (mock_additional_randomness, mock_randomness_seed): (bool, Option<[u8; 32]>) = Decode::decode(&mut value.as_slice()).expect("Boolean non-decodable");
7810

            
7810
                let container_chains_exclusion_messages: Vec<Vec<ParaId>> = mock_container_chains_exclusion_receiver.drain().collect();
                // If there is a new set of excluded container chains, we update it
7810
                if let Some(mock_excluded_container_chains) = container_chains_exclusion_messages.last() {
2
                    client
2
                        .insert_aux(
2
                            &[(CONTAINER_CHAINS_EXCLUSION_AUX_KEY, mock_excluded_container_chains.encode().as_slice())],
2
                            &[],
2
                        )
2
                        .expect("Should be able to write to aux storage; qed");
7808
                }
7810
                let new_excluded_container_chains_value = client
7810
                    .get_aux(CONTAINER_CHAINS_EXCLUSION_AUX_KEY)
7810
                    .expect("Should be able to query aux storage; qed").unwrap_or(Vec::<ParaId>::new().encode());
7810
                let mock_excluded_container_chains: Vec<ParaId> = Decode::decode(&mut new_excluded_container_chains_value.as_slice()).expect("Vector non-decodable");
15394
                para_ids.retain(|x| !mock_excluded_container_chains.contains(x));
7810
                let client_set_aside_for_cidp = client.clone();
7810
                let client_for_xcm = client.clone();
7810
                async move {
7810
                    let mocked_author_noting =
7810
                        tp_author_noting_inherent::MockAuthorNotingInherentDataProvider {
7810
                            current_para_block,
7810
                            relay_offset: 1000,
7810
                            relay_blocks_per_para_block: 2,
7810
                            para_ids,
7810
                            slots_per_para_block: 1,
7810
                        };
7810
                    let mut additional_keys = mocked_author_noting.get_key_values();
7810
                    // Mock only chain 2002 in relay.
7810
                    // This will allow any signed origin to deregister chains 2000 and 2001, and register 2002.
7810
                    let (registrar_paras_key_2002, para_info_2002) = mocked_relay_keys::get_mocked_registrar_paras(2002.into());
7810
                    additional_keys.extend([(para_head_key, para_head_data), (relay_slot_key, Slot::from(relay_slot).encode()), (registrar_paras_key_2002, para_info_2002)]);
7810

            
7810
                    if mock_additional_randomness {
200
                        let mut mock_randomness: [u8; 32] = [0u8; 32];
200
                        mock_randomness[..4].copy_from_slice(&current_para_block.to_be_bytes());
200
                        if let Some(seed) = mock_randomness_seed {
3300
                            for i in 0..32 {
3200
                                mock_randomness[i] ^= seed[i];
3200
                            }
100
                        }
200
                        additional_keys.extend([(RelayWellKnownKeys::CURRENT_BLOCK_RANDOMNESS.to_vec(), Some(mock_randomness).encode())]);
200
                        log::info!("mokcing randomnessss!!! {}", current_para_block);
7610
                    }
7810
                    let current_para_head = client_set_aside_for_cidp
7810
                            .header(block)
7810
                            .expect("Header lookup should succeed")
7810
                            .expect("Header passed in as parent should be present in backend.");
7810
                    let should_send_go_ahead = match client_set_aside_for_cidp
7810
                            .runtime_api()
7810
                            .collect_collation_info(block, &current_para_head)
                    {
7810
                            Ok(info) => info.new_validation_code.is_some(),
                            Err(e) => {
                                    log::error!("Failed to collect collation info: {:?}", e);
                                    false
                            },
                    };
7810
                    let time = MockTimestampInherentDataProvider;
7810
                    let mocked_parachain = MockValidationDataInherentDataProvider {
7810
                        current_para_block,
7810
                        current_para_block_head: None,
7810
                        relay_offset: 1000,
7810
                        relay_blocks_per_para_block: 2,
7810
                        // TODO: Recheck
7810
                        para_blocks_per_relay_epoch: 10,
7810
                        relay_randomness_config: (),
7810
                        xcm_config: MockXcmConfig::new(
7810
                            &*client_for_xcm,
7810
                            block,
7810
                            Default::default(),
7810
                        ),
7810
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
7810
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
7810
                        additional_key_values: Some(additional_keys),
7810
                        para_id,
7810
                        upgrade_go_ahead: should_send_go_ahead.then(|| {
2
                            log::info!(
2
                                "Detected pending validation code, sending go-ahead signal."
                            );
2
                            UpgradeGoAhead::GoAhead
7810
                        }),
7810
                    };
7810

            
7810
                    Ok((time, mocked_parachain, mocked_author_noting))
7810
                }
7810
            },
        })?;
    }
    // This node's RPC builder.
    let rpc_builder = {
        let client = node_builder.client.clone();
        let transaction_pool = node_builder.transaction_pool.clone();

        Box::new(move |_| {
            let deps = crate::rpc::FullDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                command_sink: command_sink.clone(),
                xcm_senders: xcm_senders.clone(),
                randomness_sender: randomness_sender.clone(),
                container_chain_exclusion_sender: container_chains_exclusion_sender.clone(),
            };

            crate::rpc::create_full(deps).map_err(Into::into)
        })
    };
    // We spawn all the common substrate tasks to properly run a node.
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    log::info!("Development Service Ready");

    // We start the networking part.
    node_builder.network.start_network.start_network();

    Ok(node_builder.task_manager)
}

/// Can be called for a `Configuration` to check if it is a configuration for
/// the orchestrator network.
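///
/// Illustrative use when choosing which service to start (hedged sketch):
///
/// ```ignore
/// if config.chain_spec.is_dev() {
///     // start the manual-seal dev service
/// } else {
///     // start the full parachain service
/// }
/// ```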
pub trait IdentifyVariant {
    /// Returns `true` if this is a configuration for a dev network.
    fn is_dev(&self) -> bool;
}

impl IdentifyVariant for Box<dyn sc_service::ChainSpec> {
    fn is_dev(&self) -> bool {
        self.chain_type() == sc_chain_spec::ChainType::Development
    }
}

/// Builder for an in-process orchestrator chain interface. Builds an
/// [`OrchestratorChainInProcessInterface`], giving access to the orchestrator chain
/// data this node already holds locally through its full client and backend.
struct OrchestratorChainInProcessInterfaceBuilder {
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    overseer_handle: Handle,
}

impl OrchestratorChainInProcessInterfaceBuilder {
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
        Arc::new(OrchestratorChainInProcessInterface::new(
            self.client,
            self.backend,
            self.sync_oracle,
            self.overseer_handle,
        ))
    }
}

/// Builder for an orchestrator chain interface backed by a solochain relay node. Builds an
/// [`OrchestratorChainSolochainInterface`] from the overseer handle and the relay chain
/// interface.
struct OrchestratorChainSolochainInterfaceBuilder {
    overseer_handle: Handle,
    relay_chain_interface: Arc<dyn RelayChainInterface>,
}

impl OrchestratorChainSolochainInterfaceBuilder {
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
        Arc::new(OrchestratorChainSolochainInterface::new(
            self.overseer_handle,
            self.relay_chain_interface,
        ))
    }
}
/// Provides an implementation of the [`RelayChainInterface`] using a local in-process relay chain node.
pub struct OrchestratorChainInProcessInterface<Client> {
    pub full_client: Arc<Client>,
    pub backend: Arc<FullBackend>,
    pub sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    pub overseer_handle: Handle,
}
impl<Client> OrchestratorChainInProcessInterface<Client> {
    /// Create a new instance of [`OrchestratorChainInProcessInterface`].
    pub fn new(
        full_client: Arc<Client>,
        backend: Arc<FullBackend>,
        sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
        overseer_handle: Handle,
    ) -> Self {
        Self {
            full_client,
            backend,
            sync_oracle,
            overseer_handle,
        }
    }
}
impl<T> Clone for OrchestratorChainInProcessInterface<T> {
    fn clone(&self) -> Self {
        Self {
            full_client: self.full_client.clone(),
            backend: self.backend.clone(),
            sync_oracle: self.sync_oracle.clone(),
            overseer_handle: self.overseer_handle.clone(),
        }
    }
}
#[async_trait::async_trait]
impl<Client> OrchestratorChainInterface for OrchestratorChainInProcessInterface<Client>
where
    Client: ProvideRuntimeApi<Block>
        + BlockchainEvents<Block>
        + AuxStore
        + UsageProvider<Block>
        + Sync
        + Send,
    Client::Api: TanssiAuthorityAssignmentApi<Block, NimbusId>
        + OnDemandBlockProductionApi<Block, ParaId, Slot>
        + RegistrarApi<Block, ParaId>
        + AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>
        + DataPreserversApi<Block, DataPreserverProfileId, ParaId>,
{
    async fn get_storage_by_key(
        &self,
        orchestrator_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
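        // Read the value directly from the backend's state at the given block,
        // without going through the runtime.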
        let state = self.backend.state_at(orchestrator_parent)?;
        state
            .storage(key)
            .map_err(OrchestratorChainError::GenericError)
    }
    async fn prove_read(
        &self,
        orchestrator_parent: PHash,
        relevant_keys: &Vec<Vec<u8>>,
    ) -> OrchestratorChainResult<StorageProof> {
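        // Generate a storage read proof for the given keys against the state at
        // `orchestrator_parent`, so it can be verified by an untrusted party.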
        let state_backend = self.backend.state_at(orchestrator_parent)?;
        sp_state_machine::prove_read(state_backend, relevant_keys)
            .map_err(OrchestratorChainError::StateMachineError)
    }
    fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
        Ok(self.overseer_handle.clone())
    }
    /// Get a stream of block import notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .import_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }
    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notifications_stream =
            self.full_client
                .import_notification_stream()
                .filter_map(|notification| async move {
                    notification.is_new_best.then_some(notification.header)
                });
        Ok(Box::pin(notifications_stream))
    }
    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .finality_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }
    async fn genesis_data(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.genesis_data(orchestrator_parent, para_id)?)
    }
    async fn boot_nodes(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.boot_nodes(orchestrator_parent, para_id)?)
    }
    async fn latest_block_number(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.latest_block_number(orchestrator_parent, para_id)?)
    }
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().best_hash)
    }
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().finalized_hash)
    }
    async fn data_preserver_active_assignment(
        &self,
        orchestrator_parent: PHash,
        profile_id: DataPreserverProfileId,
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        use {
            dc_orchestrator_chain_interface::DataPreserverAssignment as InterfaceAssignment,
            pallet_data_preservers_runtime_api::Assignment as RuntimeAssignment,
        };
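        // Map the runtime API's `Assignment` enum onto the interface-level
        // `DataPreserverAssignment`, variant by variant, so callers of this
        // interface don't need to depend on the runtime API types.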
        Ok(
            match runtime_api.get_active_assignment(orchestrator_parent, profile_id)? {
                RuntimeAssignment::NotAssigned => InterfaceAssignment::NotAssigned,
                RuntimeAssignment::Active(para_id) => InterfaceAssignment::Active(para_id),
                RuntimeAssignment::Inactive(para_id) => InterfaceAssignment::Inactive(para_id),
            },
        )
    }
    async fn check_para_id_assignment(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.check_para_id_assignment(orchestrator_parent, authority)?)
    }
    async fn check_para_id_assignment_next_session(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();
        Ok(runtime_api.check_para_id_assignment_next_session(orchestrator_parent, authority)?)
    }
}
/// Provides an implementation of the [`OrchestratorChainInterface`] backed by a
/// [`RelayChainInterface`], for when the orchestrator chain is the relay chain
/// itself (solochain mode).
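///
/// Runtime API queries are forwarded to the relay chain by encoded method name
/// through [`call_runtime_api`]. A sketch of the pattern used by the methods
/// below (names taken from this file):
///
/// ```ignore
/// let res: Option<ContainerChainGenesisData> = call_runtime_api(
///     &self.relay_chain_interface,
///     "RegistrarApi_genesis_data",
///     relay_parent,
///     &para_id,
/// )
/// .await
/// .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
/// ```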
pub struct OrchestratorChainSolochainInterface {
    pub overseer_handle: Handle,
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
}
impl OrchestratorChainSolochainInterface {
    /// Create a new instance of [`OrchestratorChainSolochainInterface`].
    pub fn new(
        overseer_handle: Handle,
        relay_chain_interface: Arc<dyn RelayChainInterface>,
    ) -> Self {
        Self {
            overseer_handle,
            relay_chain_interface,
        }
    }
}
#[async_trait::async_trait]
impl OrchestratorChainInterface for OrchestratorChainSolochainInterface {
    async fn get_storage_by_key(
        &self,
        relay_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
        self.relay_chain_interface
            .get_storage_by_key(relay_parent, key)
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn prove_read(
        &self,
        relay_parent: PHash,
        relevant_keys: &Vec<Vec<u8>>,
    ) -> OrchestratorChainResult<StorageProof> {
        self.relay_chain_interface
            .prove_read(relay_parent, relevant_keys)
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
        Ok(self.overseer_handle.clone())
    }
    /// Get a stream of block import notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .import_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .new_best_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        self.relay_chain_interface
            .finality_notification_stream()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn genesis_data(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
        let res: Option<ContainerChainGenesisData> = call_runtime_api(
            &self.relay_chain_interface,
            "RegistrarApi_genesis_data",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn boot_nodes(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
        let res: Vec<Vec<u8>> = call_runtime_api(
            &self.relay_chain_interface,
            "RegistrarApi_boot_nodes",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn latest_block_number(
        &self,
        relay_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
        let res: Option<BlockNumber> = call_runtime_api(
            &self.relay_chain_interface,
            "AuthorNotingApi_latest_block_number",
            relay_parent,
            &para_id,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
        self.relay_chain_interface
            .best_block_hash()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
        self.relay_chain_interface
            .finalized_block_hash()
            .await
            .map_err(|e| OrchestratorChainError::Application(Box::new(e)))
    }
    async fn data_preserver_active_assignment(
        &self,
        _orchestrator_parent: PHash,
        _profile_id: DataPreserverProfileId,
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
        unimplemented!("Data preserver node does not support Dancelight yet")
    }
    async fn check_para_id_assignment(
        &self,
        relay_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let res: Option<ParaId> = call_runtime_api(
            &self.relay_chain_interface,
            "TanssiAuthorityAssignmentApi_check_para_id_assignment",
            relay_parent,
            &authority,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
    async fn check_para_id_assignment_next_session(
        &self,
        relay_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let res: Option<ParaId> = call_runtime_api(
            &self.relay_chain_interface,
            "TanssiAuthorityAssignmentApi_check_para_id_assignment_next_session",
            relay_parent,
            &authority,
        )
        .await
        .map_err(|e| OrchestratorChainError::Application(Box::new(e)))?;
        Ok(res)
    }
}