1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_io::Timer,
19
    core::time::Duration,
20
    core_extensions::TypeIdentity,
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainConsensus,
23
    cumulus_client_service::{
24
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
25
        StartFullNodeParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::RelayChainInterface,
29
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
30
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
31
    jsonrpsee::RpcModule,
32
    polkadot_primitives::CollatorPair,
33
    sc_client_api::Backend,
34
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
35
    sc_consensus_manual_seal::{
36
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
37
    },
38
    sc_executor::{
39
        sp_wasm_interface::{ExtendedHostFunctions, HostFunctions},
40
        HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf, WasmExecutor,
41
        DEFAULT_HEAP_ALLOC_STRATEGY,
42
    },
43
    sc_network::{config::FullNetworkConfiguration, NetworkBlock},
44
    sc_network_sync::SyncingService,
45
    sc_network_transactions::TransactionsHandlerController,
46
    sc_service::{
47
        Configuration, KeystoreContainer, SpawnTaskHandle, TFullBackend, TFullClient, TaskManager,
48
    },
49
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
50
    sc_transaction_pool_api::OffchainTransactionPoolFactory,
51
    sc_utils::mpsc::TracingUnboundedSender,
52
    sp_api::ConstructRuntimeApi,
53
    sp_block_builder::BlockBuilder,
54
    sp_consensus::SelectChain,
55
    sp_core::traits::CodeExecutor,
56
    sp_inherents::CreateInherentDataProviders,
57
    sp_offchain::OffchainWorkerApi,
58
    sp_runtime::Percent,
59
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
60
    std::{str::FromStr, sync::Arc},
61
};
62

            
63
#[allow(deprecated)]
64
use sc_executor::NativeElseWasmExecutor;
65
use {sc_transaction_pool_api::TransactionPool, sp_api::StorageProof, sp_core::traits::SpawnNamed};
66

            
67
// Alias for the runtime API requirements shared by every node built with
// `NodeBuilder`: a `ConstructRuntimeApi` implementation whose constructed
// `RuntimeApi` provides the transaction queue, block builder, offchain worker,
// metadata and session keys APIs, and which is `Send + Sync + 'static`.
// NOTE(review): presumably `tp_traits::alias!` expands to a trait plus a
// blanket impl for every type satisfying the bounds — confirm in `tp_traits`.
tp_traits::alias!(
    pub trait MinimalRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                TaggedTransactionQueue<Block>
                + BlockBuilder<Block> + OffchainWorkerApi<Block>
                + sp_api::Metadata<Block>
                + sp_session::SessionKeys<Block>,
        > + Send + Sync + 'static
);
82

            
83
// Alias extending `MinimalRuntimeApi` with the extra requirement that the
// constructed `RuntimeApi` also provides
// `cumulus_primitives_core::CollectCollationInfo`. It is the bound required by
// the `NodeBuilder` methods that build the network and spawn tasks.
tp_traits::alias!(
    pub trait MinimalCumulusRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        MinimalRuntimeApi<Block, Client> +
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                cumulus_primitives_core::CollectCollationInfo<Block>,
        >
);
96

            
97
/// Trait to configure the main types the builder rely on, bundled in a single
98
/// type to reduce verbosity and the amount of type parameters.
99
pub trait NodeBuilderConfig {
100
    type Block;
101
    type RuntimeApi;
102
    type ParachainExecutor;
103

            
104
    /// Create a new `NodeBuilder` using the types of this `Config`, along
105
    /// with the parachain `Configuration` and an optional `HwBench`.
106
414
    fn new_builder(
107
414
        parachain_config: &Configuration,
108
414
        hwbench: Option<sc_sysinfo::HwBench>,
109
414
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
110
414
    where
111
414
        Self: Sized,
112
414
        BlockOf<Self>: cumulus_primitives_core::BlockT,
113
414
        ExecutorOf<Self>:
114
414
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
115
414
        RuntimeApiOf<Self>: MinimalRuntimeApi<BlockOf<Self>, ClientOf<Self>>,
116
414
        BlockHashOf<Self>: Unpin,
117
414
    {
118
414
        NodeBuilder::<Self>::new(parachain_config, hwbench)
119
414
    }
120
}
121

            
122
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
123
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
124
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
125
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
126
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
127
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
128
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
129
pub type ConstructedRuntimeApiOf<T> =
130
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
131
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
132
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
133

            
134
// `Cumulus` and `TxHandler` are types that will change during the life of
135
// a `NodeBuilder` because they are generated and consumed when calling
136
// certain functions, with absence of data represented with `()`. Some
137
// function are implemented only for a given concrete type, which ensure it
138
// can only be called if the required data is available (generated and not yet
139
// consumed).
140
//
141
// While this could be implemented with multiple impl blocks with concrete types,
142
// we use here `core_extensions::TypeIdentity` which allow to express type
143
// identity/equality as a trait bound on each function as it removes the
144
// boilerplate of many impl block with duplicated trait bounds. 2 impl blocks
145
// are still required since Rust can't infer the types in the `new` function
146
// that doesn't take `self`.
147
pub struct NodeBuilder<
148
    T: NodeBuilderConfig,
149
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
150
    // but can only be called with an `import_queue` which can be different in
151
    // each node. For that reason it is a `()` when calling `new`, then the
152
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
153
    // then call `build_cumulus_network` with it to generate the cumulus systems.
154
    SNetwork = (),
155
    // The `TxHandler` is constructed in `build_X_network`
156
    // and is then consumed when calling `spawn_common_tasks`.
157
    STxHandler = (),
158
    // The import queue service is obtained from the import queue in
159
    // `build_cumulus_network` or `build_substrate_network`, which also
160
    // consumes the import queue. Neither of them are clonable, so we need to
161
    // to store the service here to be able to consume it later in
162
    // `start_full_node`.
163
    SImportQueueService = (),
164
> where
165
    BlockOf<T>: cumulus_primitives_core::BlockT,
166
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
167
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
168
{
169
    pub client: Arc<ClientOf<T>>,
170
    pub backend: Arc<BackendOf<T>>,
171
    pub task_manager: TaskManager,
172
    pub keystore_container: KeystoreContainer,
173
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<BlockOf<T>, ClientOf<T>>>,
174
    pub telemetry: Option<Telemetry>,
175
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,
176

            
177
    pub hwbench: Option<sc_sysinfo::HwBench>,
178
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,
179

            
180
    pub network: SNetwork,
181
    pub tx_handler_controller: STxHandler,
182
    pub import_queue_service: SImportQueueService,
183
}
184

            
185
pub struct Network<Block: cumulus_primitives_core::BlockT> {
186
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
187
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
188
    pub sync_service: Arc<SyncingService<Block>>,
189
}
190

            
191
/// Allows to create a parachain-defined executor from a `WasmExecutor`
192
pub trait TanssiExecutorExt {
193
    type HostFun: HostFunctions;
194
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
195
}
196

            
197
/// A plain `WasmExecutor` using the parachain host functions is its own
/// executor, so the conversion is the identity.
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
    type HostFun = ParachainHostFunctions;

