1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_io::Timer,
19
    core::time::Duration,
20
    core_extensions::TypeIdentity,
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainConsensus,
23
    cumulus_client_service::{
24
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
25
        StartFullNodeParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::RelayChainInterface,
29
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
30
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
31
    jsonrpsee::RpcModule,
32
    polkadot_primitives::CollatorPair,
33
    sc_client_api::Backend,
34
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
35
    sc_consensus_manual_seal::{
36
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
37
    },
38
    sc_executor::{
39
        sp_wasm_interface::{ExtendedHostFunctions, HostFunctions},
40
        HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf, WasmExecutor,
41
        DEFAULT_HEAP_ALLOC_STRATEGY,
42
    },
43
    sc_network::{config::FullNetworkConfiguration, NetworkBlock},
44
    sc_network_sync::SyncingService,
45
    sc_network_transactions::TransactionsHandlerController,
46
    sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor},
47
    sc_service::{
48
        Configuration, KeystoreContainer, NetworkStarter, SpawnTaskHandle, TFullBackend,
49
        TFullClient, TaskManager,
50
    },
51
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
52
    sc_transaction_pool_api::OffchainTransactionPoolFactory,
53
    sc_utils::mpsc::TracingUnboundedSender,
54
    sp_api::ConstructRuntimeApi,
55
    sp_block_builder::BlockBuilder,
56
    sp_consensus::SelectChain,
57
    sp_core::traits::CodeExecutor,
58
    sp_inherents::CreateInherentDataProviders,
59
    sp_offchain::OffchainWorkerApi,
60
    sp_runtime::Percent,
61
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
62
    std::{str::FromStr, sync::Arc},
63
};
64

            
65
#[allow(deprecated)]
66
use sc_executor::NativeElseWasmExecutor;
67

            
68
/// Trait to configure the main types the builder rely on, bundled in a single
69
/// type to reduce verbosity and the amount of type parameters.
70
pub trait NodeBuilderConfig {
71
    type Block;
72
    type RuntimeApi;
73
    type ParachainExecutor;
74

            
75
    /// Create a new `NodeBuilder` using the types of this `Config`, along
76
    /// with the parachain `Configuration` and an optional `HwBench`.
77
378
    fn new_builder(
78
378
        parachain_config: &Configuration,
79
378
        hwbench: Option<sc_sysinfo::HwBench>,
80
378
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
81
378
    where
82
378
        Self: Sized,
83
378
        BlockOf<Self>: cumulus_primitives_core::BlockT,
84
378
        ExecutorOf<Self>:
85
378
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
86
378
        RuntimeApiOf<Self>:
87
378
            ConstructRuntimeApi<BlockOf<Self>, ClientOf<Self>> + Sync + Send + 'static,
88
378
        ConstructedRuntimeApiOf<Self>:
89
378
            TaggedTransactionQueue<BlockOf<Self>> + BlockBuilder<BlockOf<Self>>,
90
378
    {
91
378
        NodeBuilder::<Self>::new(parachain_config, hwbench)
92
378
    }
93
}
94

            
95
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
96
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
97
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
98
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
99
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
100
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
101
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
102
pub type ConstructedRuntimeApiOf<T> =
103
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
104
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
105
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
106

            
107
// `Cumulus` and `TxHandler` are types that will change during the life of
108
// a `NodeBuilder` because they are generated and consumed when calling
109
// certain functions, with absence of data represented with `()`. Some
110
// function are implemented only for a given concrete type, which ensure it
111
// can only be called if the required data is available (generated and not yet
112
// consumed).
113
//
114
// While this could be implemented with multiple impl blocks with concrete types,
115
// we use here `core_extensions::TypeIdentity` which allow to express type
116
// identity/equality as a trait bound on each function as it removes the
117
// boilerplate of many impl block with duplicated trait bounds. 2 impl blocks
118
// are still required since Rust can't infer the types in the `new` function
119
// that doesn't take `self`.
120
pub struct NodeBuilder<
121
    T: NodeBuilderConfig,
122
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
123
    // but can only be called with an `import_queue` which can be different in
124
    // each node. For that reason it is a `()` when calling `new`, then the
125
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
126
    // then call `build_cumulus_network` with it to generate the cumulus systems.
127
    SNetwork = (),
128
    // The `TxHandler` is constructed in `build_X_network`
129
    // and is then consumed when calling `spawn_common_tasks`.
130
    STxHandler = (),
131
    // The import queue service is obtained from the import queue in
132
    // `build_cumulus_network` or `build_substrate_network`, which also
133
    // consumes the import queue. Neither of them are clonable, so we need to
134
    // to store the service here to be able to consume it later in
135
    // `start_full_node`.
136
    SImportQueueService = (),
137
> where
138
    BlockOf<T>: cumulus_primitives_core::BlockT,
139
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
140
    RuntimeApiOf<T>: ConstructRuntimeApi<BlockOf<T>, ClientOf<T>> + Sync + Send + 'static,
141
    ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>> + BlockBuilder<BlockOf<T>>,
142
{
143
    pub client: Arc<ClientOf<T>>,
144
    pub backend: Arc<BackendOf<T>>,
145
    pub task_manager: TaskManager,
146
    pub keystore_container: KeystoreContainer,
147
    pub transaction_pool: Arc<sc_transaction_pool::FullPool<BlockOf<T>, ClientOf<T>>>,
148
    pub telemetry: Option<Telemetry>,
149
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,
150

            
151
    pub hwbench: Option<sc_sysinfo::HwBench>,
152
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,
153

            
154
    pub network: SNetwork,
155
    pub tx_handler_controller: STxHandler,
156
    pub import_queue_service: SImportQueueService,
157
}
158

            
159
pub struct Network<Block: cumulus_primitives_core::BlockT> {
160
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
161
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
162
    pub start_network: NetworkStarter,
163
    pub sync_service: Arc<SyncingService<Block>>,
164
}
165

            
166
/// Allows to create a parachain-defined executor from a `WasmExecutor`
167
pub trait TanssiExecutorExt {
168
    type HostFun: HostFunctions;
169
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
170
}
171

            
172
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
173
    type HostFun = ParachainHostFunctions;
174

            
175
594
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
176
594
        wasm_executor
177
594
    }
