1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_io::Timer,
19
    core::time::Duration,
20
    core_extensions::TypeIdentity,
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainConsensus,
23
    cumulus_client_service::{
24
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
25
        StartFullNodeParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::RelayChainInterface,
29
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
30
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
31
    jsonrpsee::RpcModule,
32
    polkadot_primitives::CollatorPair,
33
    sc_client_api::Backend,
34
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
35
    sc_consensus_manual_seal::{
36
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
37
    },
38
    sc_executor::{
39
        sp_wasm_interface::{ExtendedHostFunctions, HostFunctions},
40
        HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf, WasmExecutor,
41
        DEFAULT_HEAP_ALLOC_STRATEGY,
42
    },
43
    sc_network::{config::FullNetworkConfiguration, NetworkBlock},
44
    sc_network_sync::SyncingService,
45
    sc_network_transactions::TransactionsHandlerController,
46
    sc_service::{
47
        Configuration, KeystoreContainer, NetworkStarter, SpawnTaskHandle, TFullBackend,
48
        TFullClient, TaskManager,
49
    },
50
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
51
    sc_transaction_pool_api::OffchainTransactionPoolFactory,
52
    sc_utils::mpsc::TracingUnboundedSender,
53
    sp_api::ConstructRuntimeApi,
54
    sp_block_builder::BlockBuilder,
55
    sp_consensus::SelectChain,
56
    sp_core::traits::CodeExecutor,
57
    sp_inherents::CreateInherentDataProviders,
58
    sp_offchain::OffchainWorkerApi,
59
    sp_runtime::Percent,
60
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
61
    std::{str::FromStr, sync::Arc},
62
};
63

            
64
#[allow(deprecated)]
65
use sc_executor::NativeElseWasmExecutor;
66
use {sc_transaction_pool_api::TransactionPool, sp_api::StorageProof, sp_core::traits::SpawnNamed};
67

            
68
tp_traits::alias!(
69
    pub trait MinimalRuntimeApi<
70
        Block: (cumulus_primitives_core::BlockT),
71
        Client: (sp_api::CallApiAt<Block>),
72
    > :
73
        ConstructRuntimeApi<
74
            Block,
75
            Client,
76
            RuntimeApi:
77
                TaggedTransactionQueue<Block>
78
                + BlockBuilder<Block> + OffchainWorkerApi<Block>
79
                + sp_api::Metadata<Block>
80
                + sp_session::SessionKeys<Block>,
81
        > + Send + Sync + 'static
82
);
83

            
84
tp_traits::alias!(
85
    pub trait MinimalCumulusRuntimeApi<
86
        Block: (cumulus_primitives_core::BlockT),
87
        Client: (sp_api::CallApiAt<Block>),
88
    > :
89
        MinimalRuntimeApi<Block, Client> +
90
        ConstructRuntimeApi<
91
            Block,
92
            Client,
93
            RuntimeApi:
94
                cumulus_primitives_core::CollectCollationInfo<Block>,
95
        >
96
);
97

            
98
/// Trait to configure the main types the builder rely on, bundled in a single
99
/// type to reduce verbosity and the amount of type parameters.
100
pub trait NodeBuilderConfig {
101
    type Block;
102
    type RuntimeApi;
103
    type ParachainExecutor;
104

            
105
    /// Create a new `NodeBuilder` using the types of this `Config`, along
106
    /// with the parachain `Configuration` and an optional `HwBench`.
107
414
    fn new_builder(
108
414
        parachain_config: &Configuration,
109
414
        hwbench: Option<sc_sysinfo::HwBench>,
110
414
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
111
414
    where
112
414
        Self: Sized,
113
414
        BlockOf<Self>: cumulus_primitives_core::BlockT,
114
414
        ExecutorOf<Self>:
115
414
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
116
414
        RuntimeApiOf<Self>: MinimalRuntimeApi<BlockOf<Self>, ClientOf<Self>>,
117
414
        BlockHashOf<Self>: Unpin,
118
414
    {
119
414
        NodeBuilder::<Self>::new(parachain_config, hwbench)
120
414
    }
121
}
122

            
123
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
124
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
125
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
126
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
127
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
128
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
129
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
130
pub type ConstructedRuntimeApiOf<T> =
131
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
132
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
133
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
134

            
135
// `Cumulus` and `TxHandler` are types that will change during the life of
136
// a `NodeBuilder` because they are generated and consumed when calling
137
// certain functions, with absence of data represented with `()`. Some
138
// function are implemented only for a given concrete type, which ensure it
139
// can only be called if the required data is available (generated and not yet
140
// consumed).
141
//
142
// While this could be implemented with multiple impl blocks with concrete types,
143
// we use here `core_extensions::TypeIdentity` which allow to express type
144
// identity/equality as a trait bound on each function as it removes the
145
// boilerplate of many impl block with duplicated trait bounds. 2 impl blocks
146
// are still required since Rust can't infer the types in the `new` function
147
// that doesn't take `self`.
148
pub struct NodeBuilder<
149
    T: NodeBuilderConfig,
150
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
151
    // but can only be called with an `import_queue` which can be different in
152
    // each node. For that reason it is a `()` when calling `new`, then the
153
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
154
    // then call `build_cumulus_network` with it to generate the cumulus systems.
155
    SNetwork = (),
156
    // The `TxHandler` is constructed in `build_X_network`
157
    // and is then consumed when calling `spawn_common_tasks`.
158
    STxHandler = (),
159
    // The import queue service is obtained from the import queue in
160
    // `build_cumulus_network` or `build_substrate_network`, which also
161
    // consumes the import queue. Neither of them are clonable, so we need to
162
    // to store the service here to be able to consume it later in
163
    // `start_full_node`.
164
    SImportQueueService = (),
165
> where
166
    BlockOf<T>: cumulus_primitives_core::BlockT,
167
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
168
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
169
{
170
    pub client: Arc<ClientOf<T>>,
171
    pub backend: Arc<BackendOf<T>>,
172
    pub task_manager: TaskManager,
173
    pub keystore_container: KeystoreContainer,
174
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<BlockOf<T>, ClientOf<T>>>,
175
    pub telemetry: Option<Telemetry>,
176
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,
177

            
178
    pub hwbench: Option<sc_sysinfo::HwBench>,
179
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,
180

            
181
    pub network: SNetwork,
182
    pub tx_handler_controller: STxHandler,
183
    pub import_queue_service: SImportQueueService,
184
}
185

            
186
pub struct Network<Block: cumulus_primitives_core::BlockT> {
187
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
188
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
189
    pub start_network: NetworkStarter,
190
    pub sync_service: Arc<SyncingService<Block>>,
191
}
192

            
193
/// Allows to create a parachain-defined executor from a `WasmExecutor`
194
pub trait TanssiExecutorExt {
195
    type HostFun: HostFunctions;
196
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
197
}
198

            
199
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
200
    type HostFun = ParachainHostFunctions;
201

            
202
654
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
203
654
        wasm_executor
204
654
    }
