1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
pub mod rpc;
18

            
19
use {
20
    cumulus_client_cli::CollatorOptions,
21
    cumulus_client_collator::service::CollatorService,
22
    cumulus_client_consensus_proposer::Proposer,
23
    cumulus_client_service::{
24
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, StartRelayChainTasksParams,
25
    },
26
    cumulus_primitives_core::{relay_chain::CollatorPair, ParaId},
27
    cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface},
28
    dancebox_runtime::{
29
        opaque::{Block, Hash},
30
        AccountId, RuntimeApi,
31
    },
32
    dc_orchestrator_chain_interface::{
33
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
34
        OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash,
35
        PHeader,
36
    },
37
    futures::{Stream, StreamExt},
38
    nimbus_primitives::{NimbusId, NimbusPair},
39
    node_common::{service::node_builder::NodeBuilder, service::node_builder::NodeBuilderConfig},
40
    pallet_author_noting_runtime_api::AuthorNotingApi,
41
    pallet_collator_assignment_runtime_api::CollatorAssignmentApi,
42
    pallet_data_preservers_runtime_api::DataPreserversApi,
43
    pallet_registrar_runtime_api::RegistrarApi,
44
    polkadot_cli::ProvideRuntimeApi,
45
    sc_client_api::{
46
        AuxStore, Backend as BackendT, BlockchainEvents, HeaderBackend, UsageProvider,
47
    },
48
    sc_consensus::BasicQueue,
49
    sc_network::{NetworkBackend, NetworkBlock},
50
    sc_network_sync::SyncingService,
51
    sc_service::{Configuration, SpawnTaskHandle, TaskManager},
52
    sc_tracing::tracing::Instrument,
53
    sc_transaction_pool::TransactionPoolHandle,
54
    sp_api::{ApiExt, StorageProof},
55
    sp_consensus::SyncOracle,
56
    sp_consensus_slots::Slot,
57
    sp_core::H256,
58
    sp_keystore::KeystorePtr,
59
    sp_state_machine::{Backend as StateBackend, StorageValue},
60
    std::{marker::PhantomData, pin::Pin, sync::Arc, time::Duration},
61
    tc_consensus::{
62
        collators::lookahead::{
63
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
64
        },
65
        OnDemandBlockProductionApi, OrchestratorAuraWorkerAuxData, TanssiAuthorityAssignmentApi,
66
    },
67
    tc_service_container_chain_spawner::{
68
        cli::ContainerChainCli,
69
        monitor,
70
        service::{
71
            ParachainBlockImport, ParachainClient, ParachainExecutor, ParachainProposerFactory,
72
        },
73
        spawner::{self, CcSpawnMsg, ContainerChainSpawnParams, ContainerChainSpawner},
74
    },
75
    tokio::sync::mpsc,
76
    tokio_util::sync::CancellationToken,
77
};
78

            
79
type FullBackend = sc_service::TFullBackend<Block>;
80

            
81
pub struct NodeConfig;
82
impl NodeBuilderConfig for NodeConfig {
83
    type Block = Block;
84
    type RuntimeApi = RuntimeApi;
85
    type ParachainExecutor = ParachainExecutor;
86
}
87

            
88
pub struct ParachainNodeStarted {
89
    pub task_manager: TaskManager,
90
    pub client: Arc<ParachainClient>,
91
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
92
    pub orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
93
    pub keystore: KeystorePtr,
94
}
95

            
96
/// Start a parachain node.
97
pub async fn start_parachain_node<Net>(
98
    parachain_config: Configuration,
99
    polkadot_config: Configuration,
100
    container_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
101
    collator_options: CollatorOptions,
102
    para_id: ParaId,
103
    hwbench: Option<sc_sysinfo::HwBench>,
104
    max_pov_percentage: Option<u32>,
105
) -> sc_service::error::Result<ParachainNodeStarted>
106
where
107
    Net: NetworkBackend<Block, Hash>,
