1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_io::Timer,
19
    core::time::Duration,
20
    core_extensions::TypeIdentity,
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainConsensus,
23
    cumulus_client_service::{
24
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
25
        StartFullNodeParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::RelayChainInterface,
29
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
30
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
31
    jsonrpsee::RpcModule,
32
    polkadot_primitives::CollatorPair,
33
    sc_client_api::Backend,
34
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
35
    sc_consensus_manual_seal::{
36
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
37
    },
38
    sc_executor::{
39
        sp_wasm_interface::HostFunctions, HeapAllocStrategy, RuntimeVersionOf, WasmExecutor,
40
        DEFAULT_HEAP_ALLOC_STRATEGY,
41
    },
42
    sc_network::{config::FullNetworkConfiguration, NetworkBlock},
43
    sc_network_sync::SyncingService,
44
    sc_network_transactions::TransactionsHandlerController,
45
    sc_service::{
46
        Configuration, KeystoreContainer, SpawnTaskHandle, TFullBackend, TFullClient, TaskManager,
47
    },
48
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
49
    sc_transaction_pool_api::OffchainTransactionPoolFactory,
50
    sc_utils::mpsc::TracingUnboundedSender,
51
    sp_api::ConstructRuntimeApi,
52
    sp_block_builder::BlockBuilder,
53
    sp_consensus::SelectChain,
54
    sp_core::traits::CodeExecutor,
55
    sp_inherents::CreateInherentDataProviders,
56
    sp_offchain::OffchainWorkerApi,
57
    sp_runtime::Percent,
58
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
59
    std::{str::FromStr, sync::Arc},
60
};
61
use {sc_transaction_pool_api::TransactionPool, sp_api::StorageProof, sp_core::traits::SpawnNamed};
62

            
63
// Trait alias bundling the runtime APIs that every node assembled with
// `NodeBuilder` requires from its runtime: transaction validation, block
// building, offchain workers, metadata and session keys. The constructor type
// must additionally be `Send + Sync + 'static`.
tp_traits::alias!(
    pub trait MinimalRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                TaggedTransactionQueue<Block>
                + BlockBuilder<Block> + OffchainWorkerApi<Block>
                + sp_api::Metadata<Block>
                + sp_session::SessionKeys<Block>,
        > + Send + Sync + 'static
);
78

            
79
// Trait alias extending `MinimalRuntimeApi` with the
// `cumulus_primitives_core::CollectCollationInfo` runtime API. It is the bound
// used by the `NodeBuilder` functions that interact with Cumulus.
tp_traits::alias!(
    pub trait MinimalCumulusRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        MinimalRuntimeApi<Block, Client> +
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                cumulus_primitives_core::CollectCollationInfo<Block>,
        >
);
92

            
93
/// Trait to configure the main types the builder rely on, bundled in a single
94
/// type to reduce verbosity and the amount of type parameters.
95
pub trait NodeBuilderConfig {
96
    type Block;
97
    type RuntimeApi;
98
    type ParachainExecutor;
99

            
100
    /// Create a new `NodeBuilder` using the types of this `Config`, along
101
    /// with the parachain `Configuration` and an optional `HwBench`.
102
442
    fn new_builder(
103
442
        parachain_config: &Configuration,
104
442
        hwbench: Option<sc_sysinfo::HwBench>,
105
442
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
106
442
    where
107
442
        Self: Sized,
108
442
        BlockOf<Self>: cumulus_primitives_core::BlockT,
109
442
        ExecutorOf<Self>:
110
442
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
111
442
        RuntimeApiOf<Self>: MinimalRuntimeApi<BlockOf<Self>, ClientOf<Self>>,
112
442
        BlockHashOf<Self>: Unpin,
113
    {
114
442
        NodeBuilder::<Self>::new(parachain_config, hwbench)
115
442
    }
116
}
117

            
118
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
119
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
120
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
121
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
122
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
123
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
124
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
125
pub type ConstructedRuntimeApiOf<T> =
126
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
127
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
128
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
129

            
130
// `Cumulus` and `TxHandler` are types that will change during the life of
131
// a `NodeBuilder` because they are generated and consumed when calling
132
// certain functions, with absence of data represented with `()`. Some
133
// function are implemented only for a given concrete type, which ensure it
134
// can only be called if the required data is available (generated and not yet
135
// consumed).
136
//
137
// While this could be implemented with multiple impl blocks with concrete types,
138
// we use here `core_extensions::TypeIdentity` which allow to express type
139
// identity/equality as a trait bound on each function as it removes the
140
// boilerplate of many impl block with duplicated trait bounds. 2 impl blocks
141
// are still required since Rust can't infer the types in the `new` function
142
// that doesn't take `self`.
143
pub struct NodeBuilder<
144
    T: NodeBuilderConfig,
145
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
146
    // but can only be called with an `import_queue` which can be different in
147
    // each node. For that reason it is a `()` when calling `new`, then the
148
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
149
    // then call `build_cumulus_network` with it to generate the cumulus systems.
150
    SNetwork = (),
151
    // The `TxHandler` is constructed in `build_X_network`
152
    // and is then consumed when calling `spawn_common_tasks`.
153
    STxHandler = (),
154
    // The import queue service is obtained from the import queue in
155
    // `build_cumulus_network` or `build_substrate_network`, which also
156
    // consumes the import queue. Neither of them are clonable, so we need to
157
    // to store the service here to be able to consume it later in
158
    // `start_full_node`.
159
    SImportQueueService = (),
160
> where
161
    BlockOf<T>: cumulus_primitives_core::BlockT,
162
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
163
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
164
{
165
    pub client: Arc<ClientOf<T>>,
166
    pub backend: Arc<BackendOf<T>>,
167
    pub task_manager: TaskManager,
168
    pub keystore_container: KeystoreContainer,
169
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<BlockOf<T>, ClientOf<T>>>,
170
    pub telemetry: Option<Telemetry>,
171
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,
172

            
173
    pub hwbench: Option<sc_sysinfo::HwBench>,
174
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,
175

            
176
    pub network: SNetwork,
177
    pub tx_handler_controller: STxHandler,
178
    pub import_queue_service: SImportQueueService,
179
}
180

            
181
pub struct Network<Block: cumulus_primitives_core::BlockT> {
182
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
183
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
184
    pub sync_service: Arc<SyncingService<Block>>,
185
}
186

            
187
/// Allows to create a parachain-defined executor from a `WasmExecutor`
188
pub trait TanssiExecutorExt {
189
    type HostFun: HostFunctions;
190
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
191
}
192

            
193
/// A `WasmExecutor` using the parachain host functions is itself a valid
/// executor: the conversion is the identity.
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
    type HostFun = ParachainHostFunctions;

