1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_io::Timer,
19
    core::time::Duration,
20
    core_extensions::TypeIdentity,
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainConsensus,
23
    cumulus_client_service::{
24
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
25
        StartFullNodeParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::RelayChainInterface,
29
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
30
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
31
    jsonrpsee::RpcModule,
32
    polkadot_primitives::CollatorPair,
33
    sc_client_api::Backend,
34
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
35
    sc_consensus_manual_seal::{
36
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
37
    },
38
    sc_executor::{
39
        sp_wasm_interface::{ExtendedHostFunctions, HostFunctions},
40
        HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf, WasmExecutor,
41
        DEFAULT_HEAP_ALLOC_STRATEGY,
42
    },
43
    sc_network::{config::FullNetworkConfiguration, NetworkBlock},
44
    sc_network_sync::SyncingService,
45
    sc_network_transactions::TransactionsHandlerController,
46
    sc_service::{
47
        Configuration, KeystoreContainer, NetworkStarter, SpawnTaskHandle, TFullBackend,
48
        TFullClient, TaskManager,
49
    },
50
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
51
    sc_transaction_pool_api::OffchainTransactionPoolFactory,
52
    sc_utils::mpsc::TracingUnboundedSender,
53
    sp_api::ConstructRuntimeApi,
54
    sp_block_builder::BlockBuilder,
55
    sp_consensus::SelectChain,
56
    sp_core::traits::CodeExecutor,
57
    sp_inherents::CreateInherentDataProviders,
58
    sp_offchain::OffchainWorkerApi,
59
    sp_runtime::Percent,
60
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
61
    std::{str::FromStr, sync::Arc},
62
};
63

            
64
#[allow(deprecated)]
65
use sc_executor::NativeElseWasmExecutor;
66
use sc_transaction_pool_api::TransactionPool;
67
use sp_api::StorageProof;
68
use sp_core::traits::SpawnNamed;
69

            
70
tp_traits::alias!(
71
    pub trait MinimalRuntimeApi<
72
        Block: (cumulus_primitives_core::BlockT),
73
        Client: (sp_api::CallApiAt<Block>),
74
    > :
75
        ConstructRuntimeApi<
76
            Block,
77
            Client,
78
            RuntimeApi:
79
                TaggedTransactionQueue<Block>
80
                + BlockBuilder<Block> + OffchainWorkerApi<Block>
81
                + sp_api::Metadata<Block>
82
                + sp_session::SessionKeys<Block>,
83
        > + Send + Sync + 'static
84
);
85

            
86
tp_traits::alias!(
87
    pub trait MinimalCumulusRuntimeApi<
88
        Block: (cumulus_primitives_core::BlockT),
89
        Client: (sp_api::CallApiAt<Block>),
90
    > :
91
        MinimalRuntimeApi<Block, Client> +
92
        ConstructRuntimeApi<
93
            Block,
94
            Client,
95
            RuntimeApi:
96
                cumulus_primitives_core::CollectCollationInfo<Block>,
97
        >
98
);
99

            
100
/// Trait to configure the main types the builder rely on, bundled in a single
101
/// type to reduce verbosity and the amount of type parameters.
102
pub trait NodeBuilderConfig {
103
    type Block;
104
    type RuntimeApi;
105
    type ParachainExecutor;
106

            
107
    /// Create a new `NodeBuilder` using the types of this `Config`, along
108
    /// with the parachain `Configuration` and an optional `HwBench`.
109
390
    fn new_builder(
110
390
        parachain_config: &Configuration,
111
390
        hwbench: Option<sc_sysinfo::HwBench>,
112
390
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
113
390
    where
114
390
        Self: Sized,
115
390
        BlockOf<Self>: cumulus_primitives_core::BlockT,
116
390
        ExecutorOf<Self>:
117
390
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
118
390
        RuntimeApiOf<Self>: MinimalRuntimeApi<BlockOf<Self>, ClientOf<Self>>,
119
390
        BlockHashOf<Self>: Unpin,
120
390
    {
121
390
        NodeBuilder::<Self>::new(parachain_config, hwbench)
122
390
    }
123
}
124

            
125
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
126
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
127
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
128
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
129
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
130
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
131
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
132
pub type ConstructedRuntimeApiOf<T> =
133
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
134
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
135
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
136

            
137
// `Cumulus` and `TxHandler` are types that will change during the life of
138
// a `NodeBuilder` because they are generated and consumed when calling
139
// certain functions, with absence of data represented with `()`. Some
140
// function are implemented only for a given concrete type, which ensure it
141
// can only be called if the required data is available (generated and not yet
142
// consumed).
143
//
144
// While this could be implemented with multiple impl blocks with concrete types,
145
// we use here `core_extensions::TypeIdentity` which allow to express type
146
// identity/equality as a trait bound on each function as it removes the
147
// boilerplate of many impl block with duplicated trait bounds. 2 impl blocks
148
// are still required since Rust can't infer the types in the `new` function
149
// that doesn't take `self`.
150
pub struct NodeBuilder<
151
    T: NodeBuilderConfig,
152
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
153
    // but can only be called with an `import_queue` which can be different in
154
    // each node. For that reason it is a `()` when calling `new`, then the
155
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
156
    // then call `build_cumulus_network` with it to generate the cumulus systems.
157
    SNetwork = (),
158
    // The `TxHandler` is constructed in `build_X_network`
159
    // and is then consumed when calling `spawn_common_tasks`.
160
    STxHandler = (),
161
    // The import queue service is obtained from the import queue in
162
    // `build_cumulus_network` or `build_substrate_network`, which also
163
    // consumes the import queue. Neither of them are clonable, so we need to
164
    // to store the service here to be able to consume it later in
165
    // `start_full_node`.
166
    SImportQueueService = (),
167
> where
168
    BlockOf<T>: cumulus_primitives_core::BlockT,
169
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
170
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
171
{
172
    pub client: Arc<ClientOf<T>>,
173
    pub backend: Arc<BackendOf<T>>,
174
    pub task_manager: TaskManager,
175
    pub keystore_container: KeystoreContainer,
176
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<BlockOf<T>, ClientOf<T>>>,
177
    pub telemetry: Option<Telemetry>,
178
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,
179

            
180
    pub hwbench: Option<sc_sysinfo::HwBench>,
181
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,
182

            
183
    pub network: SNetwork,
184
    pub tx_handler_controller: STxHandler,
185
    pub import_queue_service: SImportQueueService,
186
}
187

            
188
pub struct Network<Block: cumulus_primitives_core::BlockT> {
189
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
190
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
191
    pub start_network: NetworkStarter,
192
    pub sync_service: Arc<SyncingService<Block>>,
193
}
194

            
195
/// Allows to create a parachain-defined executor from a `WasmExecutor`
196
pub trait TanssiExecutorExt {
197
    type HostFun: HostFunctions;
198
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
199
}
200

            
201
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
202
    type HostFun = ParachainHostFunctions;
203

            
204
618
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
205
618
        wasm_executor
206
618
    }