178
}
179

            
180
#[allow(deprecated)]
181
impl<D> TanssiExecutorExt for NativeElseWasmExecutor<D>
182
where
183
    D: NativeExecutionDispatch,
184
{
185
    type HostFun = ExtendedHostFunctions<sp_io::SubstrateHostFunctions, D::ExtendHostFunctions>;
186

            
187
180
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
188
180
        #[allow(deprecated)]
189
180
        NativeElseWasmExecutor::new_with_wasm_executor(wasm_executor)
190
180
    }
191
}
192

            
193
// `new` function doesn't take self, and the Rust compiler cannot infer that
194
// only one type T implements `TypeIdentity`. With thus need a separate impl
195
// block with concrete types `()`.
196
impl<T: NodeBuilderConfig> NodeBuilder<T>
197
where
198
    BlockOf<T>: cumulus_primitives_core::BlockT,
199
    ExecutorOf<T>:
200
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
201
    RuntimeApiOf<T>: ConstructRuntimeApi<BlockOf<T>, ClientOf<T>> + Sync + Send + 'static,
202
    ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>> + BlockBuilder<BlockOf<T>>,
203
{
204
    /// Create a new `NodeBuilder` which prepare objects required to launch a
205
    /// node. However it only starts telemetry, and doesn't provide any
206
    /// network-dependent objects (as it requires an import queue, which usually
207
    /// is different for each node).
208
378
    fn new(
209
378
        parachain_config: &Configuration,
210
378
        hwbench: Option<sc_sysinfo::HwBench>,
211
378
    ) -> Result<Self, sc_service::Error> {
212
        // Refactor: old new_partial
213

            
214
378
        let telemetry = parachain_config
215
378
            .telemetry_endpoints
216
378
            .clone()
217
378
            .filter(|x| !x.is_empty())
218
378
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
219
                let worker = TelemetryWorker::new(16)?;
220
                let telemetry = worker.handle().new_telemetry(endpoints);
221
                Ok((worker, telemetry))
222
378
            })