    /// Returns `wasm_executor` unchanged.
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
        wasm_executor
    }
}
200

            
201
// `new` function doesn't take self, and the Rust compiler cannot infer that
202
// only one type T implements `TypeIdentity`. With thus need a separate impl
203
// block with concrete types `()`.
204
impl<T: NodeBuilderConfig> NodeBuilder<T>
205
where
206
    BlockOf<T>: cumulus_primitives_core::BlockT,
207
    ExecutorOf<T>:
208
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
209
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
210
    BlockHashOf<T>: Unpin,
211
{
212
    /// Create a new `NodeBuilder` which prepare objects required to launch a
213
    /// node. However it only starts telemetry, and doesn't provide any
214
    /// network-dependent objects (as it requires an import queue, which usually
215
    /// is different for each node).
216
442
    fn new(
217
442
        parachain_config: &Configuration,
218
442
        hwbench: Option<sc_sysinfo::HwBench>,
219
442
    ) -> Result<Self, sc_service::Error> {
220
        // Refactor: old new_partial
221

            
222
442
        let telemetry = parachain_config
223
442
            .telemetry_endpoints
224
442
            .clone()
225
442
            .filter(|x| !x.is_empty())
226
442
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
227
                let worker = TelemetryWorker::new(16)?;
228
                let telemetry = worker.handle().new_telemetry(endpoints);
229
                Ok((worker, telemetry))
230
            })
