1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
pub mod rpc;
18

            
19
use sc_client_api::TrieCacheContext;
20
use {
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_collator::service::CollatorService,
23
    cumulus_client_consensus_proposer::Proposer,
24
    cumulus_client_service::{
25
        prepare_node_config, start_relay_chain_tasks, DARecoveryProfile, StartRelayChainTasksParams,
26
    },
27
    cumulus_primitives_core::{relay_chain::CollatorPair, ParaId},
28
    cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface},
29
    dancebox_runtime::{
30
        opaque::{Block, Hash},
31
        AccountId, RuntimeApi,
32
    },
33
    dc_orchestrator_chain_interface::{
34
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
35
        OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult, PHash,
36
        PHeader,
37
    },
38
    futures::{Stream, StreamExt},
39
    nimbus_primitives::{NimbusId, NimbusPair},
40
    node_common::{service::node_builder::NodeBuilder, service::node_builder::NodeBuilderConfig},
41
    pallet_author_noting_runtime_api::AuthorNotingApi,
42
    pallet_collator_assignment_runtime_api::CollatorAssignmentApi,
43
    pallet_data_preservers_runtime_api::DataPreserversApi,
44
    pallet_registrar_runtime_api::RegistrarApi,
45
    polkadot_cli::ProvideRuntimeApi,
46
    sc_client_api::{
47
        AuxStore, Backend as BackendT, BlockchainEvents, HeaderBackend, UsageProvider,
48
    },
49
    sc_consensus::BasicQueue,
50
    sc_network::{NetworkBackend, NetworkBlock},
51
    sc_network_sync::SyncingService,
52
    sc_service::{Configuration, SpawnTaskHandle, TaskManager},
53
    sc_tracing::tracing::Instrument,
54
    sc_transaction_pool::TransactionPoolHandle,
55
    sp_api::{ApiExt, StorageProof},
56
    sp_consensus::SyncOracle,
57
    sp_consensus_slots::Slot,
58
    sp_core::H256,
59
    sp_keystore::KeystorePtr,
60
    sp_state_machine::{Backend as StateBackend, StorageValue},
61
    std::{marker::PhantomData, pin::Pin, sync::Arc, time::Duration},
62
    tc_consensus::{
63
        collators::lookahead::{
64
            self as lookahead_tanssi_aura, BuyCoreParams, Params as LookaheadTanssiAuraParams,
65
        },
66
        OnDemandBlockProductionApi, OrchestratorAuraWorkerAuxData, TanssiAuthorityAssignmentApi,
67
    },
68
    tc_service_container_chain_spawner::{
69
        cli::ContainerChainCli,
70
        monitor,
71
        service::{
72
            ParachainBlockImport, ParachainClient, ParachainExecutor, ParachainProposerFactory,
73
        },
74
        spawner::{self, CcSpawnMsg, ContainerChainSpawnParams, ContainerChainSpawner},
75
    },
76
    tokio::sync::mpsc,
77
    tokio_util::sync::CancellationToken,
78
};
79

            
80
type FullBackend = sc_service::TFullBackend<Block>;
81

            
82
/// Marker type selecting the concrete block, runtime API and executor that the
/// orchestrator node plugs into the shared `NodeBuilder` machinery.
pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    // Opaque block type of the dancebox runtime.
    type Block = Block;
    // Runtime API bundle declared by the dancebox runtime.
    type RuntimeApi = RuntimeApi;
    // Executor used to run the parachain runtime.
    type ParachainExecutor = ParachainExecutor;
}
88

            
89
/// Handles returned by [`start_parachain_node`] once the orchestrator node is up.
pub struct ParachainNodeStarted {
    /// Task manager owning the background tasks spawned by the node.
    pub task_manager: TaskManager,
    /// Client for the orchestrator (dancebox) runtime.
    pub client: Arc<ParachainClient>,
    /// Interface to the relay chain this node follows.
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
    /// In-process interface to the orchestrator chain itself.
    pub orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
    /// Keystore shared with the consensus tasks.
    pub keystore: KeystorePtr,
}
96

            
97
/// Start a parachain node.
98
pub async fn start_parachain_node<Net>(
99
    parachain_config: Configuration,
100
    polkadot_config: Configuration,
101
    container_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
102
    collator_options: CollatorOptions,
103
    para_id: ParaId,
104
    hwbench: Option<sc_sysinfo::HwBench>,
105
    max_pov_percentage: Option<u32>,
106
) -> sc_service::error::Result<ParachainNodeStarted>
107
where
108
    Net: NetworkBackend<Block, Hash>,