223
378
            .transpose()?;
224

            
225
378
        let heap_pages =
226
378
            parachain_config
227
378
                .default_heap_pages
228
378
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
229
                    extra_pages: h as u32,
230
378
                });
231
378

            
232
378
        // Default runtime_cache_size is 2
233
378
        // For now we can work with this, but it will likely need
234
378
        // to change once we start having runtime_cache_sizes, or
235
378
        // run nodes with the maximum for this value
236
378
        let mut wasm_builder = WasmExecutor::builder()
237
378
            .with_execution_method(parachain_config.wasm_method)
238
378
            .with_onchain_heap_alloc_strategy(heap_pages)
239
378
            .with_offchain_heap_alloc_strategy(heap_pages)
240
378
            .with_max_runtime_instances(parachain_config.max_runtime_instances)
241
378
            .with_runtime_cache_size(parachain_config.runtime_cache_size);
242
378
        if let Some(ref wasmtime_precompiled_path) = parachain_config.wasmtime_precompiled {
243
356
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
244
356
        }
245

            
246
378
        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());
247

            
248
378
        let (client, backend, keystore_container, task_manager) =
249
378
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
250
378
                parachain_config,
251
378
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
252
378
                executor,
253
378
                true,
254
378
            )?;
255
378
        let client = Arc::new(client);
256
378

            
257
378
        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
258
378

            
259
378
        let telemetry = telemetry.map(|(worker, telemetry)| {
260
            task_manager
261
                .spawn_handle()
262
                .spawn("telemetry", None, worker.run());
263
            telemetry
264
378
        });
265
378

            
266
378
        let transaction_pool = sc_transaction_pool::BasicPool::new_full(
267
378
            parachain_config.transaction_pool.clone(),
268
378
            parachain_config.role.is_authority().into(),
269
378
            parachain_config.prometheus_registry(),
270
378
            task_manager.spawn_essential_handle(),
271
378
            client.clone(),
272
378
        );
273
378

            
274
378
        Ok(Self {
275
378
            client,
276
378
            backend,
277
378
            transaction_pool,
278
378
            telemetry,
279
378
            telemetry_worker_handle,
280
378
            task_manager,
281
378
            keystore_container,
282
378
            hwbench,
283
378
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
284
378
            network: TypeIdentity::from_type(()),
285
378
            tx_handler_controller: TypeIdentity::from_type(()),
286
378
            import_queue_service: TypeIdentity::from_type(()),
287
378
        })
288
378
    }
289
}
290

            
291
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
292
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
293
where
294
    BlockOf<T>: cumulus_primitives_core::BlockT,
295
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
296
    RuntimeApiOf<T>: ConstructRuntimeApi<BlockOf<T>, ClientOf<T>> + Sync + Send + 'static,
297
    ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
298
        + BlockBuilder<BlockOf<T>>
299
        + cumulus_primitives_core::CollectCollationInfo<BlockOf<T>>,
300
{
301
    pub async fn build_relay_chain_interface(
302
        &mut self,
303
        parachain_config: &Configuration,
304
        polkadot_config: Configuration,
305
        collator_options: CollatorOptions,
306
    ) -> sc_service::error::Result<(
307
        Arc<(dyn RelayChainInterface + 'static)>,
308
        Option<CollatorPair>,
309
    )> {
310
        build_relay_chain_interface(
311
            polkadot_config,
312
            parachain_config,
313
            self.telemetry_worker_handle.clone(),
314
            &mut self.task_manager,
315
            collator_options.clone(),
316
            self.hwbench.clone(),
317
        )
318
        .await
319
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))
320
    }
321

            
322
    /// Given an import queue, calls `cumulus_client_service::build_network` and