    /// Returns the provided executor unchanged.
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<ParachainHostFunctions>) -> Self {
        wasm_executor
    }
}
204

            
205
#[allow(deprecated)]
206
impl<D> TanssiExecutorExt for NativeElseWasmExecutor<D>
207
where
208
    D: NativeExecutionDispatch,
209
{
210
    type HostFun = ExtendedHostFunctions<sp_io::SubstrateHostFunctions, D::ExtendHostFunctions>;
211

            
212
196
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
213
196
        #[allow(deprecated)]
214
196
        NativeElseWasmExecutor::new_with_wasm_executor(wasm_executor)
215
196
    }
216
}
217

            
218
// `new` function doesn't take self, and the Rust compiler cannot infer that
219
// only one type T implements `TypeIdentity`. With thus need a separate impl
220
// block with concrete types `()`.
221
impl<T: NodeBuilderConfig> NodeBuilder<T>
222
where
223
    BlockOf<T>: cumulus_primitives_core::BlockT,
224
    ExecutorOf<T>:
225
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
226
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
227
    BlockHashOf<T>: Unpin,
228
{
229
    /// Create a new `NodeBuilder` which prepare objects required to launch a
230
    /// node. However it only starts telemetry, and doesn't provide any
231
    /// network-dependent objects (as it requires an import queue, which usually
232
    /// is different for each node).
233
414
    fn new(
234
414
        parachain_config: &Configuration,
235
414
        hwbench: Option<sc_sysinfo::HwBench>,
236
414
    ) -> Result<Self, sc_service::Error> {
237
        // Refactor: old new_partial
238

            
239
414
        let telemetry = parachain_config
240
414
            .telemetry_endpoints
241
414
            .clone()
242
414
            .filter(|x| !x.is_empty())
243
414
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
244
                let worker = TelemetryWorker::new(16)?;
245
                let telemetry = worker.handle().new_telemetry(endpoints);
246
                Ok((worker, telemetry))
247
414
            })