108
{
109
    start_node_impl::<Net>(
110
        parachain_config,
111
        polkadot_config,
112
        container_config,
113
        collator_options,
114
        para_id,
115
        hwbench,
116
        max_pov_percentage,
117
    )
118
    .instrument(sc_tracing::tracing::info_span!(
119
        sc_tracing::logging::PREFIX_LOG_SPAN,
120
        name = "Orchestrator",
121
    ))
122
    .await
123
}
124

            
125
/// Start collator task for orchestrator chain.
126
/// Returns a `CancellationToken` that can be used to cancel the collator task,
127
/// and a `oneshot::Receiver<()>` that can be used to wait until the task has ended.
128
fn start_consensus_orchestrator(
129
    client: Arc<ParachainClient>,
130
    backend: Arc<FullBackend>,
131
    block_import: ParachainBlockImport,
132
    spawner: SpawnTaskHandle,
133
    relay_chain_interface: Arc<dyn RelayChainInterface>,
134
    sync_oracle: Arc<SyncingService<Block>>,
135
    keystore: KeystorePtr,
136
    force_authoring: bool,
137
    relay_chain_slot_duration: Duration,
138
    para_id: ParaId,
139
    collator_key: CollatorPair,
140
    overseer_handle: OverseerHandle,
141
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
142
    proposer_factory: ParachainProposerFactory,
143
    orchestrator_tx_pool: Arc<TransactionPoolHandle<Block, ParachainClient>>,
144
    max_pov_percentage: Option<u32>,
145
) -> (CancellationToken, futures::channel::oneshot::Receiver<()>) {
146
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)
147
        .expect("start_consensus_orchestrator: slot duration should exist");
148

            
149
    let proposer = Proposer::new(proposer_factory);
150

            
151
    let collator_service = CollatorService::new(
152
        client.clone(),
153
        Arc::new(spawner.clone()),
154
        announce_block,
155
        client.clone(),
156
    );
157

            
158
    let relay_chain_interace_for_cidp = relay_chain_interface.clone();
159
    let client_set_aside_for_cidp = client.clone();
160
    let client_set_aside_for_orch = client.clone();
161
    let client_for_hash_provider = client.clone();
162
    let client_for_slot_duration_provider = client.clone();
163

            
164
    let code_hash_provider = move |block_hash| {
165
        client_for_hash_provider
166
            .code_at(block_hash)
167
            .ok()
168
            .map(polkadot_primitives::ValidationCode)
169
            .map(|c| c.hash())
170
    };
171

            
172
    let cancellation_token = CancellationToken::new();
173
    let buy_core_params = BuyCoreParams::Orchestrator {
174
        orchestrator_tx_pool,
175
        orchestrator_client: client.clone(),
176
    };
177

            
178
    let params = LookaheadTanssiAuraParams {
179
        max_pov_percentage,
180
        get_current_slot_duration: move |block_hash| {
181
            sc_consensus_aura::standalone::slot_duration_at(
182
                &*client_for_slot_duration_provider,
183
                block_hash,
184
            )
185
            .expect("Slot duration should be set")
186
        },
187
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
188
            let relay_chain_interface = relay_chain_interace_for_cidp.clone();
189
            let client_set_aside_for_cidp = client_set_aside_for_cidp.clone();
190
            async move {
191
                // We added a new runtime api that allows to know which parachains have
192
                // some collators assigned to them. We'll now only include those. For older
193
                // runtimes we continue to write all of them.
194
                let para_ids = match client_set_aside_for_cidp
195
                    .runtime_api()
196
                    .api_version::<dyn CollatorAssignmentApi<Block, AccountId, ParaId>>(
197
                    block_hash,
198
                )? {
199
                    Some(version) if version >= 2 => client_set_aside_for_cidp
200
                        .runtime_api()
201
                        .parachains_with_some_collators(block_hash)?,
202
                    _ => client_set_aside_for_cidp
203
                        .runtime_api()
204
                        .registered_paras(block_hash)?,
205
                };
206
                let para_ids: Vec<_> = para_ids.into_iter().collect();
207
                let author_noting_inherent =
208
                    tp_author_noting_inherent::OwnParachainInherentData::create_at(
209
                        relay_parent,
210
                        &relay_chain_interface,
211
                        &para_ids,
212
                    )
213
                    .await;
214

            
215
                // Fetch duration every block to avoid downtime when passing from 12 to 6s
216
                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
217
                    &*client_set_aside_for_cidp.clone(),
218
                    block_hash,
219
                )
220
                .expect("Slot duration should be set");
221

            
222
                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
223

            
224
                let slot =
225
						sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
226
							*timestamp,
227
							slot_duration,
228
						);