323
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
324
    ///
325
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
326
    /// data.
327
    pub async fn build_cumulus_network<RCInterface, Net>(
328
        self,
329
        parachain_config: &Configuration,
330
        para_id: ParaId,
331
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
332
        relay_chain_interface: RCInterface,
333
    ) -> sc_service::error::Result<
334
        NodeBuilder<
335
            T,
336
            Network<BlockOf<T>>,
337
            TransactionsHandlerController<BlockHashOf<T>>,
338
            ImportQueueServiceOf<T>,
339
        >,
340
    >
341
    where
342
        SNetwork: TypeIdentity<Type = ()>,
343
        STxHandler: TypeIdentity<Type = ()>,
344
        SImportQueueService: TypeIdentity<Type = ()>,
345
        RCInterface: RelayChainInterface + Clone + 'static,
346
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
347
    {
348
        let Self {
349
            client,
350
            backend,
351
            transaction_pool,
352
            telemetry,
353
            telemetry_worker_handle,
354
            task_manager,
355
            keystore_container,
356
            hwbench,
357
            prometheus_registry,
358
            network: _,
359
            tx_handler_controller: _,
360
            import_queue_service: _,
361
        } = self;
362

            
363
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);
364

            
365
        let import_queue_service = import_queue.service();
366
        let spawn_handle = task_manager.spawn_handle();
367

            
368
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
369
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
370
                parachain_config,
371
                client: client.clone(),
372
                transaction_pool: transaction_pool.clone(),
373
                spawn_handle,
374
                import_queue,
375
                para_id,
376
                relay_chain_interface,
377
                net_config,
378
                sybil_resistance_level: CollatorSybilResistance::Resistant,
379
            })
380
            .await?;
381

            
382
        Ok(NodeBuilder {
383
            client,
384
            backend,
385
            transaction_pool,
386
            telemetry,
387
            telemetry_worker_handle,
388
            task_manager,
389
            keystore_container,
390
            hwbench,
391
            prometheus_registry,
392
            network: Network {
393
                network,
394
                system_rpc_tx,
395
                start_network,
396
                sync_service,
397
            },
398
            tx_handler_controller,
399
            import_queue_service,
400
        })
401
    }
402

            
403
    /// Given an import queue, calls `sc_service::build_network` and
404
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
405
    ///
406
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
407
    /// data.
408
372
    pub fn build_substrate_network<Net>(
409
372
        self,
410
372
        parachain_config: &Configuration,
411
372
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
412
372
    ) -> sc_service::error::Result<
413
372
        NodeBuilder<
414
372
            T,
415
372
            Network<BlockOf<T>>,
416
372
            TransactionsHandlerController<BlockHashOf<T>>,
417
372
            ImportQueueServiceOf<T>,
418
372
        >,
419
372
    >
420
372
    where
421
372
        SNetwork: TypeIdentity<Type = ()>,
422
372
        STxHandler: TypeIdentity<Type = ()>,
423
372
        SImportQueueService: TypeIdentity<Type = ()>,
424
372
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
425
372
    {
426
372
        let Self {
427
372
            client,
428
372
            backend,
429
372
            transaction_pool,
430
372
            telemetry,
431
372
            telemetry_worker_handle,
432
372
            task_manager,
433
372
            keystore_container,
434
372
            hwbench,
435
372
            prometheus_registry,
436
372
            network: _,
437
372
            tx_handler_controller: _,
438
372
            import_queue_service: _,
439
372
        } = self;
440
372

            
441
372
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);
442
372

            
443
372
        let metrics = Net::register_notification_metrics(
444
372
            parachain_config
445
372
                .prometheus_config
446
372
                .as_ref()
447
372
                .map(|cfg| &cfg.registry),
448
372
        );
449
372

            
450
372
        let import_queue_service = import_queue.service();