248
414
            .transpose()?;
249

            
250
414
        let heap_pages =
251
414
            parachain_config
252
414
                .executor
253
414
                .default_heap_pages
254
414
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
255
                    extra_pages: h as u32,
256
414
                });
257
414

            
258
414
        // Default runtime_cache_size is 2
259
414
        // For now we can work with this, but it will likely need
260
414
        // to change once we start having runtime_cache_sizes, or
261
414
        // run nodes with the maximum for this value
262
414
        let mut wasm_builder = WasmExecutor::builder()
263
414
            .with_execution_method(parachain_config.executor.wasm_method)
264
414
            .with_onchain_heap_alloc_strategy(heap_pages)
265
414
            .with_offchain_heap_alloc_strategy(heap_pages)
266
414
            .with_max_runtime_instances(parachain_config.executor.max_runtime_instances)
267
414
            .with_runtime_cache_size(parachain_config.executor.runtime_cache_size);
268
414
        if let Some(ref wasmtime_precompiled_path) = parachain_config.executor.wasmtime_precompiled
269
388
        {
270
388
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
271
388
        }
272

            
273
414
        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());
274

            
275
414
        let (client, backend, keystore_container, task_manager) =
276
414
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
277
414
                parachain_config,
278
414
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
279
414
                executor,
280
414
                true,
281
414
            )?;
282
414
        let client = Arc::new(client);
283
414

            
284
414
        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
285
414

            
286
414
        let telemetry = telemetry.map(|(worker, telemetry)| {
287
            task_manager
288
                .spawn_handle()
289
                .spawn("telemetry", None, worker.run());
290
            telemetry
291
414
        });
292
414

            
293
414
        let transaction_pool = sc_transaction_pool::Builder::new(
294
414
            task_manager.spawn_essential_handle(),
295
414
            client.clone(),
296
414
            parachain_config.role.is_authority().into(),
297
414
        )
298
414
        .with_options(parachain_config.transaction_pool.clone())
299
414
        .with_prometheus(parachain_config.prometheus_registry())
300
414
        .build();
301
414

            
302
414
        Ok(Self {
303
414
            client,
304
414
            backend,
305
414
            transaction_pool: transaction_pool.into(),
306
414
            telemetry,
307
414
            telemetry_worker_handle,
308
414
            task_manager,
309
414
            keystore_container,
310
414
            hwbench,
311
414
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
312
414
            network: TypeIdentity::from_type(()),
313
414
            tx_handler_controller: TypeIdentity::from_type(()),
314
414
            import_queue_service: TypeIdentity::from_type(()),
315
414
        })
316
414
    }
317
}
318

            
319
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
320
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
321
where
322
    BlockOf<T>: cumulus_primitives_core::BlockT,
323
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
324
    RuntimeApiOf<T>: MinimalCumulusRuntimeApi<BlockOf<T>, ClientOf<T>>,
325
{
326
    pub async fn build_relay_chain_interface(
327
        &mut self,
328
        parachain_config: &Configuration,
329
        polkadot_config: Configuration,
330
        collator_options: CollatorOptions,
331
    ) -> sc_service::error::Result<(
332
        Arc<(dyn RelayChainInterface + 'static)>,
333
        Option<CollatorPair>,
334
    )> {
335
        build_relay_chain_interface(
336
            polkadot_config,
337
            parachain_config,
338
            self.telemetry_worker_handle.clone(),
339
            &mut self.task_manager,
340
            collator_options.clone(),
341
            self.hwbench.clone(),
342
        )
343
        .await
344
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))
345
    }