229

            
230
                let author_noting_inherent = author_noting_inherent.ok_or_else(|| {
231
                    Box::<dyn std::error::Error + Send + Sync>::from(
232
                        "Failed to create author noting inherent",
233
                    )
234
                })?;
235

            
236
                Ok((slot, timestamp, author_noting_inherent))
237
            }
238
        },
239
        get_orchestrator_aux_data: move |block_hash: H256, (_relay_parent, _validation_data)| {
240
            let client_set_aside_for_orch = client_set_aside_for_orch.clone();
241

            
242
            async move {
243
                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
244
                    client_set_aside_for_orch.as_ref(),
245
                    &block_hash,
246
                    para_id,
247
                );
248

            
249
                let authorities = authorities.ok_or_else(|| {
250
                    Box::<dyn std::error::Error + Send + Sync>::from(
251
                        "Failed to fetch authorities with error",
252
                    )
253
                })?;
254

            
255
                log::info!(
256
                    "Authorities {:?} found for header {:?}",
257
                    authorities,
258
                    block_hash
259
                );
260

            
261
                let aux_data = OrchestratorAuraWorkerAuxData {
262
                    authorities,
263
                    // This is the orchestrator consensus, it does not have a slot frequency
264
                    slot_freq: None,
265
                };
266

            
267
                Ok(aux_data)
268
            }
269
        },
270
        block_import,
271
        para_client: client,
272
        relay_client: relay_chain_interface,
273
        sync_oracle,
274
        keystore,
275
        collator_key,
276
        para_id,
277
        overseer_handle,
278
        orchestrator_slot_duration: slot_duration,
279
        relay_chain_slot_duration,
280
        force_authoring,
281
        proposer,
282
        collator_service,
283
        authoring_duration: Duration::from_millis(2000),
284
        code_hash_provider,
285
        para_backend: backend,
286
        cancellation_token: cancellation_token.clone(),
287
        buy_core_params,
288
    };
289

            
290
    let (fut, exit_notification_receiver) =
291
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
292
            params,
293
        );
294
    spawner.spawn("tanssi-aura", None, fut);
295

            
296
    (cancellation_token, exit_notification_receiver)
297
}
298

            
299
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
300
///
301
/// This is the actual implementation that is abstract over the executor and the runtime api.
302
async fn start_node_impl<Net>(
303
    orchestrator_config: Configuration,
304
    polkadot_config: Configuration,
305
    container_chain_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
306
    collator_options: CollatorOptions,
307
    para_id: ParaId,
308
    hwbench: Option<sc_sysinfo::HwBench>,
309
    max_pov_percentage: Option<u32>,
310
) -> sc_service::error::Result<ParachainNodeStarted>
311
where
312
    Net: NetworkBackend<Block, Hash>,
313
{
314
    let parachain_config = prepare_node_config(orchestrator_config);
315
    let chain_type: sc_chain_spec::ChainType = parachain_config.chain_spec.chain_type();
316
    let relay_chain = node_common::chain_spec::Extensions::try_get(&*parachain_config.chain_spec)
317
        .map(|e| e.relay_chain.clone())
318
        .ok_or("Could not find relay_chain extension in chain-spec.")?;
319

            
320
    // Channel to send messages to start/stop container chains
321
    let (cc_spawn_tx, cc_spawn_rx) = mpsc::unbounded_channel();
322

            
323
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
324
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;
325

            
326
    let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);
327

            
328
    let (relay_chain_interface, collator_key) = node_builder
329
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
330
        .await?;
331

            
332
    let validator = parachain_config.role.is_authority();
333
    let force_authoring = parachain_config.force_authoring;
334

            
335
    let node_builder = node_builder
336
        .build_cumulus_network::<_, Net>(
337
            &parachain_config,
338
            para_id,
339
            import_queue,
340
            relay_chain_interface.clone(),
341
        )
342
        .await?;