231
442
            .transpose()?;
232

            
233
442
        let heap_pages =
234
442
            parachain_config
235
442
                .executor
236
442
                .default_heap_pages
237
442
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
238
                    extra_pages: h as u32,
239
                });
240

            
241
        // Default runtime_cache_size is 2
242
        // For now we can work with this, but it will likely need
243
        // to change once we start having runtime_cache_sizes, or
244
        // run nodes with the maximum for this value
245
442
        let mut wasm_builder = WasmExecutor::builder()
246
442
            .with_execution_method(parachain_config.executor.wasm_method)
247
442
            .with_onchain_heap_alloc_strategy(heap_pages)
248
442
            .with_offchain_heap_alloc_strategy(heap_pages)
249
442
            .with_max_runtime_instances(parachain_config.executor.max_runtime_instances)
250
442
            .with_runtime_cache_size(parachain_config.executor.runtime_cache_size);
251
442
        if let Some(ref wasmtime_precompiled_path) = parachain_config.executor.wasmtime_precompiled
252
416
        {
253
416
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
254
416
        }
255

            
256
442
        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());
257

            
258
442
        let (client, backend, keystore_container, task_manager) =
259
442
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
260
442
                parachain_config,
261
442
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
262
442
                executor,
263
                true,
264
            )?;
265
442
        let client = Arc::new(client);
266

            
267
442
        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
268

            
269
442
        let telemetry = telemetry.map(|(worker, telemetry)| {
270
            task_manager
271
                .spawn_handle()
272
                .spawn("telemetry", None, worker.run());
273
            telemetry
274
        });
275

            
276
442
        let transaction_pool = sc_transaction_pool::Builder::new(
277
442
            task_manager.spawn_essential_handle(),
278
442
            client.clone(),
279
442
            parachain_config.role.is_authority().into(),
280
        )
281
442
        .with_options(parachain_config.transaction_pool.clone())
282
442
        .with_prometheus(parachain_config.prometheus_registry())
283
442
        .build();
284

            
285
442
        Ok(Self {
286
442
            client,
287
442
            backend,
288
442
            transaction_pool: transaction_pool.into(),
289
442
            telemetry,
290
442
            telemetry_worker_handle,
291
442
            task_manager,
292
442
            keystore_container,
293
442
            hwbench,
294
442
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
295
442
            network: TypeIdentity::from_type(()),
296
442
            tx_handler_controller: TypeIdentity::from_type(()),
297
442
            import_queue_service: TypeIdentity::from_type(()),
298
442
        })
299
442
    }
300
}
301

            
302
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
303
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
304
where
305
    BlockOf<T>: cumulus_primitives_core::BlockT,
306
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
307
    RuntimeApiOf<T>: MinimalCumulusRuntimeApi<BlockOf<T>, ClientOf<T>>,