346

            
347
    /// Given an import queue, calls [`cumulus_client_service::build_network`] and
348
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
349
    ///
350
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
351
    /// data.
352
    pub async fn build_cumulus_network<RCInterface, Net>(
353
        self,
354
        parachain_config: &Configuration,
355
        para_id: ParaId,
356
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
357
        relay_chain_interface: RCInterface,
358
    ) -> sc_service::error::Result<
359
        NodeBuilder<
360
            T,
361
            Network<BlockOf<T>>,
362
            TransactionsHandlerController<BlockHashOf<T>>,
363
            ImportQueueServiceOf<T>,
364
        >,
365
    >
366
    where
367
        SNetwork: TypeIdentity<Type = ()>,
368
        STxHandler: TypeIdentity<Type = ()>,
369
        SImportQueueService: TypeIdentity<Type = ()>,
370
        RCInterface: RelayChainInterface + Clone + 'static,
371
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
372
    {
373
        let Self {
374
            client,
375
            backend,
376
            transaction_pool,
377
            telemetry,
378
            telemetry_worker_handle,
379
            task_manager,
380
            keystore_container,
381
            hwbench,
382
            prometheus_registry,
383
            network: _,
384
            tx_handler_controller: _,
385
            import_queue_service: _,
386
        } = self;
387

            
388
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
389
            &parachain_config.network,
390
            prometheus_registry.clone(),
391
        );
392

            
393
        let import_queue_service = import_queue.service();
394
        let spawn_handle = task_manager.spawn_handle();
395

            
396
        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
397
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
398
                parachain_config,
399
                client: client.clone(),
400
                transaction_pool: transaction_pool.clone(),
401
                spawn_handle,
402
                import_queue,
403
                para_id,
404
                relay_chain_interface,
405
                net_config,
406
                sybil_resistance_level: CollatorSybilResistance::Resistant,
407
            })
408
            .await?;
409

            
410
        Ok(NodeBuilder {
411
            client,
412
            backend,
413
            transaction_pool,
414
            telemetry,
415
            telemetry_worker_handle,
416
            task_manager,
417
            keystore_container,
418
            hwbench,
419
            prometheus_registry,
420
            network: Network {
421
                network,
422
                system_rpc_tx,
423
                sync_service,
424
            },
425
            tx_handler_controller,
426
            import_queue_service,
427
        })
428
    }
429

            
430
    /// Given an import queue, calls `sc_service::build_network` and
431
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
432
    ///
433
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
434
    /// data.
435
408
    pub fn build_substrate_network<Net>(
436
408
        self,
437
408
        parachain_config: &Configuration,
438
408
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
439
408
    ) -> sc_service::error::Result<
440
408
        NodeBuilder<
441
408
            T,
442
408
            Network<BlockOf<T>>,
443
408
            TransactionsHandlerController<BlockHashOf<T>>,
444
408
            ImportQueueServiceOf<T>,
445
408
        >,
446
408
    >
447
408
    where
448
408
        SNetwork: TypeIdentity<Type = ()>,
449
408
        STxHandler: TypeIdentity<Type = ()>,
450
408
        SImportQueueService: TypeIdentity<Type = ()>,
451
408
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
452
408
    {
453
408
        let Self {
454
408
            client,
455
408
            backend,
456
408
            transaction_pool,
457
408
            telemetry,
458
408
            telemetry_worker_handle,
459
408
            task_manager,
460
408
            keystore_container,
461
408
            hwbench,
462
408
            prometheus_registry,
463
408
            network: _,
464
408
            tx_handler_controller: _,
465
408
            import_queue_service: _,
466
408
        } = self;
467
408

            
468
408
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
469
408
            &parachain_config.network,
470
408
            prometheus_registry.clone(),
471
408
        );
