// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

use {
    async_channel::Receiver,
    async_io::Timer,
    core::time::Duration,
    core_extensions::TypeIdentity,
    cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams},
    cumulus_client_cli::CollatorOptions,
    cumulus_client_service::{
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
        StartFullNodeParams,
    },
    cumulus_primitives_core::ParaId,
    cumulus_relay_chain_interface::RelayChainInterface,
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
    jsonrpsee::RpcModule,
    polkadot_primitives::CollatorPair,
    sc_client_api::Backend,
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
    sc_consensus_manual_seal::{
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
    },
    sc_executor::{
        sp_wasm_interface::HostFunctions, HeapAllocStrategy, RuntimeVersionOf, WasmExecutor,
        DEFAULT_HEAP_ALLOC_STRATEGY,
    },
    sc_network::{
        config::FullNetworkConfiguration, request_responses::IncomingRequest,
        service::traits::NetworkService, NetworkBlock,
    },
    sc_network_sync::SyncingService,
    sc_network_transactions::TransactionsHandlerController,
    sc_service::{
        config::Multiaddr, Configuration, KeystoreContainer, TFullBackend, TFullClient, TaskManager,
    },
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
    sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool},
    sc_utils::mpsc::TracingUnboundedSender,
    sp_api::{ConstructRuntimeApi, StorageProof},
    sp_block_builder::BlockBuilder,
    sp_consensus::SelectChain,
    sp_core::{
        traits::{CodeExecutor, SpawnNamed},
        H256,
    },
    sp_inherents::CreateInherentDataProviders,
    sp_offchain::OffchainWorkerApi,
    sp_runtime::Percent,
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
    std::{str::FromStr, sync::Arc},
};

tp_traits::alias!(
    pub trait MinimalRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                TaggedTransactionQueue<Block>
                + BlockBuilder<Block> + OffchainWorkerApi<Block>
                + sp_api::Metadata<Block>
                + sp_session::SessionKeys<Block>,
        > + Send + Sync + 'static
);

tp_traits::alias!(
    pub trait MinimalCumulusRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        MinimalRuntimeApi<Block, Client> +
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                cumulus_primitives_core::CollectCollationInfo<Block>,
        >
);

/// Trait to configure the main types the builder relies on, bundled in a single
/// type to reduce verbosity and the number of type parameters.
pub trait NodeBuilderConfig {
    type Block;
    type RuntimeApi;
    type ParachainExecutor;

    /// Create a new `NodeBuilder` using the types of this `Config`, along
    /// with the parachain `Configuration` and an optional `HwBench`.
    fn new_builder(
        parachain_config: &Configuration,
        hwbench: Option<sc_sysinfo::HwBench>,
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
    where
        Self: Sized,
        BlockOf<Self>: cumulus_primitives_core::BlockT,
        ExecutorOf<Self>:
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
        RuntimeApiOf<Self>: MinimalRuntimeApi<BlockOf<Self>, ClientOf<Self>>,
        BlockHashOf<Self>: Unpin,
    {
        NodeBuilder::<Self>::new(parachain_config, hwbench)
    }
}
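
// Illustrative sketch (hypothetical types, added commentary): a node crate bundles
// its concrete types in a zero-sized config struct, e.g. its runtime's opaque
// `Block`, the generated `RuntimeApi` and the executor it wants to use:
//
// pub struct MyNodeConfig;
// impl NodeBuilderConfig for MyNodeConfig {
//     type Block = my_runtime::opaque::Block;
//     type RuntimeApi = my_runtime::RuntimeApi;
//     type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
// }
//
// With those in place, `MyNodeConfig::new_builder(&parachain_config, hwbench)`
// returns a fresh `NodeBuilder`.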

pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
pub type ConstructedRuntimeApiOf<T> =
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;

// `SNetwork`, `STxHandler` and `SImportQueueService` are types that will change
// during the life of a `NodeBuilder` because they are generated and consumed when
// calling certain functions, with absence of data represented by `()`. Some
// functions are implemented only for a given concrete type, which ensures they
// can only be called if the required data is available (generated and not yet
// consumed).
//
// While this could be implemented with multiple impl blocks with concrete types,
// we use `core_extensions::TypeIdentity` here, which allows expressing type
// identity/equality as a trait bound on each function and removes the
// boilerplate of many impl blocks with duplicated trait bounds. Two impl blocks
// are still required since Rust can't infer the types in the `new` function,
// which doesn't take `self`.
pub struct NodeBuilder<
    T: NodeBuilderConfig,
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
    // but can only be called with an `import_queue`, which can be different in
    // each node. For that reason it is a `()` when calling `new`; the caller then
    // creates the `import_queue` using systems contained in `NodeBuilder`, and
    // calls `build_cumulus_network` with it to generate the cumulus systems.
    SNetwork = (),
    // The `TxHandler` is constructed in `build_X_network`
    // and is then consumed when calling `spawn_common_tasks`.
    STxHandler = (),
    // The import queue service is obtained from the import queue in
    // `build_cumulus_network` or `build_substrate_network`, which also
    // consumes the import queue. Neither of them is cloneable, so we need
    // to store the service here to be able to consume it later in
    // `start_full_node`.
    SImportQueueService = (),
> where
    BlockOf<T>: cumulus_primitives_core::BlockT,
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
{
    pub client: Arc<ClientOf<T>>,
    pub backend: Arc<BackendOf<T>>,
    pub task_manager: TaskManager,
    pub keystore_container: KeystoreContainer,
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<BlockOf<T>, ClientOf<T>>>,
    pub telemetry: Option<Telemetry>,
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,

    pub hwbench: Option<sc_sysinfo::HwBench>,
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,

    pub network: SNetwork,
    pub tx_handler_controller: STxHandler,
    pub import_queue_service: SImportQueueService,
}

pub struct Network<Block: cumulus_primitives_core::BlockT> {
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
    pub sync_service: Arc<SyncingService<Block>>,
}

/// Allows creating a parachain-defined executor from a `WasmExecutor`.
pub trait TanssiExecutorExt {
    type HostFun: HostFunctions;
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
}

impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
    type HostFun = ParachainHostFunctions;

    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
        wasm_executor
    }
}
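
// Illustrative sketch (hypothetical types, added commentary): a node that needs
// extra host functions can wrap the Wasm executor in its own type and implement
// `TanssiExecutorExt` for it:
//
// pub struct MyExecutor(WasmExecutor<MyHostFunctions>);
// impl TanssiExecutorExt for MyExecutor {
//     type HostFun = MyHostFunctions;
//     fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
//         MyExecutor(wasm_executor)
//     }
// }
//
// `NodeBuilder::new` only needs `ExecutorOf<T>` to implement `TanssiExecutorExt`
// plus the usual `CodeExecutor`/`RuntimeVersionOf` bounds, so such a wrapper can
// back `type ParachainExecutor` in a `NodeBuilderConfig`.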

// The `new` function doesn't take `self`, and the Rust compiler cannot infer that
// only one type T implements `TypeIdentity`. We thus need a separate impl
// block with concrete types `()`.
impl<T: NodeBuilderConfig> NodeBuilder<T>
where
    BlockOf<T>: cumulus_primitives_core::BlockT,
    ExecutorOf<T>:
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
    BlockHashOf<T>: Unpin,
{
    /// Create a new `NodeBuilder` which prepares the objects required to launch a
    /// node. However, it only starts telemetry, and doesn't provide any
    /// network-dependent objects (as those require an import queue, which usually
    /// is different for each node).
    fn new(
        parachain_config: &Configuration,
        hwbench: Option<sc_sysinfo::HwBench>,
    ) -> Result<Self, sc_service::Error> {
        // Refactor: old new_partial

        let telemetry = parachain_config
            .telemetry_endpoints
            .clone()
            .filter(|x| !x.is_empty())
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
                let worker = TelemetryWorker::new(16)?;
                let telemetry = worker.handle().new_telemetry(endpoints);
                Ok((worker, telemetry))
            })
            .transpose()?;

        let heap_pages =
            parachain_config
                .executor
                .default_heap_pages
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
                    extra_pages: h as u32,
                });

        // Default runtime_cache_size is 2.
        // For now we can work with this, but it will likely need
        // to change once we start having runtime_cache_sizes, or
        // run nodes with the maximum for this value.
        let mut wasm_builder = WasmExecutor::builder()
            .with_execution_method(parachain_config.executor.wasm_method)
            .with_onchain_heap_alloc_strategy(heap_pages)
            .with_offchain_heap_alloc_strategy(heap_pages)
            .with_max_runtime_instances(parachain_config.executor.max_runtime_instances)
            .with_runtime_cache_size(parachain_config.executor.runtime_cache_size);
        if let Some(ref wasmtime_precompiled_path) = parachain_config.executor.wasmtime_precompiled
        {
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
        }

        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());

        let (client, backend, keystore_container, task_manager) =
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
                parachain_config,
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
                executor,
                true,
            )?;
        let client = Arc::new(client);

        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

        let telemetry = telemetry.map(|(worker, telemetry)| {
            task_manager
                .spawn_handle()
                .spawn("telemetry", None, worker.run());
            telemetry
        });

        let transaction_pool = sc_transaction_pool::Builder::new(
            task_manager.spawn_essential_handle(),
            client.clone(),
            parachain_config.role.is_authority().into(),
        )
        .with_options(parachain_config.transaction_pool.clone())
        .with_prometheus(parachain_config.prometheus_registry())
        .build();

        Ok(Self {
            client,
            backend,
            transaction_pool: transaction_pool.into(),
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
            network: TypeIdentity::from_type(()),
            tx_handler_controller: TypeIdentity::from_type(()),
            import_queue_service: TypeIdentity::from_type(()),
        })
    }
}
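
// Note on typestate (added commentary): `new` returns the builder in its initial
// state, `NodeBuilder<T, (), (), ()>`: no network, no transaction handler
// controller and no import queue service yet. `build_cumulus_network` /
// `build_substrate_network` consume that state and return a
// `NodeBuilder<T, Network<BlockOf<T>>, TransactionsHandlerController<BlockHashOf<T>>, ImportQueueServiceOf<T>>`,
// which is the state expected by `spawn_common_tasks` and `start_full_node`.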

#[derive(Clone)]
pub struct StartBootnodeParams {
    pub relay_chain_fork_id: Option<String>,
    pub parachain_fork_id: Option<String>,
    pub advertise_non_global_ips: bool,
    pub parachain_public_addresses: Vec<Multiaddr>,
    pub relay_chain_network: Arc<dyn NetworkService>,
    pub paranode_rx: Receiver<IncomingRequest>,
    pub embedded_dht_bootnode: bool,
    pub dht_bootnode_discovery: bool,
}

impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
where
    BlockOf<T>: cumulus_primitives_core::BlockT,
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
    RuntimeApiOf<T>: MinimalCumulusRuntimeApi<BlockOf<T>, ClientOf<T>>,
{
    pub async fn build_relay_chain_interface(
        &mut self,
        parachain_config: &Configuration,
        polkadot_config: Configuration,
        collator_options: CollatorOptions,
    ) -> sc_service::error::Result<(
        Arc<(dyn RelayChainInterface + 'static)>,
        Option<CollatorPair>,
        StartBootnodeParams,
    )> {
        let relay_chain_fork_id = polkadot_config
            .chain_spec
            .fork_id()
            .map(ToString::to_string);
        let parachain_fork_id = parachain_config
            .chain_spec
            .fork_id()
            .map(ToString::to_string);
        let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
        let parachain_public_addresses = parachain_config.network.public_addresses.clone();

        let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
            build_relay_chain_interface(
                polkadot_config,
                parachain_config,
                self.telemetry_worker_handle.clone(),
                &mut self.task_manager,
                collator_options.clone(),
                self.hwbench.clone(),
            )
            .await
            .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

        let start_bootnode_params = StartBootnodeParams {
            relay_chain_fork_id,
            parachain_fork_id,
            advertise_non_global_ips,
            parachain_public_addresses,
            relay_chain_network,
            paranode_rx,
            embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
            dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
        };

        Ok((relay_chain_interface, collator_key, start_bootnode_params))
    }

    /// Given an import queue, calls [`cumulus_client_service::build_network`] and
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
    ///
    /// Can only be called once on a `NodeBuilder` that doesn't yet have network
    /// data.
    pub async fn build_cumulus_network<RCInterface, Net>(
        self,
        parachain_config: &Configuration,
        para_id: ParaId,
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
        relay_chain_interface: RCInterface,
    ) -> sc_service::error::Result<
        NodeBuilder<
            T,
            Network<BlockOf<T>>,
            TransactionsHandlerController<BlockHashOf<T>>,
            ImportQueueServiceOf<T>,
        >,
    >
    where
        SNetwork: TypeIdentity<Type = ()>,
        STxHandler: TypeIdentity<Type = ()>,
        SImportQueueService: TypeIdentity<Type = ()>,
        RCInterface: RelayChainInterface + Clone + 'static,
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
    {
        let Self {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: _,
            tx_handler_controller: _,
            import_queue_service: _,
        } = self;

        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
            &parachain_config.network,
            prometheus_registry.clone(),
        );

        let import_queue_service = import_queue.service();
        let spawn_handle = task_manager.spawn_handle();

        let metrics = Net::register_notification_metrics(
            parachain_config
                .prometheus_config
                .as_ref()
                .map(|config| &config.registry),
        );

        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
                parachain_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle,
                import_queue,
                para_id,
                relay_chain_interface,
                net_config,
                sybil_resistance_level: CollatorSybilResistance::Resistant,
                metrics,
            })
            .await?;

        Ok(NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: Network {
                network,
                system_rpc_tx,
                sync_service,
            },
            tx_handler_controller,
            import_queue_service,
        })
    }

    /// Given an import queue, calls `sc_service::build_network` and
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
    ///
    /// Can only be called once on a `NodeBuilder` that doesn't yet have network
    /// data.
    pub fn build_substrate_network<Net>(
        self,
        parachain_config: &Configuration,
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
    ) -> sc_service::error::Result<
        NodeBuilder<
            T,
            Network<BlockOf<T>>,
            TransactionsHandlerController<BlockHashOf<T>>,
            ImportQueueServiceOf<T>,
        >,
    >
    where
        SNetwork: TypeIdentity<Type = ()>,
        STxHandler: TypeIdentity<Type = ()>,
        SImportQueueService: TypeIdentity<Type = ()>,
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
    {
        let Self {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: _,
            tx_handler_controller: _,
            import_queue_service: _,
        } = self;

        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
            &parachain_config.network,
            prometheus_registry.clone(),
        );

        let metrics = Net::register_notification_metrics(
            parachain_config
                .prometheus_config
                .as_ref()
                .map(|cfg| &cfg.registry),
        );

        let import_queue_service = import_queue.service();

        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
            sc_service::build_network(sc_service::BuildNetworkParams {
                config: parachain_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle: task_manager.spawn_handle(),
                import_queue,
                warp_sync_config: None,
                block_announce_validator_builder: None,
                net_config,
                block_relay: None,
                metrics,
            })?;

        Ok(NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: Network {
                network,
                system_rpc_tx,
                sync_service,
            },
            tx_handler_controller,
            import_queue_service,
        })
    }

    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
    /// node. It consumes `self.tx_handler_controller` in the process, which means
    /// it can only be called once, and any other code that needs this
    /// controller should interact with it before calling this function.
    pub fn spawn_common_tasks<TRpc>(
        self,
        parachain_config: Configuration,
        rpc_builder: Box<
            dyn Fn(Arc<(dyn SpawnNamed + 'static)>) -> Result<RpcModule<TRpc>, sc_service::Error>,
        >,
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
    where
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
        BlockHashOf<T>: Unpin,
        BlockHeaderOf<T>: Unpin,
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
            + BlockBuilder<BlockOf<T>>
            + OffchainWorkerApi<BlockOf<T>>
            + sp_api::Metadata<BlockOf<T>>
            + sp_session::SessionKeys<BlockOf<T>>,
    {
        let NodeBuilder {
            client,
            backend,
            transaction_pool,
            mut telemetry,
            telemetry_worker_handle,
            mut task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network,
            tx_handler_controller,
            import_queue_service,
        } = self;

        let network = TypeIdentity::into_type(network);
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);

        let collator = parachain_config.role.is_authority();

        if parachain_config.offchain_worker.enabled {
            task_manager.spawn_handle().spawn(
                "offchain-workers-runner",
                "offchain-work",
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                    runtime_api_provider: client.clone(),
                    keystore: Some(keystore_container.keystore()),
                    offchain_db: backend.offchain_storage(),
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
                        transaction_pool.clone(),
                    )),
                    network_provider: Arc::new(network.network.clone()),
                    is_validator: parachain_config.role.is_authority(),
                    enable_http_requests: false,
                    custom_extensions: move |_| vec![],
                })?
                .run(client.clone(), task_manager.spawn_handle())
                .boxed(),
            );
        }

        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
            rpc_builder,
            client: client.clone(),
            transaction_pool: transaction_pool.clone(),
            task_manager: &mut task_manager,
            config: parachain_config,
            keystore: keystore_container.keystore(),
            backend: backend.clone(),
            network: network.network.clone(),
            system_rpc_tx: network.system_rpc_tx.clone(),
            tx_handler_controller,
            telemetry: telemetry.as_mut(),
            sync_service: network.sync_service.clone(),
        })?;

        if let Some(hwbench) = &hwbench {
            sc_sysinfo::print_hwbench(hwbench);
            // Here you can check whether the hardware meets your chain's requirements. Putting a link
            // in there and swapping out the requirements for your own are probably a good idea. The
            // requirements for a parachain are dictated by its relay chain.
            if collator {
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false) {
                    log::warn!(
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
                        err
                    );
                }
            }

            if let Some(ref mut telemetry) = telemetry {
                let telemetry_handle = telemetry.handle();
                task_manager.spawn_handle().spawn(
                    "telemetry_hwbench",
                    None,
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
                );
            }
        }

        Ok(NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: TypeIdentity::from_type(network),
            tx_handler_controller: TypeIdentity::from_type(()),
            import_queue_service,
        })
    }

    pub fn install_manual_seal<BI, SC, CIDP>(
        &mut self,
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
    where
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
        SC: SelectChain<BlockOf<T>> + 'static,
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
    {
        let ManualSealConfiguration {
            sealing,
            soft_deadline,
            block_import,
            select_chain,
            consensus_data_provider,
            create_inherent_data_providers,
        } = manual_seal_config;

        let prometheus_registry = self.prometheus_registry.clone();

        let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
            self.task_manager.spawn_handle(),
            self.client.clone(),
            self.transaction_pool.clone(),
            prometheus_registry.as_ref(),
            self.telemetry.as_ref().map(|x| x.handle()),
        );

        let mut command_sink = None;

        if let Some(deadline) = soft_deadline {
            env.set_soft_deadline(deadline);
        }

        let commands_stream: Box<
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
        > = match sealing {
            Sealing::Instant => {
                Box::new(
                    // This bit cribbed from the implementation of instant seal.
                    self.transaction_pool.import_notification_stream().map(|_| {
                        EngineCommand::SealNewBlock {
                            create_empty: false,
                            finalize: false,
                            parent_hash: None,
                            sender: None,
                        }
                    }),
                )
            }
            Sealing::Manual => {
                let (sink, stream) = futures::channel::mpsc::channel(1000);
                // Keep a reference to the other end of the channel. It goes to the RPC.
                command_sink = Some(sink);
                Box::new(stream)
            }
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
                Timer::interval(Duration::from_millis(millis)),
                |_| EngineCommand::SealNewBlock {
                    create_empty: true,
                    finalize: true,
                    parent_hash: None,
                    sender: None,
                },
            )),
        };

        self.task_manager.spawn_essential_handle().spawn_blocking(
            "authorship_task",
            Some("block-authoring"),
            run_manual_seal(ManualSealParams {
                block_import,
                env,
                client: self.client.clone(),
                pool: self.transaction_pool.clone(),
                commands_stream,
                select_chain,
                consensus_data_provider,
                create_inherent_data_providers,
            }),
        );

        Ok(command_sink)
    }

    pub fn start_full_node<RCInterface>(
        self,
        para_id: ParaId,
        relay_chain_interface: RCInterface,
        relay_chain_slot_duration: Duration,
        start_bootnode_params: StartBootnodeParams,
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
    where
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
        RCInterface: RelayChainInterface + Clone + 'static,
        RCInterface: TypeIdentity<Type = Arc<dyn RelayChainInterface + 'static>>,
        BlockHashOf<T>: TypeIdentity<Type = H256>,
    {
        let NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            mut task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network,
            tx_handler_controller,
            import_queue_service,
        } = self;

        let network = TypeIdentity::into_type(network);
        let import_queue_service = TypeIdentity::into_type(import_queue_service);

        let announce_block = {
            let sync_service = network.sync_service.clone();
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
        };

        let overseer_handle = relay_chain_interface
            .overseer_handle()
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;

        let params = StartFullNodeParams {
            client: client.clone(),
            announce_block,
            task_manager: &mut task_manager,
            para_id,
            relay_chain_interface: relay_chain_interface.clone(),
            relay_chain_slot_duration,
            import_queue: import_queue_service,
            recovery_handle: Box::new(overseer_handle),
            sync_service: network.sync_service.clone(),
            prometheus_registry: prometheus_registry.as_ref(),
        };

        // TODO: change for async backing
        // TODO: to fix deprecation warning, we only need to change
        // `start_full_node` to `start_relay_chain_tasks`
        #[allow(deprecated)]
        cumulus_client_service::start_full_node(params)?;

        let StartBootnodeParams {
            relay_chain_fork_id,
            parachain_fork_id,
            advertise_non_global_ips,
            parachain_public_addresses,
            relay_chain_network,
            paranode_rx,
            embedded_dht_bootnode,
            dht_bootnode_discovery,
        } = start_bootnode_params;

        // Advertise parachain bootnode address in relay chain DHT
        start_bootnode_tasks(StartBootnodeTasksParams {
            embedded_dht_bootnode,
            dht_bootnode_discovery,
            para_id,
            task_manager: &mut task_manager,
            relay_chain_interface: TypeIdentity::into_type(relay_chain_interface),
            relay_chain_fork_id,
            relay_chain_network,
            request_receiver: paranode_rx,
            parachain_network: network.network.clone(),
            advertise_non_global_ips,
            parachain_genesis_hash: TypeIdentity::into_type(client.chain_info().genesis_hash),
            parachain_fork_id,
            parachain_public_addresses,
        });

        Ok(NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: TypeIdentity::from_type(network),
            tx_handler_controller,
            import_queue_service: (),
        })
    }

    pub fn extract_import_queue_service(
        self,
    ) -> (
        NodeBuilder<T, SNetwork, STxHandler, ()>,
        SImportQueueService,
    )
    where
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
    {
        let NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network,
            tx_handler_controller,
            import_queue_service,
        } = self;

        (
            NodeBuilder {
                client,
                backend,
                transaction_pool,
                telemetry,
                telemetry_worker_handle,
                task_manager,
                keystore_container,
                hwbench,
                prometheus_registry,
                network,
                tx_handler_controller,
                import_queue_service: (),
            },
            import_queue_service,
        )
    }
}
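
// End-to-end usage sketch (added commentary, kept as a comment so it is not
// compiled here): a dev node built on this module typically chains the steps
// below. `MyNodeConfig`, `rpc_builder`, `manual_seal_config` and the way the
// import queue is produced are assumptions; real nodes plug in their own
// consensus-specific import queue and RPC modules.
//
// let builder = MyNodeConfig::new_builder(&parachain_config, hwbench)?;
// let import_queue = /* node-specific import queue built from builder.client */;
// let builder = builder.build_substrate_network::<sc_network::NetworkWorker<_, _>>(
//     &parachain_config,
//     import_queue,
// )?;
// let mut node = builder.spawn_common_tasks(parachain_config, rpc_builder)?;
// let _command_sink = node.install_manual_seal(manual_seal_config)?;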

/// Block authoring scheme to be used by the dev service.
#[derive(Debug, Copy, Clone)]
pub enum Sealing {
    /// Author a block immediately upon receiving a transaction into the transaction pool
    Instant,
    /// Author a block upon receiving an RPC command
    Manual,
    /// Author blocks at a regular interval specified in milliseconds
    Interval(u64),
}

impl FromStr for Sealing {
    type Err = String;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "instant" => Self::Instant,
            "manual" => Self::Manual,
            s => {
                let millis = s
                    .parse::<u64>()
                    .map_err(|_| "couldn't decode sealing param")?;
                Self::Interval(millis)
            }
        })
    }
}
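
// Parsing examples, following directly from the `FromStr` impl above:
// "instant" -> Sealing::Instant, "manual" -> Sealing::Manual, and any other
// input that parses as a u64 (e.g. "6000") -> Sealing::Interval(6000); anything
// else fails with "couldn't decode sealing param".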

pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
    pub sealing: Sealing,
    pub block_import: BI,
    pub soft_deadline: Option<Percent>,
    pub select_chain: SC,
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = StorageProof>>>,
    pub create_inherent_data_providers: CIDP,
}
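
// Construction sketch (added commentary): the simplest setup reuses the client as
// block import and a `LongestChain` select chain. The concrete values below are
// assumptions for illustration, not something this module prescribes.
//
// let manual_seal_config = ManualSealConfiguration {
//     sealing: Sealing::Manual,
//     block_import: client.clone(),
//     soft_deadline: Some(Percent::from_percent(100)),
//     select_chain: sc_consensus::LongestChain::new(backend.clone()),
//     consensus_data_provider: None,
//     create_inherent_data_providers: move |_, ()| async move {
//         Ok(sp_timestamp::InherentDataProvider::from_system_time())
//     },
// };
// let command_sink = builder.install_manual_seal(manual_seal_config)?;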