343

            
344
    let rpc_builder = {
345
        let client = node_builder.client.clone();
346
        let transaction_pool = node_builder.transaction_pool.clone();
347

            
348
        Box::new(move |_| {
349
            let deps = rpc::FullDeps {
350
                client: client.clone(),
351
                pool: transaction_pool.clone(),
352
                command_sink: None,
353
                xcm_senders: None,
354
                randomness_sender: None,
355
                container_chain_exclusion_sender: None,
356
            };
357

            
358
            rpc::create_full(deps).map_err(Into::into)
359
        })
360
    };
361

            
362
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
363

            
364
    let relay_chain_slot_duration = Duration::from_secs(6);
365
    let overseer_handle = relay_chain_interface
366
        .overseer_handle()
367
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
368
    let sync_keystore = node_builder.keystore_container.keystore();
369
    let mut collate_on_tanssi: Arc<
370
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
371
    > = Arc::new(move || {
372
        if validator {
373
            panic!("Called uninitialized collate_on_tanssi");
374
        } else {
375
            panic!("Called collate_on_tanssi when node is not running as a validator");
376
        }
377
    });
378

            
379
    let announce_block = {
380
        let sync_service = node_builder.network.sync_service.clone();
381
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
382
    };
383

            
384
    let (mut node_builder, import_queue_service) = node_builder.extract_import_queue_service();
385

            
386
    start_relay_chain_tasks(StartRelayChainTasksParams {
387
        client: node_builder.client.clone(),
388
        announce_block: announce_block.clone(),
389
        para_id,
390
        relay_chain_interface: relay_chain_interface.clone(),
391
        task_manager: &mut node_builder.task_manager,
392
        da_recovery_profile: if validator {
393
            DARecoveryProfile::Collator
394
        } else {
395
            DARecoveryProfile::FullNode
396
        },
397
        import_queue: import_queue_service,
398
        relay_chain_slot_duration,
399
        recovery_handle: Box::new(overseer_handle.clone()),
400
        sync_service: node_builder.network.sync_service.clone(),
401
    })?;
402

            
403
    let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder {
404
        client: node_builder.client.clone(),
405
        backend: node_builder.backend.clone(),
406
        sync_oracle: node_builder.network.sync_service.clone(),
407
        overseer_handle: overseer_handle.clone(),
408
    };
409
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();
410

            
411
    if validator {
412
        let collator_key = collator_key
413
            .clone()
414
            .expect("Command line arguments do not allow this. qed");
415

            
416
        // Start task which detects para id assignment, and starts/stops container chains.
417
        // Note that if this node was started without a `container_chain_config`, we don't
418
        // support collation on container chains, so there is no need to detect changes to assignment
419
        if container_chain_config.is_some() {
420
            crate::build_check_assigned_para_id(
421
                orchestrator_chain_interface.clone(),
422
                sync_keystore.clone(),
423
                cc_spawn_tx.clone(),
424
                node_builder.task_manager.spawn_essential_handle(),
425
            );
426
        }
427

            
428
        let start_collation = {
429
            // Params for collate_on_tanssi closure
430
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
431
            let node_keystore = node_builder.keystore_container.keystore().clone();
432
            let node_telemetry_handle = node_builder.telemetry.as_ref().map(|t| t.handle()).clone();
433
            let node_client = node_builder.client.clone();
434
            let node_backend = node_builder.backend.clone();
435
            let relay_interface = relay_chain_interface.clone();
436
            let node_sync_service = node_builder.network.sync_service.clone();
437
            let orchestrator_tx_pool = node_builder.transaction_pool.clone();
438
            let overseer = overseer_handle.clone();
439
            let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
440
                node_spawn_handle.clone(),
441
                node_client.clone(),
442
                node_builder.transaction_pool.clone(),
443
                node_builder.prometheus_registry.as_ref(),
444
                node_telemetry_handle.clone(),
445
            );
446

            
447
            move || {
448
                start_consensus_orchestrator(
449
                    node_client.clone(),
450
                    node_backend.clone(),
451
                    block_import.clone(),
452
                    node_spawn_handle.clone(),
453
                    relay_interface.clone(),
454
                    node_sync_service.clone(),
455
                    node_keystore.clone(),
456
                    force_authoring,
457
                    relay_chain_slot_duration,
458
                    para_id,
459
                    collator_key.clone(),
460
                    overseer.clone(),
461
                    announce_block.clone(),
462
                    proposer_factory.clone(),
463
                    orchestrator_tx_pool.clone(),
464
                    max_pov_percentage,
465
                )
466
            }
467
        };