472
408

            
473
408
        let metrics = Net::register_notification_metrics(
474
408
            parachain_config
475
408
                .prometheus_config
476
408
                .as_ref()
477
408
                .map(|cfg| &cfg.registry),
478
408
        );
479
408

            
480
408
        let import_queue_service = import_queue.service();
481

            
482
408
        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
483
408
            sc_service::build_network(sc_service::BuildNetworkParams {
484
408
                config: parachain_config,
485
408
                client: client.clone(),
486
408
                transaction_pool: transaction_pool.clone(),
487
408
                spawn_handle: task_manager.spawn_handle(),
488
408
                import_queue,
489
408
                warp_sync_config: None,
490
408
                block_announce_validator_builder: None,
491
408
                net_config,
492
408
                block_relay: None,
493
408
                metrics,
494
408
            })?;
495

            
496
408
        Ok(NodeBuilder {
497
408
            client,
498
408
            backend,
499
408
            transaction_pool,
500
408
            telemetry,
501
408
            telemetry_worker_handle,
502
408
            task_manager,
503
408
            keystore_container,
504
408
            hwbench,
505
408
            prometheus_registry,
506
408
            network: Network {
507
408
                network,
508
408
                system_rpc_tx,
509
408
                sync_service,
510
408
            },
511
408
            tx_handler_controller,
512
408
            import_queue_service,
513
408
        })
514
408
    }
515

            
516
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
517
    /// node. It consumes `self.tx_handler_controller` in the process, which means
518
    /// it can only be called once, and any other code that would need this
519
    /// controller should interact with it before calling this function.
520
408
    pub fn spawn_common_tasks<TRpc>(
521
408
        self,
522
408
        parachain_config: Configuration,
523
408
        rpc_builder: Box<
524
408
            dyn Fn(Arc<(dyn SpawnNamed + 'static)>) -> Result<RpcModule<TRpc>, sc_service::Error>,
525
408
        >,
526
408
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
527
408
    where
528
408
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
529
408
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
530
408
        BlockHashOf<T>: Unpin,
531
408
        BlockHeaderOf<T>: Unpin,
532
408
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
533
408
            + BlockBuilder<BlockOf<T>>
534
408
            + OffchainWorkerApi<BlockOf<T>>
535
408
            + sp_api::Metadata<BlockOf<T>>
536
408
            + sp_session::SessionKeys<BlockOf<T>>,
537
408
    {
538
408
        let NodeBuilder {
539
408
            client,
540
408
            backend,
541
408
            transaction_pool,
542
408
            mut telemetry,
543
408
            telemetry_worker_handle,
544
408
            mut task_manager,
545
408
            keystore_container,
546
408
            hwbench,
547
408
            prometheus_registry,
548
408
            network,
549
408
            tx_handler_controller,
550
408
            import_queue_service,
551
408
        } = self;
552
408

            
553
408
        let network = TypeIdentity::into_type(network);
554
408
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);
555
408

            
556
408
        let collator = parachain_config.role.is_authority();
557
408

            
558
408
        if parachain_config.offchain_worker.enabled {
559
408
            task_manager.spawn_handle().spawn(
560
408
                "offchain-workers-runner",
561
408
                "offchain-work",
562
408
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
563
408
                    runtime_api_provider: client.clone(),
564
408
                    keystore: Some(keystore_container.keystore()),
565
408
                    offchain_db: backend.offchain_storage(),
566
408
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
567
408
                        transaction_pool.clone(),
568
408
                    )),
569
408
                    network_provider: Arc::new(network.network.clone()),
570
408
                    is_validator: parachain_config.role.is_authority(),
571
408
                    enable_http_requests: false,
572
11384
                    custom_extensions: move |_| vec![],
573
408
                })?
574
408
                .run(client.clone(), task_manager.spawn_handle())
575
408
                .boxed(),
576
            );
577
        }
578

            
579
408
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
580
408
            rpc_builder,
581
408
            client: client.clone(),
582
408
            transaction_pool: transaction_pool.clone(),
583
408
            task_manager: &mut task_manager,
584
408
            config: parachain_config,
585
408
            keystore: keystore_container.keystore(),
586
408
            backend: backend.clone(),