207
}
208

            
209
#[allow(deprecated)]
210
impl<D> TanssiExecutorExt for NativeElseWasmExecutor<D>
211
where
212
    D: NativeExecutionDispatch,
213
{
214
    type HostFun = ExtendedHostFunctions<sp_io::SubstrateHostFunctions, D::ExtendHostFunctions>;
215

            
216
184
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
217
184
        #[allow(deprecated)]
218
184
        NativeElseWasmExecutor::new_with_wasm_executor(wasm_executor)
219
184
    }
220
}
221

            
222
// `new` function doesn't take self, and the Rust compiler cannot infer that
223
// only one type T implements `TypeIdentity`. With thus need a separate impl
224
// block with concrete types `()`.
225
impl<T: NodeBuilderConfig> NodeBuilder<T>
226
where
227
    BlockOf<T>: cumulus_primitives_core::BlockT,
228
    ExecutorOf<T>:
229
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
230
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
231
    BlockHashOf<T>: Unpin,
232
{
233
    /// Create a new `NodeBuilder` which prepare objects required to launch a
234
    /// node. However it only starts telemetry, and doesn't provide any
235
    /// network-dependent objects (as it requires an import queue, which usually
236
    /// is different for each node).
237
390
    fn new(
238
390
        parachain_config: &Configuration,
239
390
        hwbench: Option<sc_sysinfo::HwBench>,
240
390
    ) -> Result<Self, sc_service::Error> {
241
        // Refactor: old new_partial
242

            
243
390
        let telemetry = parachain_config
244
390
            .telemetry_endpoints
245
390
            .clone()
246
390
            .filter(|x| !x.is_empty())
247
390
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
248
                let worker = TelemetryWorker::new(16)?;
249
                let telemetry = worker.handle().new_telemetry(endpoints);
250
                Ok((worker, telemetry))
251
390
            })