468
        // Save callback for later, used when collator rotates from container chain back to orchestrator chain
469
        collate_on_tanssi = Arc::new(start_collation);
470
    }
471

            
472
    let sync_keystore = node_builder.keystore_container.keystore();
473

            
474
    if let Some((container_chain_cli, tokio_handle)) = container_chain_config {
475
        // If the orchestrator chain is running as a full-node, we start a full node for the
476
        // container chain immediately, because only collator nodes detect their container chain
477
        // assignment so otherwise it will never start.
478
        if !validator {
479
            if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
480
                // Spawn new container chain node
481
                cc_spawn_tx
482
                    .send(CcSpawnMsg::UpdateAssignment {
483
                        current: Some(container_chain_para_id.into()),
484
                        next: Some(container_chain_para_id.into()),
485
                    })
486
                    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
487
            }
488
        }
489

            
490
        // Start container chain spawner task. This will start and stop container chains on demand.
491
        let orchestrator_client = node_builder.client.clone();
492
        let orchestrator_tx_pool = node_builder.transaction_pool.clone();
493
        let spawn_handle = node_builder.task_manager.spawn_handle();
494
        let relay_chain_interface = relay_chain_interface.clone();
495
        let orchestrator_chain_interface = orchestrator_chain_interface.clone();
496

            
497
        // This considers that the container chains have the same APIs as dancebox, which
498
        // is not the case. However the spawner don't call APIs that are not part of the expected
499
        // common APIs for a container chain.
500
        // TODO: Depend on the simple container chain runtime which should be the minimal api?
501
        let container_chain_spawner = ContainerChainSpawner {
502
            params: ContainerChainSpawnParams {
503
                orchestrator_chain_interface,
504
                container_chain_cli,
505
                tokio_handle,
506
                chain_type,
507
                relay_chain,
508
                relay_chain_interface,
509
                sync_keystore,
510
                data_preserver: false,
511
                collation_params: if validator {
512
                    Some(spawner::CollationParams {
513
                        orchestrator_client: Some(orchestrator_client.clone()),
514
                        orchestrator_tx_pool: Some(orchestrator_tx_pool),
515
                        orchestrator_para_id: para_id,
516
                        collator_key: collator_key
517
                            .expect("there should be a collator key if we're a validator"),
518
                        solochain: false,
519
                    })
520
                } else {
521
                    None
522
                },
523
                spawn_handle,
524
                generate_rpc_builder:
525
                    tc_service_container_chain_spawner::rpc::GenerateSubstrateRpcBuilder::<
526
                        dancebox_runtime::RuntimeApi,
527
                    >::new(),
528
                override_sync_mode: Some(sc_cli::SyncMode::Warp),
529
                phantom: PhantomData,
530
            },
531
            state: Default::default(),
532
            db_folder_cleanup_done: false,
533
            collate_on_tanssi,
534
            collation_cancellation_constructs: None,
535
        };
536
        let state = container_chain_spawner.state.clone();
537

            
538
        node_builder.task_manager.spawn_essential_handle().spawn(
539
            "container-chain-spawner-rx-loop",
540
            None,
541
            container_chain_spawner.rx_loop(cc_spawn_rx, validator, false),
542
        );
543

            
544
        node_builder.task_manager.spawn_essential_handle().spawn(
545
            "container-chain-spawner-debug-state",
546
            None,
547
            monitor::monitor_task(state),
548
        )
549
    }
550

            
551
    Ok(ParachainNodeStarted {
552
        task_manager: node_builder.task_manager,
553
        client: node_builder.client,
554
        relay_chain_interface,
555
        orchestrator_chain_interface,
556
        keystore: node_builder.keystore_container.keystore(),
557
    })