451

            
452
372
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
453
372
            sc_service::build_network(sc_service::BuildNetworkParams {
454
372
                config: parachain_config,
455
372
                client: client.clone(),
456
372
                transaction_pool: transaction_pool.clone(),
457
372
                spawn_handle: task_manager.spawn_handle(),
458
372
                import_queue,
459
372
                warp_sync_params: None,
460
372
                block_announce_validator_builder: None,
461
372
                net_config,
462
372
                block_relay: None,
463
372
                metrics,
464
372
            })?;
465

            
466
372
        Ok(NodeBuilder {
467
372
            client,
468
372
            backend,
469
372
            transaction_pool,
470
372
            telemetry,
471
372
            telemetry_worker_handle,
472
372
            task_manager,
473
372
            keystore_container,
474
372
            hwbench,
475
372
            prometheus_registry,
476
372
            network: Network {
477
372
                network,
478
372
                system_rpc_tx,
479
372
                start_network,
480
372
                sync_service,
481
372
            },
482
372
            tx_handler_controller,
483
372
            import_queue_service,
484
372
        })
485
372
    }
486

            
487
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
488
    /// node. It consumes `self.tx_handler_controller` in the process, which means
489
    /// it can only be called once, and any other code that would need this
490
    /// controller should interact with it before calling this function.
491
372
    pub fn spawn_common_tasks<TRpc>(
492
372
        self,
493
372
        parachain_config: Configuration,
494
372
        rpc_builder: Box<
495
372
            dyn Fn(
496
372
                DenyUnsafe,
497
372
                SubscriptionTaskExecutor,
498
372
            ) -> Result<RpcModule<TRpc>, sc_service::Error>,
499
372
        >,
500
372
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
501
372
    where
502
372
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
503
372
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
504
372
        BlockHashOf<T>: Unpin,
505
372
        BlockHeaderOf<T>: Unpin,
506
372
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
507
372
            + BlockBuilder<BlockOf<T>>
508
372
            + OffchainWorkerApi<BlockOf<T>>
509
372
            + sp_api::Metadata<BlockOf<T>>
510
372
            + sp_session::SessionKeys<BlockOf<T>>,
511
372
    {
512
372
        let NodeBuilder {
513
372
            client,
514
372
            backend,
515
372
            transaction_pool,
516
372
            mut telemetry,
517
372
            telemetry_worker_handle,
518
372
            mut task_manager,
519
372
            keystore_container,
520
372
            hwbench,
521
372
            prometheus_registry,
522
372
            network,
523
372
            tx_handler_controller,
524
372
            import_queue_service,
525
372
        } = self;
526
372

            
527
372
        let network = TypeIdentity::into_type(network);
528
372
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);
529
372

            
530
372
        let collator = parachain_config.role.is_authority();
531
372

            
532
372
        if parachain_config.offchain_worker.enabled {
533
372
            task_manager.spawn_handle().spawn(
534
372
                "offchain-workers-runner",
535
372
                "offchain-work",
536
372
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
537
372
                    runtime_api_provider: client.clone(),
538
372
                    keystore: Some(keystore_container.keystore()),
539
372
                    offchain_db: backend.offchain_storage(),
540
372
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
541
372
                        transaction_pool.clone(),
542
372
                    )),
543
372
                    network_provider: Arc::new(network.network.clone()),
544
372
                    is_validator: parachain_config.role.is_authority(),
545
372
                    enable_http_requests: false,
546
8402
                    custom_extensions: move |_| vec![],
547
372
                })
548
372
                .run(client.clone(), task_manager.spawn_handle())
549
372
                .boxed(),
550
372
            );
551
372
        }
552

            
553
372
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
554
372
            rpc_builder,
555
372
            client: client.clone(),
556
372
            transaction_pool: transaction_pool.clone(),
557
372
            task_manager: &mut task_manager,
558
372
            config: parachain_config,
559
372
            keystore: keystore_container.keystore(),
560
372
            backend: backend.clone(),
561
372
            network: network.network.clone(),
562
372
            system_rpc_tx: network.system_rpc_tx.clone(),
563
372
            tx_handler_controller,
564
372
            telemetry: telemetry.as_mut(),