252
390
            .transpose()?;
253

            
254
390
        let heap_pages =
255
390
            parachain_config
256
390
                .executor
257
390
                .default_heap_pages
258
390
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
259
                    extra_pages: h as u32,
260
390
                });
261
390

            
262
390
        // Default runtime_cache_size is 2
263
390
        // For now we can work with this, but it will likely need
264
390
        // to change once we start having runtime_cache_sizes, or
265
390
        // run nodes with the maximum for this value
266
390
        let mut wasm_builder = WasmExecutor::builder()
267
390
            .with_execution_method(parachain_config.executor.wasm_method)
268
390
            .with_onchain_heap_alloc_strategy(heap_pages)
269
390
            .with_offchain_heap_alloc_strategy(heap_pages)
270
390
            .with_max_runtime_instances(parachain_config.executor.max_runtime_instances)
271
390
            .with_runtime_cache_size(parachain_config.executor.runtime_cache_size);
272
390
        if let Some(ref wasmtime_precompiled_path) = parachain_config.executor.wasmtime_precompiled
273
368
        {
274
368
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
275
368
        }
276

            
277
390
        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());
278

            
279
390
        let (client, backend, keystore_container, task_manager) =
280
390
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
281
390
                parachain_config,
282
390
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
283
390
                executor,
284
390
                true,
285
390
            )?;
286
390
        let client = Arc::new(client);
287
390

            
288
390
        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
289
390

            
290
390
        let telemetry = telemetry.map(|(worker, telemetry)| {
291
            task_manager
292
                .spawn_handle()
293
                .spawn("telemetry", None, worker.run());
294
            telemetry
295
390
        });
296
390

            
297
390
        let transaction_pool = sc_transaction_pool::Builder::new(
298
390
            task_manager.spawn_essential_handle(),
299
390
            client.clone(),
300
390
            parachain_config.role.is_authority().into(),
301
390
        )
302
390
        .with_options(parachain_config.transaction_pool.clone())
303
390
        .with_prometheus(parachain_config.prometheus_registry())
304
390
        .build();
305
390

            
306
390
        Ok(Self {
307
390
            client,
308
390
            backend,
309
390
            transaction_pool: transaction_pool.into(),
310
390
            telemetry,
311
390
            telemetry_worker_handle,
312
390
            task_manager,
313
390
            keystore_container,
314
390
            hwbench,
315
390
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
316
390
            network: TypeIdentity::from_type(()),
317
390
            tx_handler_controller: TypeIdentity::from_type(()),
318
390
            import_queue_service: TypeIdentity::from_type(()),
319
390
        })
320
390
    }
321
}
322

            
323
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
324
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
325
where
326
    BlockOf<T>: cumulus_primitives_core::BlockT,
327
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
328
    RuntimeApiOf<T>: MinimalCumulusRuntimeApi<BlockOf<T>, ClientOf<T>>,
329
{
330
    pub async fn build_relay_chain_interface(
331
        &mut self,
332
        parachain_config: &Configuration,
333
        polkadot_config: Configuration,
334
        collator_options: CollatorOptions,
335
    ) -> sc_service::error::Result<(
336
        Arc<(dyn RelayChainInterface + 'static)>,
337
        Option<CollatorPair>,
338
    )> {
339
        build_relay_chain_interface(
340
            polkadot_config,
341
            parachain_config,
342
            self.telemetry_worker_handle.clone(),
343
            &mut self.task_manager,
344
            collator_options.clone(),
345
            self.hwbench.clone(),
346
        )
347
        .await
348
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))
349
    }