558
}
559

            
560
pub fn import_queue(
561
    parachain_config: &Configuration,
562
    node_builder: &NodeBuilder<NodeConfig>,
563
) -> (ParachainBlockImport, BasicQueue<Block>) {
564
    // The nimbus import queue ONLY checks the signature correctness
565
    // Any other checks corresponding to the author-correctness should be done
566
    // in the runtime
567
    let block_import =
568
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
569

            
570
    let import_queue = nimbus_consensus::import_queue(
571
        node_builder.client.clone(),
572
        block_import.clone(),
573
        move |_, _| async move {
574
            let time = sp_timestamp::InherentDataProvider::from_system_time();
575

            
576
            Ok((time,))
577
        },
578
        &node_builder.task_manager.spawn_essential_handle(),
579
        parachain_config.prometheus_registry(),
580
        false,
581
        false,
582
    )
583
    .expect("function never fails");
584

            
585
    (block_import, import_queue)
586
}
587

            
588
/// Builder for a concrete relay chain interface, created from a full node. Builds
589
/// a [`RelayChainInProcessInterface`] to access relay chain data necessary for parachain operation.
590
///
591
/// The builder takes a [`polkadot_client::Client`]
592
/// that wraps a concrete instance. By using [`polkadot_client::ExecuteWithClient`]
593
/// the builder gets access to this concrete instance and instantiates a [`RelayChainInProcessInterface`] with it.
594
struct OrchestratorChainInProcessInterfaceBuilder {
595
    client: Arc<ParachainClient>,
596
    backend: Arc<FullBackend>,
597
    sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
598
    overseer_handle: OverseerHandle,
599
}
600

            
601
impl OrchestratorChainInProcessInterfaceBuilder {
602
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
603
        Arc::new(OrchestratorChainInProcessInterface::new(
604
            self.client,
605
            self.backend,
606
            self.sync_oracle,
607
            self.overseer_handle,
608
        ))
609
    }
610
}
611

            
612
/// Provides an implementation of the [`RelayChainInterface`] using a local in-process relay chain node.
613
pub struct OrchestratorChainInProcessInterface<Client> {
614
    pub full_client: Arc<Client>,
615
    pub backend: Arc<FullBackend>,
616
    pub sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
617
    pub overseer_handle: OverseerHandle,
618
}
619

            
620
impl<Client> OrchestratorChainInProcessInterface<Client> {
621
    /// Create a new instance of [`RelayChainInProcessInterface`]
622
    pub fn new(
623
        full_client: Arc<Client>,
624
        backend: Arc<FullBackend>,
625
        sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
626
        overseer_handle: OverseerHandle,
627
    ) -> Self {
628
        Self {
629
            full_client,
630
            backend,
631
            sync_oracle,
632
            overseer_handle,
633
        }
634
    }
635
}
636

            
637
impl<T> Clone for OrchestratorChainInProcessInterface<T> {
638
    fn clone(&self) -> Self {
639
        Self {
640
            full_client: self.full_client.clone(),
641
            backend: self.backend.clone(),
642
            sync_oracle: self.sync_oracle.clone(),
643
            overseer_handle: self.overseer_handle.clone(),
644
        }
645
    }
646
}
647

            
648
#[async_trait::async_trait]
649
impl<Client> OrchestratorChainInterface for OrchestratorChainInProcessInterface<Client>
650
where
651
    Client: ProvideRuntimeApi<Block>
652
        + BlockchainEvents<Block>
653
        + AuxStore
654
        + UsageProvider<Block>
655
        + Sync
656
        + Send,
657
    Client::Api: TanssiAuthorityAssignmentApi<Block, NimbusId>
658
        + OnDemandBlockProductionApi<Block, ParaId, Slot>
659
        + RegistrarApi<Block, ParaId>
660
        + AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>
661
        + DataPreserversApi<Block, DataPreserverProfileId, ParaId>,