205
}
206

            
207
#[allow(deprecated)]
208
impl<D> TanssiExecutorExt for NativeElseWasmExecutor<D>
209
where
210
    D: NativeExecutionDispatch,
211
{
212
    type HostFun = ExtendedHostFunctions<sp_io::SubstrateHostFunctions, D::ExtendHostFunctions>;
213

            
214
196
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
215
196
        #[allow(deprecated)]
216
196
        NativeElseWasmExecutor::new_with_wasm_executor(wasm_executor)
217
196
    }
218
}
219

            
220
// `new` function doesn't take self, and the Rust compiler cannot infer that
221
// only one type T implements `TypeIdentity`. With thus need a separate impl
222
// block with concrete types `()`.
223
impl<T: NodeBuilderConfig> NodeBuilder<T>
224
where
225
    BlockOf<T>: cumulus_primitives_core::BlockT,
226
    ExecutorOf<T>:
227
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
228
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
229
    BlockHashOf<T>: Unpin,
230
{
231
    /// Create a new `NodeBuilder` which prepare objects required to launch a
232
    /// node. However it only starts telemetry, and doesn't provide any
233
    /// network-dependent objects (as it requires an import queue, which usually
234
    /// is different for each node).
235
414
    fn new(
236
414
        parachain_config: &Configuration,
237
414
        hwbench: Option<sc_sysinfo::HwBench>,
238
414
    ) -> Result<Self, sc_service::Error> {
239
        // Refactor: old new_partial
240

            
241
414
        let telemetry = parachain_config
242
414
            .telemetry_endpoints
243
414
            .clone()
244
414
            .filter(|x| !x.is_empty())
245
414
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
246
                let worker = TelemetryWorker::new(16)?;
247
                let telemetry = worker.handle().new_telemetry(endpoints);
248
                Ok((worker, telemetry))
249
414
            })