109
{
110
    start_node_impl::<Net>(
111
        parachain_config,
112
        polkadot_config,
113
        container_config,
114
        collator_options,
115
        para_id,
116
        hwbench,
117
        max_pov_percentage,
118
    )
119
    .instrument(sc_tracing::tracing::info_span!(
120
        sc_tracing::logging::PREFIX_LOG_SPAN,
121
        name = "Orchestrator",
122
    ))
123
    .await
124
}
125

            
126
/// Start collator task for orchestrator chain.
127
/// Returns a `CancellationToken` that can be used to cancel the collator task,
128
/// and a `oneshot::Receiver<()>` that can be used to wait until the task has ended.
129
fn start_consensus_orchestrator(
130
    client: Arc<ParachainClient>,
131
    backend: Arc<FullBackend>,
132
    block_import: ParachainBlockImport,
133
    spawner: SpawnTaskHandle,
134
    relay_chain_interface: Arc<dyn RelayChainInterface>,
135
    sync_oracle: Arc<SyncingService<Block>>,
136
    keystore: KeystorePtr,
137
    force_authoring: bool,
138
    relay_chain_slot_duration: Duration,
139
    para_id: ParaId,
140
    collator_key: CollatorPair,
141
    overseer_handle: OverseerHandle,
142
    announce_block: Arc<dyn Fn(Hash, Option<Vec<u8>>) + Send + Sync>,
143
    proposer_factory: ParachainProposerFactory,
144
    orchestrator_tx_pool: Arc<TransactionPoolHandle<Block, ParachainClient>>,
145
    max_pov_percentage: Option<u32>,
146
) -> (CancellationToken, futures::channel::oneshot::Receiver<()>) {
147
    let slot_duration = cumulus_client_consensus_aura::slot_duration(&*client)
148
        .expect("start_consensus_orchestrator: slot duration should exist");
149

            
150
    let proposer = Proposer::new(proposer_factory);
151

            
152
    let collator_service = CollatorService::new(
153
        client.clone(),
154
        Arc::new(spawner.clone()),
155
        announce_block,
156
        client.clone(),
157
    );
158

            
159
    let relay_chain_interace_for_cidp = relay_chain_interface.clone();
160
    let client_set_aside_for_cidp = client.clone();
161
    let client_set_aside_for_orch = client.clone();
162
    let client_for_hash_provider = client.clone();
163
    let client_for_slot_duration_provider = client.clone();
164

            
165
    let code_hash_provider = move |block_hash| {
166
        client_for_hash_provider
167
            .code_at(block_hash)
168
            .ok()
169
            .map(polkadot_primitives::ValidationCode)
170
            .map(|c| c.hash())
171
    };
172

            
173
    let cancellation_token = CancellationToken::new();
174
    let buy_core_params = BuyCoreParams::Orchestrator {
175
        orchestrator_tx_pool,
176
        orchestrator_client: client.clone(),
177
    };
178

            
179
    let params = LookaheadTanssiAuraParams {
180
        max_pov_percentage,
181
        get_current_slot_duration: move |block_hash| {
182
            sc_consensus_aura::standalone::slot_duration_at(
183
                &*client_for_slot_duration_provider,
184
                block_hash,
185
            )
186
            .expect("Slot duration should be set")
187
        },
188
        create_inherent_data_providers: move |block_hash, (relay_parent, _validation_data)| {
189
            let relay_chain_interface = relay_chain_interace_for_cidp.clone();
190
            let client_set_aside_for_cidp = client_set_aside_for_cidp.clone();
191
            async move {
192
                // We added a new runtime api that allows to know which parachains have
193
                // some collators assigned to them. We'll now only include those. For older
194
                // runtimes we continue to write all of them.
195
                let para_ids = match client_set_aside_for_cidp
196
                    .runtime_api()
197
                    .api_version::<dyn CollatorAssignmentApi<Block, AccountId, ParaId>>(
198
                    block_hash,
199
                )? {
200
                    Some(version) if version >= 2 => client_set_aside_for_cidp
201
                        .runtime_api()
202
                        .parachains_with_some_collators(block_hash)?,
203
                    _ => client_set_aside_for_cidp
204
                        .runtime_api()
205
                        .registered_paras(block_hash)?,
206
                };
207
                let para_ids: Vec<_> = para_ids.into_iter().collect();
208
                let author_noting_inherent =
209
                    tp_author_noting_inherent::OwnParachainInherentData::create_at(
210
                        relay_parent,
211
                        &relay_chain_interface,
212
                        &para_ids,
213
                    )
214
                    .await;
215

            
216
                // Fetch duration every block to avoid downtime when passing from 12 to 6s
217
                let slot_duration = sc_consensus_aura::standalone::slot_duration_at(
218
                    &*client_set_aside_for_cidp.clone(),
219
                    block_hash,
220
                )
221
                .expect("Slot duration should be set");
222

            
223
                let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
224

            
225
                let slot =
226
						sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
227
							*timestamp,
228
							slot_duration,
229
						);
230

            
231
                let author_noting_inherent = author_noting_inherent.ok_or_else(|| {
232
                    Box::<dyn std::error::Error + Send + Sync>::from(
233
                        "Failed to create author noting inherent",
234
                    )
235
                })?;
236

            
237
                Ok((slot, timestamp, author_noting_inherent))
238
            }
239
        },
240
        get_orchestrator_aux_data: move |block_hash: H256, (_relay_parent, _validation_data)| {
241
            let client_set_aside_for_orch = client_set_aside_for_orch.clone();
242

            
243
            async move {
244
                let authorities = tc_consensus::authorities::<Block, ParachainClient, NimbusPair>(
245
                    client_set_aside_for_orch.as_ref(),
246
                    &block_hash,
247
                    para_id,
248
                );
249

            
250
                let authorities = authorities.ok_or_else(|| {
251
                    Box::<dyn std::error::Error + Send + Sync>::from(
252
                        "Failed to fetch authorities with error",
253
                    )
254
                })?;
255

            
256
                log::info!(
257
                    "Authorities {:?} found for header {:?}",
258
                    authorities,
259
                    block_hash
260
                );
261

            
262
                let aux_data = OrchestratorAuraWorkerAuxData {
263
                    authorities,
264
                    // This is the orchestrator consensus, it does not have a slot frequency
265
                    slot_freq: None,
266
                };
267

            
268
                Ok(aux_data)
269
            }
270
        },
271
        block_import,
272
        para_client: client,
273
        relay_client: relay_chain_interface,
274
        sync_oracle,
275
        keystore,
276
        collator_key,
277
        para_id,
278
        overseer_handle,
279
        orchestrator_slot_duration: slot_duration,
280
        relay_chain_slot_duration,
281
        force_authoring,
282
        proposer,
283
        collator_service,
284
        authoring_duration: Duration::from_millis(2000),
285
        code_hash_provider,
286
        para_backend: backend,
287
        cancellation_token: cancellation_token.clone(),
288
        buy_core_params,
289
    };