308
{
309
    pub async fn build_relay_chain_interface(
310
        &mut self,
311
        parachain_config: &Configuration,
312
        polkadot_config: Configuration,
313
        collator_options: CollatorOptions,
314
    ) -> sc_service::error::Result<(
315
        Arc<(dyn RelayChainInterface + 'static)>,
316
        Option<CollatorPair>,
317
    )> {
318
        // FIXME(MD-1374): support DHT bootnodes
319
        let (relay_chain_interface, collator_key, _relay_chain_network, _paranode_rx) =
320
            build_relay_chain_interface(
321
                polkadot_config,
322
                parachain_config,
323
                self.telemetry_worker_handle.clone(),
324
                &mut self.task_manager,
325
                collator_options.clone(),
326
                self.hwbench.clone(),
327
            )
328
            .await
329
            .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
330

            
331
        Ok((relay_chain_interface, collator_key))
332
    }
333

            
334
    /// Given an import queue, calls [`cumulus_client_service::build_network`] and
335
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
336
    ///
337
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
338
    /// data.
339
    pub async fn build_cumulus_network<RCInterface, Net>(
340
        self,
341
        parachain_config: &Configuration,
342
        para_id: ParaId,
343
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
344
        relay_chain_interface: RCInterface,
345
    ) -> sc_service::error::Result<
346
        NodeBuilder<
347
            T,
348
            Network<BlockOf<T>>,
349
            TransactionsHandlerController<BlockHashOf<T>>,
350
            ImportQueueServiceOf<T>,
351
        >,
352
    >
353
    where
354
        SNetwork: TypeIdentity<Type = ()>,
355
        STxHandler: TypeIdentity<Type = ()>,
356
        SImportQueueService: TypeIdentity<Type = ()>,
357
        RCInterface: RelayChainInterface + Clone + 'static,
358
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
359
    {
360
        let Self {
361
            client,
362
            backend,
363
            transaction_pool,
364
            telemetry,
365
            telemetry_worker_handle,
366
            task_manager,
367
            keystore_container,
368
            hwbench,
369
            prometheus_registry,
370
            network: _,
371
            tx_handler_controller: _,
372
            import_queue_service: _,
373
        } = self;
374

            
375
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
376
            &parachain_config.network,
377
            prometheus_registry.clone(),
378
        );
379

            
380
        let import_queue_service = import_queue.service();
381
        let spawn_handle = task_manager.spawn_handle();
382

            
383
        let metrics = Net::register_notification_metrics(
384
            parachain_config
385
                .prometheus_config
386
                .as_ref()
387
                .map(|config| &config.registry),
388
        );
389

            
390
        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
391
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
392
                parachain_config,
393
                client: client.clone(),
394
                transaction_pool: transaction_pool.clone(),
395
                spawn_handle,
396
                import_queue,
397
                para_id,
398
                relay_chain_interface,
399
                net_config,
400
                sybil_resistance_level: CollatorSybilResistance::Resistant,
401
                metrics,
402
            })
403
            .await?;
404

            
405
        Ok(NodeBuilder {
406
            client,
407
            backend,
408
            transaction_pool,
409
            telemetry,
410
            telemetry_worker_handle,
411
            task_manager,
412
            keystore_container,
413
            hwbench,
414
            prometheus_registry,
415
            network: Network {
416
                network,
417
                system_rpc_tx,
418
                sync_service,
419
            },
420
            tx_handler_controller,
421
            import_queue_service,
422
        })
423
    }
424

            
425
    /// Given an import queue, calls `sc_service::build_network` and
426
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
427
    ///
428
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
429
    /// data.
430
436
    pub fn build_substrate_network<Net>(
431
436
        self,
432
436
        parachain_config: &Configuration,
433
436
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
434
436
    ) -> sc_service::error::Result<
435
436
        NodeBuilder<
436
436
            T,
437
436
            Network<BlockOf<T>>,
438
436
            TransactionsHandlerController<BlockHashOf<T>>,
439
436
            ImportQueueServiceOf<T>,
440
436
        >,
441
436
    >
442
436
    where
443
436
        SNetwork: TypeIdentity<Type = ()>,
444
436
        STxHandler: TypeIdentity<Type = ()>,
445
436
        SImportQueueService: TypeIdentity<Type = ()>,
446
436
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
447
    {
448
        let Self {
449
436
            client,
450
436
            backend,
451
436
            transaction_pool,
452
436
            telemetry,
453
436
            telemetry_worker_handle,
454
436
            task_manager,
455
436
            keystore_container,
456
436
            hwbench,
457
436
            prometheus_registry,
458
            network: _,
459
            tx_handler_controller: _,
460
            import_queue_service: _,
461
436
        } = self;
462

            
463
436
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
464
436
            &parachain_config.network,
465
436
            prometheus_registry.clone(),
466
        );
467

            
468
436
        let metrics = Net::register_notification_metrics(
469
436
            parachain_config
470
436
                .prometheus_config
471
436
                .as_ref()
472
436
                .map(|cfg| &cfg.registry),
473
        );
474

            
475
436
        let import_queue_service = import_queue.service();