250
414
            .transpose()?;
251

            
252
414
        let heap_pages =
253
414
            parachain_config
254
414
                .executor
255
414
                .default_heap_pages
256
414
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
257
                    extra_pages: h as u32,
258
414
                });
259
414

            
260
414
        // Default runtime_cache_size is 2
261
414
        // For now we can work with this, but it will likely need
262
414
        // to change once we start having runtime_cache_sizes, or
263
414
        // run nodes with the maximum for this value
264
414
        let mut wasm_builder = WasmExecutor::builder()
265
414
            .with_execution_method(parachain_config.executor.wasm_method)
266
414
            .with_onchain_heap_alloc_strategy(heap_pages)
267
414
            .with_offchain_heap_alloc_strategy(heap_pages)
268
414
            .with_max_runtime_instances(parachain_config.executor.max_runtime_instances)
269
414
            .with_runtime_cache_size(parachain_config.executor.runtime_cache_size);
270
414
        if let Some(ref wasmtime_precompiled_path) = parachain_config.executor.wasmtime_precompiled
271
388
        {
272
388
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
273
388
        }
274

            
275
414
        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());
276

            
277
414
        let (client, backend, keystore_container, task_manager) =
278
414
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
279
414
                parachain_config,
280
414
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
281
414
                executor,
282
414
                true,
283
414
            )?;
284
414
        let client = Arc::new(client);
285
414

            
286
414
        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
287
414

            
288
414
        let telemetry = telemetry.map(|(worker, telemetry)| {
289
            task_manager
290
                .spawn_handle()
291
                .spawn("telemetry", None, worker.run());
292
            telemetry
293
414
        });
294
414

            
295
414
        let transaction_pool = sc_transaction_pool::Builder::new(
296
414
            task_manager.spawn_essential_handle(),
297
414
            client.clone(),
298
414
            parachain_config.role.is_authority().into(),
299
414
        )
300
414
        .with_options(parachain_config.transaction_pool.clone())
301
414
        .with_prometheus(parachain_config.prometheus_registry())
302
414
        .build();
303
414

            
304
414
        Ok(Self {
305
414
            client,
306
414
            backend,
307
414
            transaction_pool: transaction_pool.into(),
308
414
            telemetry,
309
414
            telemetry_worker_handle,
310
414
            task_manager,
311
414
            keystore_container,
312
414
            hwbench,
313
414
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
314
414
            network: TypeIdentity::from_type(()),
315
414
            tx_handler_controller: TypeIdentity::from_type(()),
316
414
            import_queue_service: TypeIdentity::from_type(()),
317
414
        })
318
414
    }
319
}
320

            
321
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
322
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
323
where
324
    BlockOf<T>: cumulus_primitives_core::BlockT,
325
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
326
    RuntimeApiOf<T>: MinimalCumulusRuntimeApi<BlockOf<T>, ClientOf<T>>,
327
{
328
    pub async fn build_relay_chain_interface(
329
        &mut self,
330
        parachain_config: &Configuration,
331
        polkadot_config: Configuration,
332
        collator_options: CollatorOptions,
333
    ) -> sc_service::error::Result<(
334
        Arc<(dyn RelayChainInterface + 'static)>,
335
        Option<CollatorPair>,
336
    )> {
337
        build_relay_chain_interface(
338
            polkadot_config,
339
            parachain_config,
340
            self.telemetry_worker_handle.clone(),
341
            &mut self.task_manager,
342
            collator_options.clone(),
343
            self.hwbench.clone(),
344
        )
345
        .await
346
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))
347
    }