587
408
            network: network.network.clone(),
588
408
            system_rpc_tx: network.system_rpc_tx.clone(),
589
408
            tx_handler_controller,
590
408
            telemetry: telemetry.as_mut(),
591
408
            sync_service: network.sync_service.clone(),
592
408
        })?;
593

            
594
408
        if let Some(hwbench) = &hwbench {
595
            sc_sysinfo::print_hwbench(hwbench);
596
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
597
            // in there and swapping out the requirements for your own are probably a good idea. The
598
            // requirements for a para-chain are dictated by its relay-chain.
599
            if collator {
600
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false) {
601
                    log::warn!(
602
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
603
                        err
604
                    );
605
                }
606
            }
607

            
608
            if let Some(ref mut telemetry) = telemetry {
609
                let telemetry_handle = telemetry.handle();
610
                task_manager.spawn_handle().spawn(
611
                    "telemetry_hwbench",
612
                    None,
613
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
614
                );
615
            }
616
408
        }
617

            
618
408
        Ok(NodeBuilder {
619
408
            client,
620
408
            backend,
621
408
            transaction_pool,
622
408
            telemetry,
623
408
            telemetry_worker_handle,
624
408
            task_manager,
625
408
            keystore_container,
626
408
            hwbench,
627
408
            prometheus_registry,
628
408
            network: TypeIdentity::from_type(network),
629
408
            tx_handler_controller: TypeIdentity::from_type(()),
630
408
            import_queue_service,
631
408
        })
632
408
    }
633

            
634
408
    pub fn install_manual_seal<BI, SC, CIDP>(
635
408
        &mut self,
636
408
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
637
408
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
638
408
    where
639
408
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
640
408
        SC: SelectChain<BlockOf<T>> + 'static,
641
408
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
642
408
    {
643
408
        let ManualSealConfiguration {
644
408
            sealing,
645
408
            soft_deadline,
646
408
            block_import,
647
408
            select_chain,
648
408
            consensus_data_provider,
649
408
            create_inherent_data_providers,
650
408
        } = manual_seal_config;
651
408

            
652
408
        let prometheus_registry = self.prometheus_registry.clone();
653
408

            
654
408
        let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
655
408
            self.task_manager.spawn_handle(),
656
408
            self.client.clone(),
657
408
            self.transaction_pool.clone(),
658
408
            prometheus_registry.as_ref(),
659
408
            self.telemetry.as_ref().map(|x| x.handle()),
660
408
        );
661
408

            
662
408
        let mut command_sink = None;
663

            
664
408
        if let Some(deadline) = soft_deadline {
665
194
            env.set_soft_deadline(deadline);
666
408
        }
667

            
668
408
        let commands_stream: Box<
669
408
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
670
408
        > = match sealing {
671
            Sealing::Instant => {
672
20
                Box::new(
673
20
                    // This bit cribbed from the implementation of instant seal.
674
20
                    self.transaction_pool.import_notification_stream().map(|_| {
675
                        EngineCommand::SealNewBlock {
676
                            create_empty: false,
677
                            finalize: false,
678
                            parent_hash: None,
679
                            sender: None,
680
                        }
681
20
                    }),
682
20
                )
683
            }
684
            Sealing::Manual => {
685
388
                let (sink, stream) = futures::channel::mpsc::channel(1000);
686
388
                // Keep a reference to the other end of the channel. It goes to the RPC.
687
388
                command_sink = Some(sink);
688
388
                Box::new(stream)
689
            }
690
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
691
                Timer::interval(Duration::from_millis(millis)),
692
                |_| EngineCommand::SealNewBlock {
693
                    create_empty: true,
694
                    finalize: true,
695
                    parent_hash: None,
696
                    sender: None,
697
                },
698
            )),
699
        };
700

            
701
408
        self.task_manager.spawn_essential_handle().spawn_blocking(
702
408
            "authorship_task",
703
408
            Some("block-authoring"),
704
408
            run_manual_seal(ManualSealParams {
705
408
                block_import,
706
408
                env,
707
408
                client: self.client.clone(),
708
408
                pool: self.transaction_pool.clone(),
709
408
                commands_stream,
710
408
                select_chain,
711
408
                consensus_data_provider,
712
408
                create_inherent_data_providers,
713
408
            }),
714
408
        );