565
372
            sync_service: network.sync_service.clone(),
566
372
        })?;
567

            
568
372
        if let Some(hwbench) = &hwbench {
569
            sc_sysinfo::print_hwbench(hwbench);
570
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
571
            // in there and swapping out the requirements for your own are probably a good idea. The
572
            // requirements for a para-chain are dictated by its relay-chain.
573
            if collator {
574
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench) {
575
                    log::warn!(
576
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
577
                        err
578
                    );
579
                }
580
            }
581

            
582
            if let Some(ref mut telemetry) = telemetry {
583
                let telemetry_handle = telemetry.handle();
584
                task_manager.spawn_handle().spawn(
585
                    "telemetry_hwbench",
586
                    None,
587
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
588
                );
589
            }
590
372
        }
591

            
592
372
        Ok(NodeBuilder {
593
372
            client,
594
372
            backend,
595
372
            transaction_pool,
596
372
            telemetry,
597
372
            telemetry_worker_handle,
598
372
            task_manager,
599
372
            keystore_container,
600
372
            hwbench,
601
372
            prometheus_registry,
602
372
            network: TypeIdentity::from_type(network),
603
372
            tx_handler_controller: TypeIdentity::from_type(()),
604
372
            import_queue_service,
605
372
        })
606
372
    }
607

            
608
372
    pub fn install_manual_seal<BI, SC, CIDP>(
609
372
        &mut self,
610
372
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
611
372
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
612
372
    where
613
372
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
614
372
        SC: SelectChain<BlockOf<T>> + 'static,
615
372
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
616
372
    {
617
372
        let ManualSealConfiguration {
618
372
            sealing,
619
372
            soft_deadline,
620
372
            block_import,
621
372
            select_chain,
622
372
            consensus_data_provider,
623
372
            create_inherent_data_providers,
624
372
        } = manual_seal_config;
625
372

            
626
372
        let prometheus_registry = self.prometheus_registry.clone();
627
372

            
628
372
        let mut env = sc_basic_authorship::ProposerFactory::new(
629
372
            self.task_manager.spawn_handle(),
630
372
            self.client.clone(),
631
372
            self.transaction_pool.clone(),
632
372
            prometheus_registry.as_ref(),
633
372
            self.telemetry.as_ref().map(|x| x.handle()),
634
372
        );
635
372

            
636
372
        let mut command_sink = None;
637

            
638
372
        if let Some(deadline) = soft_deadline {
639
178
            env.set_soft_deadline(deadline);
640
372
        }
641

            
642
372
        let commands_stream: Box<
643
372
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
644
372
        > = match sealing {
645
            Sealing::Instant => {
646
16
                Box::new(
647
16
                    // This bit cribbed from the implementation of instant seal.
648
16
                    self.transaction_pool
649
16
                        .pool()
650
16
                        .validated_pool()
651
16
                        .import_notification_stream()
652
16
                        .map(|_| EngineCommand::SealNewBlock {
653
                            create_empty: false,
654
                            finalize: false,
655
                            parent_hash: None,
656
                            sender: None,
657
16
                        }),
658
16
                )
659
            }
660
            Sealing::Manual => {
661
356
                let (sink, stream) = futures::channel::mpsc::channel(1000);
662
356
                // Keep a reference to the other end of the channel. It goes to the RPC.
663
356
                command_sink = Some(sink);
664
356
                Box::new(stream)
665
            }
666
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
667
                Timer::interval(Duration::from_millis(millis)),
668
                |_| EngineCommand::SealNewBlock {
669
                    create_empty: true,
670
                    finalize: true,
671
                    parent_hash: None,
672
                    sender: None,
673
                },
674
            )),
675
        };
676

            
677
372
        self.task_manager.spawn_essential_handle().spawn_blocking(
678
372
            "authorship_task",
679
372
            Some("block-authoring"),
680
372
            run_manual_seal(ManualSealParams {
681
372
                block_import,
682
372
                env,
683
372
                client: self.client.clone(),
684
372
                pool: self.transaction_pool.clone(),
685
372
                commands_stream,
686
372
                select_chain,
687
372
                consensus_data_provider,
688
372
                create_inherent_data_providers,
689
372
            }),
690
372
        );