348

            
349
    /// Given an import queue, calls [`cumulus_client_service::build_network`] and
350
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
351
    ///
352
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
353
    /// data.
354
    pub async fn build_cumulus_network<RCInterface, Net>(
355
        self,
356
        parachain_config: &Configuration,
357
        para_id: ParaId,
358
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
359
        relay_chain_interface: RCInterface,
360
    ) -> sc_service::error::Result<
361
        NodeBuilder<
362
            T,
363
            Network<BlockOf<T>>,
364
            TransactionsHandlerController<BlockHashOf<T>>,
365
            ImportQueueServiceOf<T>,
366
        >,
367
    >
368
    where
369
        SNetwork: TypeIdentity<Type = ()>,
370
        STxHandler: TypeIdentity<Type = ()>,
371
        SImportQueueService: TypeIdentity<Type = ()>,
372
        RCInterface: RelayChainInterface + Clone + 'static,
373
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
374
    {
375
        let Self {
376
            client,
377
            backend,
378
            transaction_pool,
379
            telemetry,
380
            telemetry_worker_handle,
381
            task_manager,
382
            keystore_container,
383
            hwbench,
384
            prometheus_registry,
385
            network: _,
386
            tx_handler_controller: _,
387
            import_queue_service: _,
388
        } = self;
389

            
390
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
391
            &parachain_config.network,
392
            prometheus_registry.clone(),
393
        );
394

            
395
        let import_queue_service = import_queue.service();
396
        let spawn_handle = task_manager.spawn_handle();
397

            
398
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
399
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
400
                parachain_config,
401
                client: client.clone(),
402
                transaction_pool: transaction_pool.clone(),
403
                spawn_handle,
404
                import_queue,
405
                para_id,
406
                relay_chain_interface,
407
                net_config,
408
                sybil_resistance_level: CollatorSybilResistance::Resistant,
409
            })
410
            .await?;
411

            
412
        Ok(NodeBuilder {
413
            client,
414
            backend,
415
            transaction_pool,
416
            telemetry,
417
            telemetry_worker_handle,
418
            task_manager,
419
            keystore_container,
420
            hwbench,
421
            prometheus_registry,
422
            network: Network {
423
                network,
424
                system_rpc_tx,
425
                start_network,
426
                sync_service,
427
            },
428
            tx_handler_controller,
429
            import_queue_service,
430
        })
431
    }
432

            
433
    /// Given an import queue, calls `sc_service::build_network` and
434
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
435
    ///
436
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
437
    /// data.
438
408
    pub fn build_substrate_network<Net>(
439
408
        self,
440
408
        parachain_config: &Configuration,
441
408
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
442
408
    ) -> sc_service::error::Result<
443
408
        NodeBuilder<
444
408
            T,
445
408
            Network<BlockOf<T>>,
446
408
            TransactionsHandlerController<BlockHashOf<T>>,
447
408
            ImportQueueServiceOf<T>,
448
408
        >,
449
408
    >
450
408
    where
451
408
        SNetwork: TypeIdentity<Type = ()>,
452
408
        STxHandler: TypeIdentity<Type = ()>,
453
408
        SImportQueueService: TypeIdentity<Type = ()>,
454
408
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
455
408
    {
456
408
        let Self {
457
408
            client,
458
408
            backend,
459
408
            transaction_pool,
460
408
            telemetry,
461
408
            telemetry_worker_handle,
462
408
            task_manager,
463
408
            keystore_container,
464
408
            hwbench,
465
408
            prometheus_registry,
466
408
            network: _,
467
408
            tx_handler_controller: _,
468
408
            import_queue_service: _,
469
408
        } = self;
470
408

            
471
408
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
472
408
            &parachain_config.network,
473
408
            prometheus_registry.clone(),
474
408
        );