662
{
663
    async fn get_storage_by_key(
664
        &self,
665
        orchestrator_parent: PHash,
666
        key: &[u8],
667
    ) -> OrchestratorChainResult<Option<StorageValue>> {
668
        let state = self.backend.state_at(orchestrator_parent)?;
669
        state
670
            .storage(key)
671
            .map_err(OrchestratorChainError::GenericError)
672
    }
673

            
674
    async fn prove_read(
675
        &self,
676
        orchestrator_parent: PHash,
677
        relevant_keys: &Vec<Vec<u8>>,
678
    ) -> OrchestratorChainResult<StorageProof> {
679
        let state_backend = self.backend.state_at(orchestrator_parent)?;
680

            
681
        sp_state_machine::prove_read(state_backend, relevant_keys)
682
            .map_err(OrchestratorChainError::StateMachineError)
683
    }
684

            
685
    fn overseer_handle(&self) -> OrchestratorChainResult<OverseerHandle> {
686
        Ok(self.overseer_handle.clone())
687
    }
688

            
689
    /// Get a stream of import block notifications.
690
    async fn import_notification_stream(
691
        &self,
692
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
693
        let notification_stream = self
694
            .full_client
695
            .import_notification_stream()
696
            .map(|notification| notification.header);
697
        Ok(Box::pin(notification_stream))
698
    }
699

            
700
    /// Get a stream of new best block notifications.
701
    async fn new_best_notification_stream(
702
        &self,
703
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
704
        let notifications_stream =
705
            self.full_client
706
                .import_notification_stream()
707
                .filter_map(|notification| async move {
708
                    notification.is_new_best.then_some(notification.header)
709
                });
710
        Ok(Box::pin(notifications_stream))
711
    }
712

            
713
    /// Get a stream of finality notifications.
714
    async fn finality_notification_stream(
715
        &self,
716
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
717
        let notification_stream = self
718
            .full_client
719
            .finality_notification_stream()
720
            .map(|notification| notification.header);
721
        Ok(Box::pin(notification_stream))
722
    }
723

            
724
    async fn genesis_data(
725
        &self,
726
        orchestrator_parent: PHash,
727
        para_id: ParaId,
728
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
729
        let runtime_api = self.full_client.runtime_api();
730

            
731
        Ok(runtime_api.genesis_data(orchestrator_parent, para_id)?)
732
    }
733

            
734
    async fn boot_nodes(
735
        &self,
736
        orchestrator_parent: PHash,
737
        para_id: ParaId,
738
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
739
        let runtime_api = self.full_client.runtime_api();
740

            
741
        Ok(runtime_api.boot_nodes(orchestrator_parent, para_id)?)
742
    }
743

            
744
    async fn latest_block_number(
745
        &self,
746
        orchestrator_parent: PHash,
747
        para_id: ParaId,
748
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
749
        let runtime_api = self.full_client.runtime_api();
750

            
751
        Ok(runtime_api.latest_block_number(orchestrator_parent, para_id)?)
752
    }
753

            
754
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
755
        Ok(self.backend.blockchain().info().best_hash)
756
    }
757

            
758
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
759
        Ok(self.backend.blockchain().info().finalized_hash)
760
    }
761

            
762
    async fn data_preserver_active_assignment(
763
        &self,
764
        orchestrator_parent: PHash,
765
        profile_id: DataPreserverProfileId,
766
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
767
        let runtime_api = self.full_client.runtime_api();
768

            
769
        use {
770
            dc_orchestrator_chain_interface::DataPreserverAssignment as InterfaceAssignment,
771
            pallet_data_preservers_runtime_api::Assignment as RuntimeAssignment,
772
        };
773

            
774
        Ok(
775
            match runtime_api.get_active_assignment(orchestrator_parent, profile_id)? {
776
                RuntimeAssignment::NotAssigned => InterfaceAssignment::NotAssigned,
777
                RuntimeAssignment::Active(para_id) => InterfaceAssignment::Active(para_id),
778
                RuntimeAssignment::Inactive(para_id) => InterfaceAssignment::Inactive(para_id),
779
            },
780
        )
781
    }
782

            
783
    async fn check_para_id_assignment(
784
        &self,
785
        orchestrator_parent: PHash,
786
        authority: NimbusId,
787
    ) -> OrchestratorChainResult<Option<ParaId>> {
788
        let runtime_api = self.full_client.runtime_api();
789

            
790
        Ok(runtime_api.check_para_id_assignment(orchestrator_parent, authority)?)
791
    }
792

            
793
    async fn check_para_id_assignment_next_session(
794
        &self,
795
        orchestrator_parent: PHash,
796
        authority: NimbusId,
797
    ) -> OrchestratorChainResult<Option<ParaId>> {
798
        let runtime_api = self.full_client.runtime_api();
799

            
800
        Ok(runtime_api.check_para_id_assignment_next_session(orchestrator_parent, authority)?)
801
    }
802
}