350

            
351
    /// Given an import queue, calls `cumulus_client_service::build_network` and
352
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
353
    ///
354
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
355
    /// data.
356
    pub async fn build_cumulus_network<RCInterface, Net>(
357
        self,
358
        parachain_config: &Configuration,
359
        para_id: ParaId,
360
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
361
        relay_chain_interface: RCInterface,
362
    ) -> sc_service::error::Result<
363
        NodeBuilder<
364
            T,
365
            Network<BlockOf<T>>,
366
            TransactionsHandlerController<BlockHashOf<T>>,
367
            ImportQueueServiceOf<T>,
368
        >,
369
    >
370
    where
371
        SNetwork: TypeIdentity<Type = ()>,
372
        STxHandler: TypeIdentity<Type = ()>,
373
        SImportQueueService: TypeIdentity<Type = ()>,
374
        RCInterface: RelayChainInterface + Clone + 'static,
375
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
376
    {
377
        let Self {
378
            client,
379
            backend,
380
            transaction_pool,
381
            telemetry,
382
            telemetry_worker_handle,
383
            task_manager,
384
            keystore_container,
385
            hwbench,
386
            prometheus_registry,
387
            network: _,
388
            tx_handler_controller: _,
389
            import_queue_service: _,
390
        } = self;
391

            
392
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
393
            &parachain_config.network,
394
            prometheus_registry.clone(),
395
        );
396

            
397
        let import_queue_service = import_queue.service();
398
        let spawn_handle = task_manager.spawn_handle();
399

            
400
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
401
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
402
                parachain_config,
403
                client: client.clone(),
404
                transaction_pool: transaction_pool.clone(),
405
                spawn_handle,
406
                import_queue,
407
                para_id,
408
                relay_chain_interface,
409
                net_config,
410
                sybil_resistance_level: CollatorSybilResistance::Resistant,
411
            })
412
            .await?;
413

            
414
        Ok(NodeBuilder {
415
            client,
416
            backend,
417
            transaction_pool,
418
            telemetry,
419
            telemetry_worker_handle,
420
            task_manager,
421
            keystore_container,
422
            hwbench,
423
            prometheus_registry,
424
            network: Network {
425
                network,
426
                system_rpc_tx,
427
                start_network,
428
                sync_service,
429
            },
430
            tx_handler_controller,
431
            import_queue_service,
432
        })
433
    }
434

            
435
    /// Given an import queue, calls `sc_service::build_network` and
436
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
437
    ///
438
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
439
    /// data.
440
384
    pub fn build_substrate_network<Net>(
441
384
        self,
442
384
        parachain_config: &Configuration,
443
384
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
444
384
    ) -> sc_service::error::Result<
445
384
        NodeBuilder<
446
384
            T,
447
384
            Network<BlockOf<T>>,
448
384
            TransactionsHandlerController<BlockHashOf<T>>,
449
384
            ImportQueueServiceOf<T>,
450
384
        >,
451
384
    >
452
384
    where
453
384
        SNetwork: TypeIdentity<Type = ()>,
454
384
        STxHandler: TypeIdentity<Type = ()>,
455
384
        SImportQueueService: TypeIdentity<Type = ()>,
456
384
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
457
384
    {
458
384
        let Self {
459
384
            client,
460
384
            backend,
461
384
            transaction_pool,
462
384
            telemetry,
463
384
            telemetry_worker_handle,
464
384
            task_manager,
465
384
            keystore_container,
466
384
            hwbench,
467
384
            prometheus_registry,
468
384
            network: _,
469
384
            tx_handler_controller: _,
470
384
            import_queue_service: _,
471
384
        } = self;
472
384

            
473
384
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
474
384
            &parachain_config.network,
475
384
            prometheus_registry.clone(),
476
384
        );