476

            
477
436
        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
478
436
            sc_service::build_network(sc_service::BuildNetworkParams {
479
436
                config: parachain_config,
480
436
                client: client.clone(),
481
436
                transaction_pool: transaction_pool.clone(),
482
436
                spawn_handle: task_manager.spawn_handle(),
483
436
                import_queue,
484
436
                warp_sync_config: None,
485
436
                block_announce_validator_builder: None,
486
436
                net_config,
487
436
                block_relay: None,
488
436
                metrics,
489
436
            })?;
490

            
491
436
        Ok(NodeBuilder {
492
436
            client,
493
436
            backend,
494
436
            transaction_pool,
495
436
            telemetry,
496
436
            telemetry_worker_handle,
497
436
            task_manager,
498
436
            keystore_container,
499
436
            hwbench,
500
436
            prometheus_registry,
501
436
            network: Network {
502
436
                network,
503
436
                system_rpc_tx,
504
436
                sync_service,
505
436
            },
506
436
            tx_handler_controller,
507
436
            import_queue_service,
508
436
        })
509
436
    }
510

            
511
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
512
    /// node. It consumes `self.tx_handler_controller` in the process, which means
513
    /// it can only be called once, and any other code that would need this
514
    /// controller should interact with it before calling this function.
515
436
    pub fn spawn_common_tasks<TRpc>(
516
436
        self,
517
436
        parachain_config: Configuration,
518
436
        rpc_builder: Box<
519
436
            dyn Fn(Arc<(dyn SpawnNamed + 'static)>) -> Result<RpcModule<TRpc>, sc_service::Error>,
520
436
        >,
521
436
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
522
436
    where
523
436
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
524
436
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
525
436
        BlockHashOf<T>: Unpin,
526
436
        BlockHeaderOf<T>: Unpin,
527
436
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
528
436
            + BlockBuilder<BlockOf<T>>
529
436
            + OffchainWorkerApi<BlockOf<T>>
530
436
            + sp_api::Metadata<BlockOf<T>>
531
436
            + sp_session::SessionKeys<BlockOf<T>>,
532
    {
533
        let NodeBuilder {
534
436
            client,
535
436
            backend,
536
436
            transaction_pool,
537
436
            mut telemetry,
538
436
            telemetry_worker_handle,
539
436
            mut task_manager,
540
436
            keystore_container,
541
436
            hwbench,
542
436
            prometheus_registry,
543
436
            network,
544
436
            tx_handler_controller,
545
436
            import_queue_service,
546
436
        } = self;
547

            
548
436
        let network = TypeIdentity::into_type(network);
549
436
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);
550

            
551
436
        let collator = parachain_config.role.is_authority();
552

            
553
436
        if parachain_config.offchain_worker.enabled {
554
436
            task_manager.spawn_handle().spawn(
555
                "offchain-workers-runner",
556
                "offchain-work",
557
436
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
558
436
                    runtime_api_provider: client.clone(),
559
436
                    keystore: Some(keystore_container.keystore()),
560
436
                    offchain_db: backend.offchain_storage(),
561
436
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
562
436
                        transaction_pool.clone(),
563
436
                    )),
564
436
                    network_provider: Arc::new(network.network.clone()),
565
436
                    is_validator: parachain_config.role.is_authority(),
566
                    enable_http_requests: false,
567
                    custom_extensions: move |_| vec![],
568
                })?
569
436
                .run(client.clone(), task_manager.spawn_handle())
570
436
                .boxed(),
571
            );
572
        }
573

            
574
436
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
575
436
            rpc_builder,
576
436
            client: client.clone(),
577
436
            transaction_pool: transaction_pool.clone(),
578
436
            task_manager: &mut task_manager,
579
436
            config: parachain_config,
580
436
            keystore: keystore_container.keystore(),
581
436
            backend: backend.clone(),
582
436
            network: network.network.clone(),
583
436
            system_rpc_tx: network.system_rpc_tx.clone(),
584
436
            tx_handler_controller,
585
436
            telemetry: telemetry.as_mut(),