691
372

            
692
372
        Ok(command_sink)
693
372
    }
694

            
695
    pub fn start_full_node<RCInterface>(
696
        self,
697
        para_id: ParaId,
698
        relay_chain_interface: RCInterface,
699
        relay_chain_slot_duration: Duration,
700
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
701
    where
702
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
703
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
704
        RCInterface: RelayChainInterface + Clone + 'static,
705
    {
706
        let NodeBuilder {
707
            client,
708
            backend,
709
            transaction_pool,
710
            telemetry,
711
            telemetry_worker_handle,
712
            mut task_manager,
713
            keystore_container,
714
            hwbench,
715
            prometheus_registry,
716
            network,
717
            tx_handler_controller,
718
            import_queue_service,
719
        } = self;
720

            
721
        let network = TypeIdentity::into_type(network);
722
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
723

            
724
        let announce_block = {
725
            let sync_service = network.sync_service.clone();
726
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
727
        };
728

            
729
        let overseer_handle = relay_chain_interface
730
            .overseer_handle()
731
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
732

            
733
        let params = StartFullNodeParams {
734
            client: client.clone(),
735
            announce_block,
736
            task_manager: &mut task_manager,
737
            para_id,
738
            relay_chain_interface,
739
            relay_chain_slot_duration,
740
            import_queue: import_queue_service,
741
            recovery_handle: Box::new(overseer_handle),
742
            sync_service: network.sync_service.clone(),
743
        };
744

            
745
        // TODO: change for async backing
746
        #[allow(deprecated)]
747
        cumulus_client_service::start_full_node(params)?;
748

            
749
        Ok(NodeBuilder {
750
            client,
751
            backend,
752
            transaction_pool,
753
            telemetry,
754
            telemetry_worker_handle,
755
            task_manager,
756
            keystore_container,
757
            hwbench,
758
            prometheus_registry,
759
            network: TypeIdentity::from_type(network),
760
            tx_handler_controller,
761
            import_queue_service: (),
762
        })
763
    }
764

            
765
    pub async fn start_collator<RCInterface>(
766
        self,
767
        para_id: ParaId,
768
        relay_chain_interface: RCInterface,
769
        relay_chain_slot_duration: Duration,
770
        parachain_consensus: ParachainConsensusOf<T>,
771
        collator_key: CollatorPair,
772
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
773
    where
774
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
775
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
776
        RCInterface: RelayChainInterface + Clone + 'static,
777
    {
778
        let NodeBuilder {
779
            client,
780
            backend,
781
            transaction_pool,
782
            telemetry,
783
            telemetry_worker_handle,
784
            mut task_manager,
785
            keystore_container,
786
            hwbench,
787
            prometheus_registry,
788
            network,
789
            tx_handler_controller,
790
            import_queue_service,
791
        } = self;
792

            
793
        let network = TypeIdentity::into_type(network);
794
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
795

            
796
        let spawner = task_manager.spawn_handle();
797
        let announce_block = {
798
            let sync_service = network.sync_service.clone();
799
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
800
        };
801
        let overseer_handle = relay_chain_interface
802
            .overseer_handle()
803
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
804

            
805
        let params = cumulus_client_service::StartCollatorParams {
806
            para_id,
807
            block_status: client.clone(),
808
            announce_block: announce_block.clone(),
809
            client: client.clone(),
810
            task_manager: &mut task_manager,
811
            relay_chain_interface: relay_chain_interface.clone(),
812
            spawner: spawner.clone(),
813
            parachain_consensus,
814
            import_queue: import_queue_service,
815
            collator_key,
816
            relay_chain_slot_duration,
817
            recovery_handle: Box::new(overseer_handle.clone()),
818
            sync_service: network.sync_service.clone(),
819
        };
820

            
821
        // TODO: change for async backing
822
        #[allow(deprecated)]
823
        cumulus_client_service::start_collator(params).await?;
824

            
825
        Ok(NodeBuilder {
826
            client,
827
            backend,
828
            transaction_pool,
829
            telemetry,
830
            telemetry_worker_handle,
831
            task_manager,
832
            keystore_container,
833
            hwbench,
834
            prometheus_registry,
835
            network: TypeIdentity::from_type(network),
836
            tx_handler_controller,
837
            import_queue_service: (),
838
        })
839
    }
840

            
841
    pub fn extract_import_queue_service(
842
        self,
843
    ) -> (
844
        NodeBuilder<T, SNetwork, STxHandler, ()>,
845
        SImportQueueService,
846
    )
847
    where
848
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
849
    {
850
        let NodeBuilder {
851
            client,
852
            backend,
853
            transaction_pool,
854
            telemetry,
855
            telemetry_worker_handle,
856
            task_manager,
857
            keystore_container,
858
            hwbench,
859
            prometheus_registry,
860
            network,
861
            tx_handler_controller,
862
            import_queue_service,
863
        } = self;
864

            
865
        (
866
            NodeBuilder {
867
                client,
868
                backend,
869
                transaction_pool,
870
                telemetry,
871
                telemetry_worker_handle,
872
                task_manager,
873
                keystore_container,
874
                hwbench,
875
                prometheus_registry,
876
                network,
877
                tx_handler_controller,
878
                import_queue_service: (),
879
            },
880
            import_queue_service,
881
        )
882
    }
883

            
884
    pub fn cumulus_client_collator_params_generator(
885
        &self,
886
        para_id: ParaId,
887
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
888
        collator_key: CollatorPair,
889
        parachain_consensus: ParachainConsensusOf<T>,
890
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
891
        BlockOf<T>,
892
        ClientOf<T>,
893
        ClientOf<T>,
894
        SpawnTaskHandle,
895
    > + Send
896
           + Clone
897
           + 'static
898
    where
899
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
900
    {
901
        let network = TypeIdentity::as_type(&self.network);
902

            
903
        let client = self.client.clone();
904
        let announce_block = {
905
            let sync_service = network.sync_service.clone();
906
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
907
        };
908
        let spawner = self.task_manager.spawn_handle();
909

            
910
        move || cumulus_client_collator::StartCollatorParams {
911
            runtime_api: client.clone(),
912
            block_status: client.clone(),
913
            announce_block: announce_block.clone(),
914
            overseer_handle: overseer_handle.clone(),
915
            spawner: spawner.clone(),
916
            para_id,
917
            key: collator_key.clone(),
918
            parachain_consensus: parachain_consensus.clone(),
919
        }
920
    }
921
}
922

            
923
/// Block authoring scheme to be used by the dev service.
924
#[derive(Debug, Copy, Clone)]
925
pub enum Sealing {
926
    /// Author a block immediately upon receiving a transaction into the transaction pool
927
    Instant,
928
    /// Author a block upon receiving an RPC command
929
    Manual,
930
    /// Author blocks at a regular interval specified in milliseconds
931
    Interval(u64),
932
}
933

            
934
impl FromStr for Sealing {
935
    type Err = String;
936

            
937
1176
    fn from_str(s: &str) -> Result<Self, Self::Err> {
938
1176
        Ok(match s {
939
1176
            "instant" => Self::Instant,
940
1068
            "manual" => Self::Manual,
941
            s => {
942
                let millis = s
943
                    .parse::<u64>()
944
                    .map_err(|_| "couldn't decode sealing param")?;
945
                Self::Interval(millis)
946
            }
947
        })
948
1176
    }
949
}
950

            
951
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
952
    pub sealing: Sealing,
953
    pub block_import: BI,
954
    pub soft_deadline: Option<Percent>,
955
    pub select_chain: SC,
956
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = ()>>>,
957
    pub create_inherent_data_providers: CIDP,
958
}