475
408

            
476
408
        let metrics = Net::register_notification_metrics(
477
408
            parachain_config
478
408
                .prometheus_config
479
408
                .as_ref()
480
408
                .map(|cfg| &cfg.registry),
481
408
        );
482
408

            
483
408
        let import_queue_service = import_queue.service();
484

            
485
408
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
486
408
            sc_service::build_network(sc_service::BuildNetworkParams {
487
408
                config: parachain_config,
488
408
                client: client.clone(),
489
408
                transaction_pool: transaction_pool.clone(),
490
408
                spawn_handle: task_manager.spawn_handle(),
491
408
                import_queue,
492
408
                warp_sync_config: None,
493
408
                block_announce_validator_builder: None,
494
408
                net_config,
495
408
                block_relay: None,
496
408
                metrics,
497
408
            })?;
498

            
499
408
        Ok(NodeBuilder {
500
408
            client,
501
408
            backend,
502
408
            transaction_pool,
503
408
            telemetry,
504
408
            telemetry_worker_handle,
505
408
            task_manager,
506
408
            keystore_container,
507
408
            hwbench,
508
408
            prometheus_registry,
509
408
            network: Network {
510
408
                network,
511
408
                system_rpc_tx,
512
408
                start_network,
513
408
                sync_service,
514
408
            },
515
408
            tx_handler_controller,
516
408
            import_queue_service,
517
408
        })
518
408
    }
519

            
520
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
521
    /// node. It consumes `self.tx_handler_controller` in the process, which means
522
    /// it can only be called once, and any other code that would need this
523
    /// controller should interact with it before calling this function.
524
408
    pub fn spawn_common_tasks<TRpc>(
525
408
        self,
526
408
        parachain_config: Configuration,
527
408
        rpc_builder: Box<
528
408
            dyn Fn(Arc<(dyn SpawnNamed + 'static)>) -> Result<RpcModule<TRpc>, sc_service::Error>,
529
408
        >,
530
408
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
531
408
    where
532
408
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
533
408
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
534
408
        BlockHashOf<T>: Unpin,
535
408
        BlockHeaderOf<T>: Unpin,
536
408
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
537
408
            + BlockBuilder<BlockOf<T>>
538
408
            + OffchainWorkerApi<BlockOf<T>>
539
408
            + sp_api::Metadata<BlockOf<T>>
540
408
            + sp_session::SessionKeys<BlockOf<T>>,
541
408
    {
542
408
        let NodeBuilder {
543
408
            client,
544
408
            backend,
545
408
            transaction_pool,
546
408
            mut telemetry,
547
408
            telemetry_worker_handle,
548
408
            mut task_manager,
549
408
            keystore_container,
550
408
            hwbench,
551
408
            prometheus_registry,
552
408
            network,
553
408
            tx_handler_controller,
554
408
            import_queue_service,
555
408
        } = self;
556
408

            
557
408
        let network = TypeIdentity::into_type(network);
558
408
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);
559
408

            
560
408
        let collator = parachain_config.role.is_authority();
561
408

            
562
408
        if parachain_config.offchain_worker.enabled {
563
408
            task_manager.spawn_handle().spawn(
564
408
                "offchain-workers-runner",
565
408
                "offchain-work",
566
408
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
567
408
                    runtime_api_provider: client.clone(),
568
408
                    keystore: Some(keystore_container.keystore()),
569
408
                    offchain_db: backend.offchain_storage(),
570
408
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
571
408
                        transaction_pool.clone(),
572
408
                    )),
573
408
                    network_provider: Arc::new(network.network.clone()),
574
408
                    is_validator: parachain_config.role.is_authority(),
575
408
                    enable_http_requests: false,
576
11384
                    custom_extensions: move |_| vec![],
577
408
                })?
578
408
                .run(client.clone(), task_manager.spawn_handle())
579
408
                .boxed(),
580
            );
581
        }
582

            
583
408
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
584
408
            rpc_builder,
585
408
            client: client.clone(),
586
408
            transaction_pool: transaction_pool.clone(),