586
436
            sync_service: network.sync_service.clone(),
587
436
        })?;
588

            
589
436
        if let Some(hwbench) = &hwbench {
590
            sc_sysinfo::print_hwbench(hwbench);
591
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
592
            // in there and swapping out the requirements for your own are probably a good idea. The
593
            // requirements for a para-chain are dictated by its relay-chain.
594
            if collator {
595
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false) {
596
                    log::warn!(
597
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
598
                        err
599
                    );
600
                }
601
            }
602

            
603
            if let Some(ref mut telemetry) = telemetry {
604
                let telemetry_handle = telemetry.handle();
605
                task_manager.spawn_handle().spawn(
606
                    "telemetry_hwbench",
607
                    None,
608
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
609
                );
610
            }
611
436
        }
612

            
613
436
        Ok(NodeBuilder {
614
436
            client,
615
436
            backend,
616
436
            transaction_pool,
617
436
            telemetry,
618
436
            telemetry_worker_handle,
619
436
            task_manager,
620
436
            keystore_container,
621
436
            hwbench,
622
436
            prometheus_registry,
623
436
            network: TypeIdentity::from_type(network),
624
436
            tx_handler_controller: TypeIdentity::from_type(()),
625
436
            import_queue_service,
626
436
        })
627
436
    }
628

            
629
436
    pub fn install_manual_seal<BI, SC, CIDP>(
630
436
        &mut self,
631
436
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
632
436
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
633
436
    where
634
436
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
635
436
        SC: SelectChain<BlockOf<T>> + 'static,
636
436
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
637
    {
638
        let ManualSealConfiguration {
639
436
            sealing,
640
436
            soft_deadline,
641
436
            block_import,
642
436
            select_chain,
643
436
            consensus_data_provider,
644
436
            create_inherent_data_providers,
645
436
        } = manual_seal_config;
646

            
647
436
        let prometheus_registry = self.prometheus_registry.clone();
648

            
649
436
        let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
650
436
            self.task_manager.spawn_handle(),
651
436
            self.client.clone(),
652
436
            self.transaction_pool.clone(),
653
436
            prometheus_registry.as_ref(),
654
436
            self.telemetry.as_ref().map(|x| x.handle()),
655
        );
656

            
657
436
        let mut command_sink = None;
658

            
659
436
        if let Some(deadline) = soft_deadline {
660
198
            env.set_soft_deadline(deadline);
661
436
        }
662

            
663
436
        let commands_stream: Box<
664
436
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
665
436
        > = match sealing {
666
            Sealing::Instant => {
667
20
                Box::new(
668
                    // This bit cribbed from the implementation of instant seal.
669
20
                    self.transaction_pool.import_notification_stream().map(|_| {
670
                        EngineCommand::SealNewBlock {
671
                            create_empty: false,
672
                            finalize: false,
673
                            parent_hash: None,
674
                            sender: None,
675
                        }
676
                    }),
677
                )
678
            }
679
            Sealing::Manual => {
680
416
                let (sink, stream) = futures::channel::mpsc::channel(1000);
681
                // Keep a reference to the other end of the channel. It goes to the RPC.
682
416
                command_sink = Some(sink);
683
416
                Box::new(stream)
684
            }
685
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
686
                Timer::interval(Duration::from_millis(millis)),
687
                |_| EngineCommand::SealNewBlock {
688
                    create_empty: true,
689
                    finalize: true,
690
                    parent_hash: None,
691
                    sender: None,
692
                },
693
            )),
694
        };
695

            
696
436
        self.task_manager.spawn_essential_handle().spawn_blocking(
697
            "authorship_task",
698
436
            Some("block-authoring"),
699
436
            run_manual_seal(ManualSealParams {
700
436
                block_import,
701
436
                env,
702
436
                client: self.client.clone(),
703
436
                pool: self.transaction_pool.clone(),
704
436
                commands_stream,
705
436
                select_chain,
706
436
                consensus_data_provider,
707
436
                create_inherent_data_providers,
708
436
            }),
709
        );