477
384

            
478
384
        let metrics = Net::register_notification_metrics(
479
384
            parachain_config
480
384
                .prometheus_config
481
384
                .as_ref()
482
384
                .map(|cfg| &cfg.registry),
483
384
        );
484
384

            
485
384
        let import_queue_service = import_queue.service();
486

            
487
384
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
488
384
            sc_service::build_network(sc_service::BuildNetworkParams {
489
384
                config: parachain_config,
490
384
                client: client.clone(),
491
384
                transaction_pool: transaction_pool.clone(),
492
384
                spawn_handle: task_manager.spawn_handle(),
493
384
                import_queue,
494
384
                warp_sync_config: None,
495
384
                block_announce_validator_builder: None,
496
384
                net_config,
497
384
                block_relay: None,
498
384
                metrics,
499
384
            })?;
500

            
501
384
        Ok(NodeBuilder {
502
384
            client,
503
384
            backend,
504
384
            transaction_pool,
505
384
            telemetry,
506
384
            telemetry_worker_handle,
507
384
            task_manager,
508
384
            keystore_container,
509
384
            hwbench,
510
384
            prometheus_registry,
511
384
            network: Network {
512
384
                network,
513
384
                system_rpc_tx,
514
384
                start_network,
515
384
                sync_service,
516
384
            },
517
384
            tx_handler_controller,
518
384
            import_queue_service,
519
384
        })
520
384
    }
521

            
522
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
523
    /// node. It consumes `self.tx_handler_controller` in the process, which means
524
    /// it can only be called once, and any other code that would need this
525
    /// controller should interact with it before calling this function.
526
384
    pub fn spawn_common_tasks<TRpc>(
527
384
        self,
528
384
        parachain_config: Configuration,
529
384
        rpc_builder: Box<
530
384
            dyn Fn(Arc<(dyn SpawnNamed + 'static)>) -> Result<RpcModule<TRpc>, sc_service::Error>,
531
384
        >,
532
384
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
533
384
    where
534
384
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
535
384
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
536
384
        BlockHashOf<T>: Unpin,
537
384
        BlockHeaderOf<T>: Unpin,
538
384
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
539
384
            + BlockBuilder<BlockOf<T>>
540
384
            + OffchainWorkerApi<BlockOf<T>>
541
384
            + sp_api::Metadata<BlockOf<T>>
542
384
            + sp_session::SessionKeys<BlockOf<T>>,
543
384
    {
544
384
        let NodeBuilder {
545
384
            client,
546
384
            backend,
547
384
            transaction_pool,
548
384
            mut telemetry,
549
384
            telemetry_worker_handle,
550
384
            mut task_manager,
551
384
            keystore_container,
552
384
            hwbench,
553
384
            prometheus_registry,
554
384
            network,
555
384
            tx_handler_controller,
556
384
            import_queue_service,
557
384
        } = self;
558
384

            
559
384
        let network = TypeIdentity::into_type(network);
560
384
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);
561
384

            
562
384
        let collator = parachain_config.role.is_authority();
563
384

            
564
384
        if parachain_config.offchain_worker.enabled {
565
384
            task_manager.spawn_handle().spawn(
566
384
                "offchain-workers-runner",
567
384
                "offchain-work",
568
384
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
569
384
                    runtime_api_provider: client.clone(),
570
384
                    keystore: Some(keystore_container.keystore()),
571
384
                    offchain_db: backend.offchain_storage(),
572
384
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
573
384
                        transaction_pool.clone(),
574
384
                    )),
575
384
                    network_provider: Arc::new(network.network.clone()),
576
384
                    is_validator: parachain_config.role.is_authority(),
577
384
                    enable_http_requests: false,
578
8742
                    custom_extensions: move |_| vec![],
579
384
                })
580
384
                .run(client.clone(), task_manager.spawn_handle())
581
384
                .boxed(),
582
384
            );
583
384
        }
584

            
585
384
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
586
384
            rpc_builder,
587
384
            client: client.clone(),
588
384
            transaction_pool: transaction_pool.clone(),