587
408
            task_manager: &mut task_manager,
588
408
            config: parachain_config,
589
408
            keystore: keystore_container.keystore(),
590
408
            backend: backend.clone(),
591
408
            network: network.network.clone(),
592
408
            system_rpc_tx: network.system_rpc_tx.clone(),
593
408
            tx_handler_controller,
594
408
            telemetry: telemetry.as_mut(),
595
408
            sync_service: network.sync_service.clone(),
596
408
        })?;
597

            
598
408
        if let Some(hwbench) = &hwbench {
599
            sc_sysinfo::print_hwbench(hwbench);
600
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
601
            // in there and swapping out the requirements for your own are probably a good idea. The
602
            // requirements for a para-chain are dictated by its relay-chain.
603
            if collator {
604
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false) {
605
                    log::warn!(
606
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
607
                        err
608
                    );
609
                }
610
            }
611

            
612
            if let Some(ref mut telemetry) = telemetry {
613
                let telemetry_handle = telemetry.handle();
614
                task_manager.spawn_handle().spawn(
615
                    "telemetry_hwbench",
616
                    None,
617
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
618
                );
619
            }
620
408
        }
621

            
622
408
        Ok(NodeBuilder {
623
408
            client,
624
408
            backend,
625
408
            transaction_pool,
626
408
            telemetry,
627
408
            telemetry_worker_handle,
628
408
            task_manager,
629
408
            keystore_container,
630
408
            hwbench,
631
408
            prometheus_registry,
632
408
            network: TypeIdentity::from_type(network),
633
408
            tx_handler_controller: TypeIdentity::from_type(()),
634
408
            import_queue_service,
635
408
        })
636
408
    }
637

            
638
408
    pub fn install_manual_seal<BI, SC, CIDP>(
639
408
        &mut self,
640
408
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
641
408
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
642
408
    where
643
408
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
644
408
        SC: SelectChain<BlockOf<T>> + 'static,
645
408
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
646
408
    {
647
408
        let ManualSealConfiguration {
648
408
            sealing,
649
408
            soft_deadline,
650
408
            block_import,
651
408
            select_chain,
652
408
            consensus_data_provider,
653
408
            create_inherent_data_providers,
654
408
        } = manual_seal_config;
655
408

            
656
408
        let prometheus_registry = self.prometheus_registry.clone();
657
408

            
658
408
        let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
659
408
            self.task_manager.spawn_handle(),
660
408
            self.client.clone(),
661
408
            self.transaction_pool.clone(),
662
408
            prometheus_registry.as_ref(),
663
408
            self.telemetry.as_ref().map(|x| x.handle()),
664
408
        );
665
408

            
666
408
        let mut command_sink = None;
667

            
668
408
        if let Some(deadline) = soft_deadline {
669
194
            env.set_soft_deadline(deadline);
670
408
        }
671

            
672
408
        let commands_stream: Box<
673
408
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
674
408
        > = match sealing {
675
            Sealing::Instant => {
676
20
                Box::new(
677
20
                    // This bit cribbed from the implementation of instant seal.
678
20
                    self.transaction_pool.import_notification_stream().map(|_| {
679
                        EngineCommand::SealNewBlock {
680
                            create_empty: false,
681
                            finalize: false,
682
                            parent_hash: None,
683
                            sender: None,
684
                        }
685
20
                    }),
686
20
                )
687
            }
688
            Sealing::Manual => {
689
388
                let (sink, stream) = futures::channel::mpsc::channel(1000);
690
388
                // Keep a reference to the other end of the channel. It goes to the RPC.
691
388
                command_sink = Some(sink);
692
388
                Box::new(stream)
693
            }
694
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
695
                Timer::interval(Duration::from_millis(millis)),
696
                |_| EngineCommand::SealNewBlock {
697
                    create_empty: true,
698
                    finalize: true,
699
                    parent_hash: None,
700
                    sender: None,
701
                },
702
            )),
703
        };