710

            
711
436
        Ok(command_sink)
712
436
    }
713

            
714
    pub fn start_full_node<RCInterface>(
715
        self,
716
        para_id: ParaId,
717
        relay_chain_interface: RCInterface,
718
        relay_chain_slot_duration: Duration,
719
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
720
    where
721
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
722
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
723
        RCInterface: RelayChainInterface + Clone + 'static,
724
    {
725
        let NodeBuilder {
726
            client,
727
            backend,
728
            transaction_pool,
729
            telemetry,
730
            telemetry_worker_handle,
731
            mut task_manager,
732
            keystore_container,
733
            hwbench,
734
            prometheus_registry,
735
            network,
736
            tx_handler_controller,
737
            import_queue_service,
738
        } = self;
739

            
740
        let network = TypeIdentity::into_type(network);
741
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
742

            
743
        let announce_block = {
744
            let sync_service = network.sync_service.clone();
745
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
746
        };
747

            
748
        let overseer_handle = relay_chain_interface
749
            .overseer_handle()
750
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
751

            
752
        let params = StartFullNodeParams {
753
            client: client.clone(),
754
            announce_block,
755
            task_manager: &mut task_manager,
756
            para_id,
757
            relay_chain_interface,
758
            relay_chain_slot_duration,
759
            import_queue: import_queue_service,
760
            recovery_handle: Box::new(overseer_handle),
761
            sync_service: network.sync_service.clone(),
762
            prometheus_registry: prometheus_registry.as_ref(),
763
        };
764

            
765
        // TODO: change for async backing
766
        #[allow(deprecated)]
767
        cumulus_client_service::start_full_node(params)?;
768

            
769
        Ok(NodeBuilder {
770
            client,
771
            backend,
772
            transaction_pool,
773
            telemetry,
774
            telemetry_worker_handle,
775
            task_manager,
776
            keystore_container,
777
            hwbench,
778
            prometheus_registry,
779
            network: TypeIdentity::from_type(network),
780
            tx_handler_controller,
781
            import_queue_service: (),
782
        })
783
    }
784

            
785
    pub async fn start_collator<RCInterface>(
786
        self,
787
        para_id: ParaId,
788
        relay_chain_interface: RCInterface,
789
        relay_chain_slot_duration: Duration,
790
        parachain_consensus: ParachainConsensusOf<T>,
791
        collator_key: CollatorPair,
792
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
793
    where
794
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
795
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
796
        RCInterface: RelayChainInterface + Clone + 'static,
797
    {
798
        let NodeBuilder {
799
            client,
800
            backend,
801
            transaction_pool,
802
            telemetry,
803
            telemetry_worker_handle,
804
            mut task_manager,
805
            keystore_container,
806
            hwbench,
807
            prometheus_registry,
808
            network,
809
            tx_handler_controller,
810
            import_queue_service,
811
        } = self;
812

            
813
        let network = TypeIdentity::into_type(network);
814
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
815

            
816
        let spawner = task_manager.spawn_handle();
817
        let announce_block = {
818
            let sync_service = network.sync_service.clone();
819
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
820
        };
821
        let overseer_handle = relay_chain_interface
822
            .overseer_handle()
823
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
824

            
825
        let params = cumulus_client_service::StartCollatorParams {
826
            para_id,
827
            block_status: client.clone(),
828
            announce_block: announce_block.clone(),
829
            client: client.clone(),
830
            task_manager: &mut task_manager,
831
            relay_chain_interface: relay_chain_interface.clone(),
832
            spawner: spawner.clone(),
833
            parachain_consensus,
834
            import_queue: import_queue_service,
835
            collator_key,
836
            relay_chain_slot_duration,
837
            recovery_handle: Box::new(overseer_handle.clone()),
838
            sync_service: network.sync_service.clone(),
839
            prometheus_registry: prometheus_registry.as_ref(),
840
        };
841

            
842
        // TODO: change for async backing
843
        #[allow(deprecated)]
844
        cumulus_client_service::start_collator(params).await?;
845

            
846
        Ok(NodeBuilder {
847
            client,
848
            backend,
849
            transaction_pool,
850
            telemetry,
851
            telemetry_worker_handle,
852
            task_manager,
853
            keystore_container,
854
            hwbench,
855
            prometheus_registry,
856
            network: TypeIdentity::from_type(network),
857
            tx_handler_controller,
858
            import_queue_service: (),
859
        })
860
    }
861

            
862
    pub fn extract_import_queue_service(
863
        self,
864
    ) -> (
865
        NodeBuilder<T, SNetwork, STxHandler, ()>,
866
        SImportQueueService,
867
    )
868
    where
869
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
870
    {
871
        let NodeBuilder {
872
            client,
873
            backend,
874
            transaction_pool,
875
            telemetry,
876
            telemetry_worker_handle,
877
            task_manager,
878
            keystore_container,
879
            hwbench,
880
            prometheus_registry,
881
            network,
882
            tx_handler_controller,
883
            import_queue_service,
884
        } = self;
885

            
886
        (
887
            NodeBuilder {
888
                client,
889
                backend,
890
                transaction_pool,
891
                telemetry,
892
                telemetry_worker_handle,
893
                task_manager,
894
                keystore_container,
895
                hwbench,
896
                prometheus_registry,
897
                network,
898
                tx_handler_controller,
899
                import_queue_service: (),
900
            },
901
            import_queue_service,
902
        )
903
    }
904

            
905
    pub fn cumulus_client_collator_params_generator(
906
        &self,
907
        para_id: ParaId,
908
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
909
        collator_key: CollatorPair,
910
        parachain_consensus: ParachainConsensusOf<T>,
911
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
912
        BlockOf<T>,
913
        ClientOf<T>,
914
        ClientOf<T>,
915
        SpawnTaskHandle,
916
    > + Send
917
           + Clone
918
           + 'static
919
    where
920
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
921
    {
922
        let network = TypeIdentity::as_type(&self.network);
923

            
924
        let client = self.client.clone();
925
        let announce_block = {
926
            let sync_service = network.sync_service.clone();
927
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
928
        };
929
        let spawner = self.task_manager.spawn_handle();
930

            
931
        move || cumulus_client_collator::StartCollatorParams {
932
            runtime_api: client.clone(),
933
            block_status: client.clone(),
934
            announce_block: announce_block.clone(),
935
            overseer_handle: overseer_handle.clone(),
936
            spawner: spawner.clone(),
937
            para_id,
938
            key: collator_key.clone(),
939
            parachain_consensus: parachain_consensus.clone(),
940
        }
941
    }
942
}
943

            
944
/// Block authoring scheme to be used by the dev service.
///
/// Parsed from a string via [`FromStr`]: `"instant"`, `"manual"`, or a decimal
/// number of milliseconds for [`Sealing::Interval`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Sealing {
    /// Author a block immediately upon receiving a transaction into the transaction pool
    Instant,
    /// Author a block upon receiving an RPC command
    Manual,
    /// Author blocks at a regular interval specified in milliseconds
    Interval(u64),
}

impl FromStr for Sealing {
    type Err = String;

    /// Parses a sealing mode.
    ///
    /// The keywords `"instant"` and `"manual"` are matched exactly
    /// (case-sensitive, no trimming). Any other input must parse as a `u64`
    /// and is taken as the interval in milliseconds.
    ///
    /// # Errors
    ///
    /// Returns `"couldn't decode sealing param"` when the input is neither a
    /// known keyword nor a valid `u64`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "instant" => Ok(Self::Instant),
            "manual" => Ok(Self::Manual),
            other => other
                .parse::<u64>()
                .map(Self::Interval)
                .map_err(|_| "couldn't decode sealing param".to_string()),
        }
    }
}
971

            
972
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
973
    pub sealing: Sealing,
974
    pub block_import: BI,
975
    pub soft_deadline: Option<Percent>,
976
    pub select_chain: SC,
977
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = StorageProof>>>,
978
    pub create_inherent_data_providers: CIDP,
979
}