290

            
291
    let (fut, exit_notification_receiver) =
292
        lookahead_tanssi_aura::run::<_, Block, NimbusPair, _, _, _, _, _, _, _, _, _, _, _, _, _>(
293
            params,
294
        );
295
    spawner.spawn("tanssi-aura", None, fut);
296

            
297
    (cancellation_token, exit_notification_receiver)
298
}
299

            
300
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
301
///
302
/// This is the actual implementation that is abstract over the executor and the runtime api.
303
async fn start_node_impl<Net>(
304
    orchestrator_config: Configuration,
305
    polkadot_config: Configuration,
306
    container_chain_config: Option<(ContainerChainCli, tokio::runtime::Handle)>,
307
    collator_options: CollatorOptions,
308
    para_id: ParaId,
309
    hwbench: Option<sc_sysinfo::HwBench>,
310
    max_pov_percentage: Option<u32>,
311
) -> sc_service::error::Result<ParachainNodeStarted>
312
where
313
    Net: NetworkBackend<Block, Hash>,
314
{
315
    let parachain_config = prepare_node_config(orchestrator_config);
316
    let chain_type: sc_chain_spec::ChainType = parachain_config.chain_spec.chain_type();
317
    let relay_chain = node_common::chain_spec::Extensions::try_get(&*parachain_config.chain_spec)
318
        .map(|e| e.relay_chain.clone())
319
        .ok_or("Could not find relay_chain extension in chain-spec.")?;
320

            
321
    // Channel to send messages to start/stop container chains
322
    let (cc_spawn_tx, cc_spawn_rx) = mpsc::unbounded_channel();
323

            
324
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
325
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;
326

            
327
    let (block_import, import_queue) = import_queue(&parachain_config, &node_builder);
328

            
329
    let (relay_chain_interface, collator_key) = node_builder
330
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
331
        .await?;
332

            
333
    let validator = parachain_config.role.is_authority();
334
    let force_authoring = parachain_config.force_authoring;
335

            
336
    let node_builder = node_builder
337
        .build_cumulus_network::<_, Net>(
338
            &parachain_config,
339
            para_id,
340
            import_queue,
341
            relay_chain_interface.clone(),
342
        )
343
        .await?;
344

            
345
    let rpc_builder = {
346
        let client = node_builder.client.clone();
347
        let transaction_pool = node_builder.transaction_pool.clone();
348

            
349
        Box::new(move |_| {
350
            let deps = rpc::FullDeps {
351
                client: client.clone(),
352
                pool: transaction_pool.clone(),
353
                command_sink: None,
354
                xcm_senders: None,
355
                randomness_sender: None,
356
                container_chain_exclusion_sender: None,
357
            };
358

            
359
            rpc::create_full(deps).map_err(Into::into)
360
        })
361
    };
362

            
363
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
364

            
365
    let relay_chain_slot_duration = Duration::from_secs(6);
366
    let overseer_handle = relay_chain_interface
367
        .overseer_handle()
368
        .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
369
    let sync_keystore = node_builder.keystore_container.keystore();
370
    let mut collate_on_tanssi: Arc<
371
        dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
372
    > = Arc::new(move || {
373
        if validator {
374
            panic!("Called uninitialized collate_on_tanssi");
375
        } else {
376
            panic!("Called collate_on_tanssi when node is not running as a validator");
377
        }
378
    });
379

            
380
    let announce_block = {
381
        let sync_service = node_builder.network.sync_service.clone();
382
        Arc::new(move |hash, data| sync_service.announce_block(hash, data))
383
    };
384

            
385
    let (mut node_builder, import_queue_service) = node_builder.extract_import_queue_service();
386

            
387
    start_relay_chain_tasks(StartRelayChainTasksParams {
388
        client: node_builder.client.clone(),
389
        announce_block: announce_block.clone(),
390
        para_id,
391
        relay_chain_interface: relay_chain_interface.clone(),
392
        task_manager: &mut node_builder.task_manager,
393
        da_recovery_profile: if validator {
394
            DARecoveryProfile::Collator
395
        } else {
396
            DARecoveryProfile::FullNode
397
        },
398
        import_queue: import_queue_service,
399
        relay_chain_slot_duration,
400
        recovery_handle: Box::new(overseer_handle.clone()),
401
        sync_service: node_builder.network.sync_service.clone(),
402
        prometheus_registry: node_builder.prometheus_registry.as_ref(),
403
    })?;
404

            
405
    let orchestrator_chain_interface_builder = OrchestratorChainInProcessInterfaceBuilder {
406
        client: node_builder.client.clone(),
407
        backend: node_builder.backend.clone(),
408
        sync_oracle: node_builder.network.sync_service.clone(),
409
        overseer_handle: overseer_handle.clone(),
410
    };
411
    let orchestrator_chain_interface = orchestrator_chain_interface_builder.build();
412

            
413
    if validator {
414
        let collator_key = collator_key
415
            .clone()
416
            .expect("Command line arguments do not allow this. qed");
417

            
418
        // Start task which detects para id assignment, and starts/stops container chains.
419
        // Note that if this node was started without a `container_chain_config`, we don't
420
        // support collation on container chains, so there is no need to detect changes to assignment
421
        if container_chain_config.is_some() {
422
            crate::build_check_assigned_para_id(
423
                orchestrator_chain_interface.clone(),
424
                sync_keystore.clone(),
425
                cc_spawn_tx.clone(),
426
                node_builder.task_manager.spawn_essential_handle(),
427
            );
428
        }
429

            
430
        let start_collation = {
431
            // Params for collate_on_tanssi closure
432
            let node_spawn_handle = node_builder.task_manager.spawn_handle().clone();
433
            let node_keystore = node_builder.keystore_container.keystore().clone();
434
            let node_telemetry_handle = node_builder.telemetry.as_ref().map(|t| t.handle()).clone();
435
            let node_client = node_builder.client.clone();
436
            let node_backend = node_builder.backend.clone();
437
            let relay_interface = relay_chain_interface.clone();
438
            let node_sync_service = node_builder.network.sync_service.clone();
439
            let orchestrator_tx_pool = node_builder.transaction_pool.clone();
440
            let overseer = overseer_handle.clone();
441
            let proposer_factory = sc_basic_authorship::ProposerFactory::with_proof_recording(
442
                node_spawn_handle.clone(),
443
                node_client.clone(),
444
                node_builder.transaction_pool.clone(),
445
                node_builder.prometheus_registry.as_ref(),
446
                node_telemetry_handle.clone(),
447
            );
448

            
449
            move || {
450
                start_consensus_orchestrator(
451
                    node_client.clone(),
452
                    node_backend.clone(),
453
                    block_import.clone(),
454
                    node_spawn_handle.clone(),
455
                    relay_interface.clone(),
456
                    node_sync_service.clone(),
457
                    node_keystore.clone(),
458
                    force_authoring,
459
                    relay_chain_slot_duration,
460
                    para_id,
461
                    collator_key.clone(),
462
                    overseer.clone(),
463
                    announce_block.clone(),
464
                    proposer_factory.clone(),
465
                    orchestrator_tx_pool.clone(),
466
                    max_pov_percentage,
467
                )
468
            }
469
        };
470
        // Save callback for later, used when collator rotates from container chain back to orchestrator chain
471
        collate_on_tanssi = Arc::new(start_collation);
472
    }
473

            
474
    let sync_keystore = node_builder.keystore_container.keystore();
475

            
476
    if let Some((container_chain_cli, tokio_handle)) = container_chain_config {
477
        // If the orchestrator chain is running as a full-node, we start a full node for the
478
        // container chain immediately, because only collator nodes detect their container chain
479
        // assignment so otherwise it will never start.
480
        if !validator {
481
            if let Some(container_chain_para_id) = container_chain_cli.base.para_id {
482
                // Spawn new container chain node
483
                cc_spawn_tx
484
                    .send(CcSpawnMsg::UpdateAssignment {
485
                        current: Some(container_chain_para_id.into()),
486
                        next: Some(container_chain_para_id.into()),
487
                    })
488
                    .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
489
            }
490
        }
491

            
492
        // Start container chain spawner task. This will start and stop container chains on demand.
493
        let orchestrator_client = node_builder.client.clone();
494
        let orchestrator_tx_pool = node_builder.transaction_pool.clone();
495
        let spawn_handle = node_builder.task_manager.spawn_handle();
496
        let relay_chain_interface = relay_chain_interface.clone();
497
        let orchestrator_chain_interface = orchestrator_chain_interface.clone();
498

            
499
        // This considers that the container chains have the same APIs as dancebox, which
500
        // is not the case. However the spawner don't call APIs that are not part of the expected
501
        // common APIs for a container chain.
502
        // TODO: Depend on the simple container chain runtime which should be the minimal api?
503
        let container_chain_spawner = ContainerChainSpawner {
504
            params: ContainerChainSpawnParams {
505
                orchestrator_chain_interface,
506
                container_chain_cli,
507
                tokio_handle,
508
                chain_type,
509
                relay_chain,
510
                relay_chain_interface,
511
                sync_keystore,
512
                data_preserver: false,
513
                collation_params: if validator {
514
                    Some(spawner::CollationParams {
515
                        orchestrator_client: Some(orchestrator_client.clone()),
516
                        orchestrator_tx_pool: Some(orchestrator_tx_pool),
517
                        orchestrator_para_id: para_id,
518
                        collator_key: collator_key
519
                            .expect("there should be a collator key if we're a validator"),
520
                        solochain: false,
521
                    })
522
                } else {
523
                    None
524
                },
525
                spawn_handle,
526
                generate_rpc_builder:
527
                    tc_service_container_chain_spawner::rpc::GenerateSubstrateRpcBuilder::<
528
                        dancebox_runtime::RuntimeApi,
529
                    >::new(),
530
                override_sync_mode: Some(sc_cli::SyncMode::Warp),
531
                phantom: PhantomData,
532
            },
533
            state: Default::default(),
534
            db_folder_cleanup_done: false,
535
            collate_on_tanssi,
536
            collation_cancellation_constructs: None,
537
        };
538
        let state = container_chain_spawner.state.clone();
539

            
540
        node_builder.task_manager.spawn_essential_handle().spawn(
541
            "container-chain-spawner-rx-loop",
542
            None,
543
            container_chain_spawner.rx_loop(cc_spawn_rx, validator, false),
544
        );
545

            
546
        node_builder.task_manager.spawn_essential_handle().spawn(
547
            "container-chain-spawner-debug-state",
548
            None,
549
            monitor::monitor_task(state),
550
        )
551
    }
552

            
553
    Ok(ParachainNodeStarted {
554
        task_manager: node_builder.task_manager,
555
        client: node_builder.client,
556
        relay_chain_interface,
557
        orchestrator_chain_interface,
558
        keystore: node_builder.keystore_container.keystore(),
559
    })
560
}
561

            
562
pub fn import_queue(
563
    parachain_config: &Configuration,
564
    node_builder: &NodeBuilder<NodeConfig>,
565
) -> (ParachainBlockImport, BasicQueue<Block>) {
566
    // The nimbus import queue ONLY checks the signature correctness
567
    // Any other checks corresponding to the author-correctness should be done
568
    // in the runtime
569
    let block_import =
570
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
571

            
572
    let import_queue = nimbus_consensus::import_queue(
573
        node_builder.client.clone(),
574
        block_import.clone(),
575
        move |_, _| async move {
576
            let time = sp_timestamp::InherentDataProvider::from_system_time();
577

            
578
            Ok((time,))
579
        },
580
        &node_builder.task_manager.spawn_essential_handle(),
581
        parachain_config.prometheus_registry(),
582
        false,
583
        false,
584
    )
585
    .expect("function never fails");
586

            
587
    (block_import, import_queue)
588
}
589

            
590
/// Builder for an in-process orchestrator chain interface.
///
/// Collects the client, backend, sync oracle and overseer handle of the running
/// orchestrator full node, and assembles them into an
/// [`OrchestratorChainInProcessInterface`] via `build`.
struct OrchestratorChainInProcessInterfaceBuilder {
    client: Arc<ParachainClient>,
    backend: Arc<FullBackend>,
    sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    overseer_handle: OverseerHandle,
}
602

            
603
impl OrchestratorChainInProcessInterfaceBuilder {
604
    pub fn build(self) -> Arc<dyn OrchestratorChainInterface> {
605
        Arc::new(OrchestratorChainInProcessInterface::new(
606
            self.client,
607
            self.backend,
608
            self.sync_oracle,
609
            self.overseer_handle,
610
        ))
611
    }
612
}
613

            
614
/// Provides an implementation of the [`OrchestratorChainInterface`] using a local
/// in-process orchestrator chain node.
pub struct OrchestratorChainInProcessInterface<Client> {
    /// Client of the orchestrator full node; used for runtime API calls and
    /// block/finality notification streams.
    pub full_client: Arc<Client>,
    /// Backend used for direct state access (`state_at`) and chain info.
    pub backend: Arc<FullBackend>,
    /// Oracle reporting the node's sync status.
    pub sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
    /// Handle used to communicate with the relay chain overseer.
    pub overseer_handle: OverseerHandle,
}
621

            
622
impl<Client> OrchestratorChainInProcessInterface<Client> {
    /// Create a new instance of [`OrchestratorChainInProcessInterface`]
    pub fn new(
        full_client: Arc<Client>,
        backend: Arc<FullBackend>,
        sync_oracle: Arc<dyn SyncOracle + Send + Sync>,
        overseer_handle: OverseerHandle,
    ) -> Self {
        Self {
            full_client,
            backend,
            sync_oracle,
            overseer_handle,
        }
    }
}
638

            
639
impl<T> Clone for OrchestratorChainInProcessInterface<T> {
640
    fn clone(&self) -> Self {
641
        Self {
642
            full_client: self.full_client.clone(),
643
            backend: self.backend.clone(),
644
            sync_oracle: self.sync_oracle.clone(),
645
            overseer_handle: self.overseer_handle.clone(),
646
        }
647
    }
648
}
649

            
650
#[async_trait::async_trait]
impl<Client> OrchestratorChainInterface for OrchestratorChainInProcessInterface<Client>
where
    Client: ProvideRuntimeApi<Block>
        + BlockchainEvents<Block>
        + AuxStore
        + UsageProvider<Block>
        + Sync
        + Send,
    Client::Api: TanssiAuthorityAssignmentApi<Block, NimbusId>
        + OnDemandBlockProductionApi<Block, ParaId, Slot>
        + RegistrarApi<Block, ParaId>
        + AuthorNotingApi<Block, AccountId, BlockNumber, ParaId>
        + DataPreserversApi<Block, DataPreserverProfileId, ParaId>,
{
    /// Read the raw storage value at `key` from the state of `orchestrator_parent`.
    async fn get_storage_by_key(
        &self,
        orchestrator_parent: PHash,
        key: &[u8],
    ) -> OrchestratorChainResult<Option<StorageValue>> {
        let state = self
            .backend
            .state_at(orchestrator_parent, TrieCacheContext::Untrusted)?;
        state
            .storage(key)
            .map_err(OrchestratorChainError::GenericError)
    }

    /// Generate a storage read proof for `relevant_keys` at `orchestrator_parent`.
    async fn prove_read(
        &self,
        orchestrator_parent: PHash,
        relevant_keys: &Vec<Vec<u8>>,
    ) -> OrchestratorChainResult<StorageProof> {
        let state_backend = self
            .backend
            .state_at(orchestrator_parent, TrieCacheContext::Untrusted)?;

        sp_state_machine::prove_read(state_backend, relevant_keys)
            .map_err(OrchestratorChainError::StateMachineError)
    }

    /// Hand out a clone of the overseer handle held by this interface.
    fn overseer_handle(&self) -> OrchestratorChainResult<OverseerHandle> {
        Ok(self.overseer_handle.clone())
    }

    /// Get a stream of import block notifications.
    async fn import_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .import_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }

    /// Get a stream of new best block notifications.
    async fn new_best_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notifications_stream =
            self.full_client
                .import_notification_stream()
                .filter_map(|notification| async move {
                    // Only forward imports flagged as the new best block.
                    notification.is_new_best.then_some(notification.header)
                });
        Ok(Box::pin(notifications_stream))
    }

    /// Get a stream of finality notifications.
    async fn finality_notification_stream(
        &self,
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
        let notification_stream = self
            .full_client
            .finality_notification_stream()
            .map(|notification| notification.header);
        Ok(Box::pin(notification_stream))
    }

    /// Fetch the registered genesis data for `para_id`, if any, via the
    /// registrar runtime API at `orchestrator_parent`.
    async fn genesis_data(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
        let runtime_api = self.full_client.runtime_api();

        Ok(runtime_api.genesis_data(orchestrator_parent, para_id)?)
    }

    /// Fetch the registered boot nodes for `para_id` via the runtime API.
    async fn boot_nodes(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
        let runtime_api = self.full_client.runtime_api();

        Ok(runtime_api.boot_nodes(orchestrator_parent, para_id)?)
    }

    /// Latest block number noted for container chain `para_id`, if any.
    async fn latest_block_number(
        &self,
        orchestrator_parent: PHash,
        para_id: ParaId,
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
        let runtime_api = self.full_client.runtime_api();

        Ok(runtime_api.latest_block_number(orchestrator_parent, para_id)?)
    }

    /// Hash of the current best orchestrator block, from the backend's chain info.
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().best_hash)
    }

    /// Hash of the latest finalized orchestrator block, from the backend's chain info.
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
        Ok(self.backend.blockchain().info().finalized_hash)
    }

    /// Query the data-preservers assignment for `profile_id` and translate the
    /// runtime enum into the interface-level [`DataPreserverAssignment`].
    async fn data_preserver_active_assignment(
        &self,
        orchestrator_parent: PHash,
        profile_id: DataPreserverProfileId,
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
        let runtime_api = self.full_client.runtime_api();

        use {
            dc_orchestrator_chain_interface::DataPreserverAssignment as InterfaceAssignment,
            pallet_data_preservers_runtime_api::Assignment as RuntimeAssignment,
        };

        Ok(
            match runtime_api.get_active_assignment(orchestrator_parent, profile_id)? {
                RuntimeAssignment::NotAssigned => InterfaceAssignment::NotAssigned,
                RuntimeAssignment::Active(para_id) => InterfaceAssignment::Active(para_id),
                RuntimeAssignment::Inactive(para_id) => InterfaceAssignment::Inactive(para_id),
            },
        )
    }

    /// Para id (if any) the given `authority` is assigned to in the current session.
    async fn check_para_id_assignment(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();

        Ok(runtime_api.check_para_id_assignment(orchestrator_parent, authority)?)
    }

    /// Para id (if any) the given `authority` will be assigned to next session.
    async fn check_para_id_assignment_next_session(
        &self,
        orchestrator_parent: PHash,
        authority: NimbusId,
    ) -> OrchestratorChainResult<Option<ParaId>> {
        let runtime_api = self.full_client.runtime_api();

        Ok(runtime_api.check_para_id_assignment_next_session(orchestrator_parent, authority)?)
    }
}