715
408

            
716
408
        Ok(command_sink)
717
408
    }
718

            
719
    pub fn start_full_node<RCInterface>(
720
        self,
721
        para_id: ParaId,
722
        relay_chain_interface: RCInterface,
723
        relay_chain_slot_duration: Duration,
724
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
725
    where
726
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
727
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
728
        RCInterface: RelayChainInterface + Clone + 'static,
729
    {
730
        let NodeBuilder {
731
            client,
732
            backend,
733
            transaction_pool,
734
            telemetry,
735
            telemetry_worker_handle,
736
            mut task_manager,
737
            keystore_container,
738
            hwbench,
739
            prometheus_registry,
740
            network,
741
            tx_handler_controller,
742
            import_queue_service,
743
        } = self;
744

            
745
        let network = TypeIdentity::into_type(network);
746
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
747

            
748
        let announce_block = {
749
            let sync_service = network.sync_service.clone();
750
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
751
        };
752

            
753
        let overseer_handle = relay_chain_interface
754
            .overseer_handle()
755
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
756

            
757
        let params = StartFullNodeParams {
758
            client: client.clone(),
759
            announce_block,
760
            task_manager: &mut task_manager,
761
            para_id,
762
            relay_chain_interface,
763
            relay_chain_slot_duration,
764
            import_queue: import_queue_service,
765
            recovery_handle: Box::new(overseer_handle),
766
            sync_service: network.sync_service.clone(),
767
        };
768

            
769
        // TODO: change for async backing
770
        #[allow(deprecated)]
771
        cumulus_client_service::start_full_node(params)?;
772

            
773
        Ok(NodeBuilder {
774
            client,
775
            backend,
776
            transaction_pool,
777
            telemetry,
778
            telemetry_worker_handle,
779
            task_manager,
780
            keystore_container,
781
            hwbench,
782
            prometheus_registry,
783
            network: TypeIdentity::from_type(network),
784
            tx_handler_controller,
785
            import_queue_service: (),
786
        })
787
    }
788

            
789
    pub async fn start_collator<RCInterface>(
790
        self,
791
        para_id: ParaId,
792
        relay_chain_interface: RCInterface,
793
        relay_chain_slot_duration: Duration,
794
        parachain_consensus: ParachainConsensusOf<T>,
795
        collator_key: CollatorPair,
796
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
797
    where
798
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
799
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
800
        RCInterface: RelayChainInterface + Clone + 'static,
801
    {
802
        let NodeBuilder {
803
            client,
804
            backend,
805
            transaction_pool,
806
            telemetry,
807
            telemetry_worker_handle,
808
            mut task_manager,
809
            keystore_container,
810
            hwbench,
811
            prometheus_registry,
812
            network,
813
            tx_handler_controller,
814
            import_queue_service,
815
        } = self;
816

            
817
        let network = TypeIdentity::into_type(network);
818
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
819

            
820
        let spawner = task_manager.spawn_handle();
821
        let announce_block = {
822
            let sync_service = network.sync_service.clone();
823
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
824
        };
825
        let overseer_handle = relay_chain_interface
826
            .overseer_handle()
827
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
828

            
829
        let params = cumulus_client_service::StartCollatorParams {
830
            para_id,
831
            block_status: client.clone(),
832
            announce_block: announce_block.clone(),
833
            client: client.clone(),
834
            task_manager: &mut task_manager,
835
            relay_chain_interface: relay_chain_interface.clone(),
836
            spawner: spawner.clone(),
837
            parachain_consensus,
838
            import_queue: import_queue_service,
839
            collator_key,
840
            relay_chain_slot_duration,
841
            recovery_handle: Box::new(overseer_handle.clone()),
842
            sync_service: network.sync_service.clone(),
843
        };
844

            
845
        // TODO: change for async backing
846
        #[allow(deprecated)]
847
        cumulus_client_service::start_collator(params).await?;
848

            
849
        Ok(NodeBuilder {
850
            client,
851
            backend,
852
            transaction_pool,
853
            telemetry,
854
            telemetry_worker_handle,
855
            task_manager,
856
            keystore_container,
857
            hwbench,
858
            prometheus_registry,
859
            network: TypeIdentity::from_type(network),
860
            tx_handler_controller,
861
            import_queue_service: (),
862
        })
863
    }
864

            
865
    pub fn extract_import_queue_service(
866
        self,
867
    ) -> (
868
        NodeBuilder<T, SNetwork, STxHandler, ()>,
869
        SImportQueueService,
870
    )
871
    where
872
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
873
    {
874
        let NodeBuilder {
875
            client,
876
            backend,
877
            transaction_pool,
878
            telemetry,
879
            telemetry_worker_handle,
880
            task_manager,
881
            keystore_container,
882
            hwbench,
883
            prometheus_registry,
884
            network,
885
            tx_handler_controller,
886
            import_queue_service,
887
        } = self;
888

            
889
        (
890
            NodeBuilder {
891
                client,
892
                backend,
893
                transaction_pool,
894
                telemetry,
895
                telemetry_worker_handle,
896
                task_manager,
897
                keystore_container,
898
                hwbench,
899
                prometheus_registry,
900
                network,
901
                tx_handler_controller,
902
                import_queue_service: (),
903
            },
904
            import_queue_service,
905
        )
906
    }
907

            
908
    pub fn cumulus_client_collator_params_generator(
909
        &self,
910
        para_id: ParaId,
911
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
912
        collator_key: CollatorPair,
913
        parachain_consensus: ParachainConsensusOf<T>,
914
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
915
        BlockOf<T>,
916
        ClientOf<T>,
917
        ClientOf<T>,
918
        SpawnTaskHandle,
919
    > + Send
920
           + Clone
921
           + 'static
922
    where
923
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
924
    {
925
        let network = TypeIdentity::as_type(&self.network);
926

            
927
        let client = self.client.clone();
928
        let announce_block = {
929
            let sync_service = network.sync_service.clone();
930
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
931
        };
932
        let spawner = self.task_manager.spawn_handle();
933

            
934
        move || cumulus_client_collator::StartCollatorParams {
935
            runtime_api: client.clone(),
936
            block_status: client.clone(),
937
            announce_block: announce_block.clone(),
938
            overseer_handle: overseer_handle.clone(),
939
            spawner: spawner.clone(),
940
            para_id,
941
            key: collator_key.clone(),
942
            parachain_consensus: parachain_consensus.clone(),
943
        }
944
    }
945
}
946

            
947
/// Block authoring scheme to be used by the dev service.
///
/// Parsed from a string via [`FromStr`]: `"instant"`, `"manual"`, or a bare
/// unsigned integer giving the interval in milliseconds.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum Sealing {
    /// Author a block immediately upon receiving a transaction into the transaction pool
    Instant,
    /// Author a block upon receiving an RPC command
    Manual,
    /// Author blocks at a regular interval specified in milliseconds
    Interval(u64),
}

impl FromStr for Sealing {
    type Err = String;

    /// Parses a sealing mode.
    ///
    /// The keywords `instant` and `manual` are matched exactly (case-sensitive,
    /// no trimming); any other input must parse as a `u64` and becomes
    /// [`Sealing::Interval`].
    ///
    /// # Errors
    ///
    /// Returns a message naming the rejected input and the accepted forms when
    /// the input is neither a keyword nor a valid `u64`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s {
            "instant" => Ok(Self::Instant),
            "manual" => Ok(Self::Manual),
            other => other.parse::<u64>().map(Self::Interval).map_err(|_| {
                // Keep the historical prefix, then say what was wrong and what is valid.
                format!(
                    "couldn't decode sealing param `{other}`: expected `instant`, `manual` \
                     or an interval in milliseconds"
                )
            }),
        }
    }
}
974

            
975
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
976
    pub sealing: Sealing,
977
    pub block_import: BI,
978
    pub soft_deadline: Option<Percent>,
979
    pub select_chain: SC,
980
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = StorageProof>>>,
981
    pub create_inherent_data_providers: CIDP,
982
}