589
384
            task_manager: &mut task_manager,
590
384
            config: parachain_config,
591
384
            keystore: keystore_container.keystore(),
592
384
            backend: backend.clone(),
593
384
            network: network.network.clone(),
594
384
            system_rpc_tx: network.system_rpc_tx.clone(),
595
384
            tx_handler_controller,
596
384
            telemetry: telemetry.as_mut(),
597
384
            sync_service: network.sync_service.clone(),
598
384
        })?;
599

            
600
384
        if let Some(hwbench) = &hwbench {
601
            sc_sysinfo::print_hwbench(hwbench);
602
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
603
            // in there and swapping out the requirements for your own are probably a good idea. The
604
            // requirements for a para-chain are dictated by its relay-chain.
605
            if collator {
606
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false) {
607
                    log::warn!(
608
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
609
                        err
610
                    );
611
                }
612
            }
613

            
614
            if let Some(ref mut telemetry) = telemetry {
615
                let telemetry_handle = telemetry.handle();
616
                task_manager.spawn_handle().spawn(
617
                    "telemetry_hwbench",
618
                    None,
619
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
620
                );
621
            }
622
384
        }
623

            
624
384
        Ok(NodeBuilder {
625
384
            client,
626
384
            backend,
627
384
            transaction_pool,
628
384
            telemetry,
629
384
            telemetry_worker_handle,
630
384
            task_manager,
631
384
            keystore_container,
632
384
            hwbench,
633
384
            prometheus_registry,
634
384
            network: TypeIdentity::from_type(network),
635
384
            tx_handler_controller: TypeIdentity::from_type(()),
636
384
            import_queue_service,
637
384
        })
638
384
    }
639

            
640
384
    pub fn install_manual_seal<BI, SC, CIDP>(
641
384
        &mut self,
642
384
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
643
384
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
644
384
    where
645
384
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
646
384
        SC: SelectChain<BlockOf<T>> + 'static,
647
384
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
648
384
    {
649
384
        let ManualSealConfiguration {
650
384
            sealing,
651
384
            soft_deadline,
652
384
            block_import,
653
384
            select_chain,
654
384
            consensus_data_provider,
655
384
            create_inherent_data_providers,
656
384
        } = manual_seal_config;
657
384

            
658
384
        let prometheus_registry = self.prometheus_registry.clone();
659
384

            
660
384
        let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
661
384
            self.task_manager.spawn_handle(),
662
384
            self.client.clone(),
663
384
            self.transaction_pool.clone(),
664
384
            prometheus_registry.as_ref(),
665
384
            self.telemetry.as_ref().map(|x| x.handle()),
666
384
        );
667
384

            
668
384
        let mut command_sink = None;
669

            
670
384
        if let Some(deadline) = soft_deadline {
671
182
            env.set_soft_deadline(deadline);
672
384
        }
673

            
674
384
        let commands_stream: Box<
675
384
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
676
384
        > = match sealing {
677
            Sealing::Instant => {
678
16
                Box::new(
679
16
                    // This bit cribbed from the implementation of instant seal.
680
16
                    self.transaction_pool.import_notification_stream().map(|_| {
681
                        EngineCommand::SealNewBlock {
682
                            create_empty: false,
683
                            finalize: false,
684
                            parent_hash: None,
685
                            sender: None,
686
                        }
687
16
                    }),
688
16
                )
689
            }
690
            Sealing::Manual => {
691
368
                let (sink, stream) = futures::channel::mpsc::channel(1000);
692
368
                // Keep a reference to the other end of the channel. It goes to the RPC.
693
368
                command_sink = Some(sink);
694
368
                Box::new(stream)
695
            }
696
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
697
                Timer::interval(Duration::from_millis(millis)),
698
                |_| EngineCommand::SealNewBlock {
699
                    create_empty: true,
700
                    finalize: true,
701
                    parent_hash: None,
702
                    sender: None,
703
                },
704
            )),
705
        };