704

            
705
408
        self.task_manager.spawn_essential_handle().spawn_blocking(
706
408
            "authorship_task",
707
408
            Some("block-authoring"),
708
408
            run_manual_seal(ManualSealParams {
709
408
                block_import,
710
408
                env,
711
408
                client: self.client.clone(),
712
408
                pool: self.transaction_pool.clone(),
713
408
                commands_stream,
714
408
                select_chain,
715
408
                consensus_data_provider,
716
408
                create_inherent_data_providers,
717
408
            }),
718
408
        );
719
408

            
720
408
        Ok(command_sink)
721
408
    }
722

            
723
    pub fn start_full_node<RCInterface>(
724
        self,
725
        para_id: ParaId,
726
        relay_chain_interface: RCInterface,
727
        relay_chain_slot_duration: Duration,
728
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
729
    where
730
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
731
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
732
        RCInterface: RelayChainInterface + Clone + 'static,
733
    {
734
        let NodeBuilder {
735
            client,
736
            backend,
737
            transaction_pool,
738
            telemetry,
739
            telemetry_worker_handle,
740
            mut task_manager,
741
            keystore_container,
742
            hwbench,
743
            prometheus_registry,
744
            network,
745
            tx_handler_controller,
746
            import_queue_service,
747
        } = self;
748

            
749
        let network = TypeIdentity::into_type(network);
750
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
751

            
752
        let announce_block = {
753
            let sync_service = network.sync_service.clone();
754
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
755
        };
756

            
757
        let overseer_handle = relay_chain_interface
758
            .overseer_handle()
759
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
760

            
761
        let params = StartFullNodeParams {
762
            client: client.clone(),
763
            announce_block,
764
            task_manager: &mut task_manager,
765
            para_id,
766
            relay_chain_interface,
767
            relay_chain_slot_duration,
768
            import_queue: import_queue_service,
769
            recovery_handle: Box::new(overseer_handle),
770
            sync_service: network.sync_service.clone(),
771
        };
772

            
773
        // TODO: change for async backing
774
        #[allow(deprecated)]
775
        cumulus_client_service::start_full_node(params)?;
776

            
777
        Ok(NodeBuilder {
778
            client,
779
            backend,
780
            transaction_pool,
781
            telemetry,
782
            telemetry_worker_handle,
783
            task_manager,
784
            keystore_container,
785
            hwbench,
786
            prometheus_registry,
787
            network: TypeIdentity::from_type(network),
788
            tx_handler_controller,
789
            import_queue_service: (),
790
        })
791
    }
792

            
793
    pub async fn start_collator<RCInterface>(
794
        self,
795
        para_id: ParaId,
796
        relay_chain_interface: RCInterface,
797
        relay_chain_slot_duration: Duration,
798
        parachain_consensus: ParachainConsensusOf<T>,
799
        collator_key: CollatorPair,
800
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
801
    where
802
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
803
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
804
        RCInterface: RelayChainInterface + Clone + 'static,
805
    {
806
        let NodeBuilder {
807
            client,
808
            backend,
809
            transaction_pool,
810
            telemetry,
811
            telemetry_worker_handle,
812
            mut task_manager,
813
            keystore_container,
814
            hwbench,
815
            prometheus_registry,
816
            network,
817
            tx_handler_controller,
818
            import_queue_service,
819
        } = self;
820

            
821
        let network = TypeIdentity::into_type(network);
822
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
823

            
824
        let spawner = task_manager.spawn_handle();
825
        let announce_block = {
826
            let sync_service = network.sync_service.clone();
827
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
828
        };
829
        let overseer_handle = relay_chain_interface
830
            .overseer_handle()
831
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
832

            
833
        let params = cumulus_client_service::StartCollatorParams {
834
            para_id,
835
            block_status: client.clone(),
836
            announce_block: announce_block.clone(),
837
            client: client.clone(),
838
            task_manager: &mut task_manager,
839
            relay_chain_interface: relay_chain_interface.clone(),
840
            spawner: spawner.clone(),
841
            parachain_consensus,
842
            import_queue: import_queue_service,
843
            collator_key,
844
            relay_chain_slot_duration,
845
            recovery_handle: Box::new(overseer_handle.clone()),
846
            sync_service: network.sync_service.clone(),
847
        };
848

            
849
        // TODO: change for async backing
850
        #[allow(deprecated)]
851
        cumulus_client_service::start_collator(params).await?;
852

            
853
        Ok(NodeBuilder {
854
            client,
855
            backend,
856
            transaction_pool,
857
            telemetry,
858
            telemetry_worker_handle,
859
            task_manager,
860
            keystore_container,
861
            hwbench,
862
            prometheus_registry,
863
            network: TypeIdentity::from_type(network),
864
            tx_handler_controller,
865
            import_queue_service: (),
866
        })
867
    }
868

            
869
    pub fn extract_import_queue_service(
870
        self,
871
    ) -> (
872
        NodeBuilder<T, SNetwork, STxHandler, ()>,
873
        SImportQueueService,
874
    )
875
    where
876
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
877
    {
878
        let NodeBuilder {
879
            client,
880
            backend,
881
            transaction_pool,
882
            telemetry,
883
            telemetry_worker_handle,
884
            task_manager,
885
            keystore_container,
886
            hwbench,
887
            prometheus_registry,
888
            network,
889
            tx_handler_controller,
890
            import_queue_service,
891
        } = self;
892

            
893
        (
894
            NodeBuilder {
895
                client,
896
                backend,
897
                transaction_pool,
898
                telemetry,
899
                telemetry_worker_handle,
900
                task_manager,
901
                keystore_container,
902
                hwbench,
903
                prometheus_registry,
904
                network,
905
                tx_handler_controller,
906
                import_queue_service: (),
907
            },
908
            import_queue_service,
909
        )
910
    }
911

            
912
    pub fn cumulus_client_collator_params_generator(
913
        &self,
914
        para_id: ParaId,
915
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
916
        collator_key: CollatorPair,
917
        parachain_consensus: ParachainConsensusOf<T>,
918
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
919
        BlockOf<T>,
920
        ClientOf<T>,
921
        ClientOf<T>,
922
        SpawnTaskHandle,
923
    > + Send
924
           + Clone
925
           + 'static
926
    where
927
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
928
    {
929
        let network = TypeIdentity::as_type(&self.network);
930

            
931
        let client = self.client.clone();
932
        let announce_block = {
933
            let sync_service = network.sync_service.clone();
934
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
935
        };
936
        let spawner = self.task_manager.spawn_handle();
937

            
938
        move || cumulus_client_collator::StartCollatorParams {
939
            runtime_api: client.clone(),
940
            block_status: client.clone(),
941
            announce_block: announce_block.clone(),
942
            overseer_handle: overseer_handle.clone(),
943
            spawner: spawner.clone(),
944
            para_id,
945
            key: collator_key.clone(),
946
            parachain_consensus: parachain_consensus.clone(),
947
        }
948
    }
949
}
950

            
951
/// Block authoring scheme to be used by the dev service.
952
#[derive(Debug, Copy, Clone)]
953
pub enum Sealing {
954
    /// Author a block immediately upon receiving a transaction into the transaction pool
955
    Instant,
956
    /// Author a block upon receiving an RPC command
957
    Manual,
958
    /// Author blocks at a regular interval specified in milliseconds
959
    Interval(u64),
960
}
961

            
962
impl FromStr for Sealing {
963
    type Err = String;
964

            
965
1314
    fn from_str(s: &str) -> Result<Self, Self::Err> {
966
1314
        Ok(match s {
967
1314
            "instant" => Self::Instant,
968
1164
            "manual" => Self::Manual,
969
            s => {
970
                let millis = s
971
                    .parse::<u64>()
972
                    .map_err(|_| "couldn't decode sealing param")?;
973
                Self::Interval(millis)
974
            }
975
        })
976
1314
    }
977
}
978

            
979
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
980
    pub sealing: Sealing,
981
    pub block_import: BI,
982
    pub soft_deadline: Option<Percent>,
983
    pub select_chain: SC,
984
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = StorageProof>>>,
985
    pub create_inherent_data_providers: CIDP,
986
}