706

            
707
384
        self.task_manager.spawn_essential_handle().spawn_blocking(
708
384
            "authorship_task",
709
384
            Some("block-authoring"),
710
384
            run_manual_seal(ManualSealParams {
711
384
                block_import,
712
384
                env,
713
384
                client: self.client.clone(),
714
384
                pool: self.transaction_pool.clone(),
715
384
                commands_stream,
716
384
                select_chain,
717
384
                consensus_data_provider,
718
384
                create_inherent_data_providers,
719
384
            }),
720
384
        );
721
384

            
722
384
        Ok(command_sink)
723
384
    }
724

            
725
    pub fn start_full_node<RCInterface>(
726
        self,
727
        para_id: ParaId,
728
        relay_chain_interface: RCInterface,
729
        relay_chain_slot_duration: Duration,
730
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
731
    where
732
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
733
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
734
        RCInterface: RelayChainInterface + Clone + 'static,
735
    {
736
        let NodeBuilder {
737
            client,
738
            backend,
739
            transaction_pool,
740
            telemetry,
741
            telemetry_worker_handle,
742
            mut task_manager,
743
            keystore_container,
744
            hwbench,
745
            prometheus_registry,
746
            network,
747
            tx_handler_controller,
748
            import_queue_service,
749
        } = self;
750

            
751
        let network = TypeIdentity::into_type(network);
752
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
753

            
754
        let announce_block = {
755
            let sync_service = network.sync_service.clone();
756
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
757
        };
758

            
759
        let overseer_handle = relay_chain_interface
760
            .overseer_handle()
761
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
762

            
763
        let params = StartFullNodeParams {
764
            client: client.clone(),
765
            announce_block,
766
            task_manager: &mut task_manager,
767
            para_id,
768
            relay_chain_interface,
769
            relay_chain_slot_duration,
770
            import_queue: import_queue_service,
771
            recovery_handle: Box::new(overseer_handle),
772
            sync_service: network.sync_service.clone(),
773
        };
774

            
775
        // TODO: change for async backing
776
        #[allow(deprecated)]
777
        cumulus_client_service::start_full_node(params)?;
778

            
779
        Ok(NodeBuilder {
780
            client,
781
            backend,
782
            transaction_pool,
783
            telemetry,
784
            telemetry_worker_handle,
785
            task_manager,
786
            keystore_container,
787
            hwbench,
788
            prometheus_registry,
789
            network: TypeIdentity::from_type(network),
790
            tx_handler_controller,
791
            import_queue_service: (),
792
        })
793
    }
794

            
795
    pub async fn start_collator<RCInterface>(
796
        self,
797
        para_id: ParaId,
798
        relay_chain_interface: RCInterface,
799
        relay_chain_slot_duration: Duration,
800
        parachain_consensus: ParachainConsensusOf<T>,
801
        collator_key: CollatorPair,
802
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
803
    where
804
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
805
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
806
        RCInterface: RelayChainInterface + Clone + 'static,
807
    {
808
        let NodeBuilder {
809
            client,
810
            backend,
811
            transaction_pool,
812
            telemetry,
813
            telemetry_worker_handle,
814
            mut task_manager,
815
            keystore_container,
816
            hwbench,
817
            prometheus_registry,
818
            network,
819
            tx_handler_controller,
820
            import_queue_service,
821
        } = self;
822

            
823
        let network = TypeIdentity::into_type(network);
824
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
825

            
826
        let spawner = task_manager.spawn_handle();
827
        let announce_block = {
828
            let sync_service = network.sync_service.clone();
829
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
830
        };
831
        let overseer_handle = relay_chain_interface
832
            .overseer_handle()
833
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
834

            
835
        let params = cumulus_client_service::StartCollatorParams {
836
            para_id,
837
            block_status: client.clone(),
838
            announce_block: announce_block.clone(),
839
            client: client.clone(),
840
            task_manager: &mut task_manager,
841
            relay_chain_interface: relay_chain_interface.clone(),
842
            spawner: spawner.clone(),
843
            parachain_consensus,
844
            import_queue: import_queue_service,
845
            collator_key,
846
            relay_chain_slot_duration,
847
            recovery_handle: Box::new(overseer_handle.clone()),
848
            sync_service: network.sync_service.clone(),
849
        };
850

            
851
        // TODO: change for async backing
852
        #[allow(deprecated)]
853
        cumulus_client_service::start_collator(params).await?;
854

            
855
        Ok(NodeBuilder {
856
            client,
857
            backend,
858
            transaction_pool,
859
            telemetry,
860
            telemetry_worker_handle,
861
            task_manager,
862
            keystore_container,
863
            hwbench,
864
            prometheus_registry,
865
            network: TypeIdentity::from_type(network),
866
            tx_handler_controller,
867
            import_queue_service: (),
868
        })
869
    }
870

            
871
    pub fn extract_import_queue_service(
872
        self,
873
    ) -> (
874
        NodeBuilder<T, SNetwork, STxHandler, ()>,
875
        SImportQueueService,
876
    )
877
    where
878
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
879
    {
880
        let NodeBuilder {
881
            client,
882
            backend,
883
            transaction_pool,
884
            telemetry,
885
            telemetry_worker_handle,
886
            task_manager,
887
            keystore_container,
888
            hwbench,
889
            prometheus_registry,
890
            network,
891
            tx_handler_controller,
892
            import_queue_service,
893
        } = self;
894

            
895
        (
896
            NodeBuilder {
897
                client,
898
                backend,
899
                transaction_pool,
900
                telemetry,
901
                telemetry_worker_handle,
902
                task_manager,
903
                keystore_container,
904
                hwbench,
905
                prometheus_registry,
906
                network,
907
                tx_handler_controller,
908
                import_queue_service: (),
909
            },
910
            import_queue_service,
911
        )
912
    }
913

            
914
    pub fn cumulus_client_collator_params_generator(
915
        &self,
916
        para_id: ParaId,
917
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
918
        collator_key: CollatorPair,
919
        parachain_consensus: ParachainConsensusOf<T>,
920
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
921
        BlockOf<T>,
922
        ClientOf<T>,
923
        ClientOf<T>,
924
        SpawnTaskHandle,
925
    > + Send
926
           + Clone
927
           + 'static
928
    where
929
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
930
    {
931
        let network = TypeIdentity::as_type(&self.network);
932

            
933
        let client = self.client.clone();
934
        let announce_block = {
935
            let sync_service = network.sync_service.clone();
936
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
937
        };
938
        let spawner = self.task_manager.spawn_handle();
939

            
940
        move || cumulus_client_collator::StartCollatorParams {
941
            runtime_api: client.clone(),
942
            block_status: client.clone(),
943
            announce_block: announce_block.clone(),
944
            overseer_handle: overseer_handle.clone(),
945
            spawner: spawner.clone(),
946
            para_id,
947
            key: collator_key.clone(),
948
            parachain_consensus: parachain_consensus.clone(),
949
        }
950
    }
951
}
952

            
953
/// Block authoring scheme to be used by the dev service.
954
#[derive(Debug, Copy, Clone)]
955
pub enum Sealing {
956
    /// Author a block immediately upon receiving a transaction into the transaction pool
957
    Instant,
958
    /// Author a block upon receiving an RPC command
959
    Manual,
960
    /// Author blocks at a regular interval specified in milliseconds
961
    Interval(u64),
962
}
963

            
964
impl FromStr for Sealing {
965
    type Err = String;
966

            
967
1212
    fn from_str(s: &str) -> Result<Self, Self::Err> {
968
1212
        Ok(match s {
969
1212
            "instant" => Self::Instant,
970
1104
            "manual" => Self::Manual,
971
            s => {
972
                let millis = s
973
                    .parse::<u64>()
974
                    .map_err(|_| "couldn't decode sealing param")?;
975
                Self::Interval(millis)
976
            }
977
        })
978
1212
    }
979
}
980

            
981
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
982
    pub sealing: Sealing,
983
    pub block_import: BI,
984
    pub soft_deadline: Option<Percent>,
985
    pub select_chain: SC,
986
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = StorageProof>>>,
987
    pub create_inherent_data_providers: CIDP,
988
}