1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_io::Timer,
19
    core::time::Duration,
20
    core_extensions::TypeIdentity,
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainConsensus,
23
    cumulus_client_service::{
24
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
25
        StartFullNodeParams,
26
    },
27
    cumulus_primitives_core::ParaId,
28
    cumulus_relay_chain_interface::RelayChainInterface,
29
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
30
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
31
    jsonrpsee::RpcModule,
32
    polkadot_primitives::CollatorPair,
33
    sc_client_api::Backend,
34
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
35
    sc_consensus_manual_seal::{
36
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
37
    },
38
    sc_executor::{
39
        sp_wasm_interface::{ExtendedHostFunctions, HostFunctions},
40
        HeapAllocStrategy, NativeExecutionDispatch, RuntimeVersionOf, WasmExecutor,
41
        DEFAULT_HEAP_ALLOC_STRATEGY,
42
    },
43
    sc_network::{config::FullNetworkConfiguration, NetworkBlock},
44
    sc_network_sync::SyncingService,
45
    sc_network_transactions::TransactionsHandlerController,
46
    sc_rpc::{DenyUnsafe, SubscriptionTaskExecutor},
47
    sc_service::{
48
        Configuration, KeystoreContainer, NetworkStarter, SpawnTaskHandle, TFullBackend,
49
        TFullClient, TaskManager,
50
    },
51
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
52
    sc_transaction_pool_api::OffchainTransactionPoolFactory,
53
    sc_utils::mpsc::TracingUnboundedSender,
54
    sp_api::ConstructRuntimeApi,
55
    sp_block_builder::BlockBuilder,
56
    sp_consensus::SelectChain,
57
    sp_core::traits::CodeExecutor,
58
    sp_inherents::CreateInherentDataProviders,
59
    sp_offchain::OffchainWorkerApi,
60
    sp_runtime::Percent,
61
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
62
    std::{str::FromStr, sync::Arc},
63
};
64

            
65
#[allow(deprecated)]
66
use sc_executor::NativeElseWasmExecutor;
67
use sp_api::StorageProof;
68

            
69
tp_traits::alias!(
70
    pub trait MinimalRuntimeApi<
71
        Block: (cumulus_primitives_core::BlockT),
72
        Client: (sp_api::CallApiAt<Block>),
73
    > :
74
        ConstructRuntimeApi<
75
            Block,
76
            Client,
77
            RuntimeApi:
78
                TaggedTransactionQueue<Block>
79
                + BlockBuilder<Block> + OffchainWorkerApi<Block>
80
                + sp_api::Metadata<Block>
81
                + sp_session::SessionKeys<Block>,
82
        > + Send + Sync + 'static
83
);
84

            
85
tp_traits::alias!(
86
    pub trait MinimalCumulusRuntimeApi<
87
        Block: (cumulus_primitives_core::BlockT),
88
        Client: (sp_api::CallApiAt<Block>),
89
    > :
90
        MinimalRuntimeApi<Block, Client> +
91
        ConstructRuntimeApi<
92
            Block,
93
            Client,
94
            RuntimeApi:
95
                cumulus_primitives_core::CollectCollationInfo<Block>,
96
        >
97
);
98

            
99
/// Trait to configure the main types the builder rely on, bundled in a single
100
/// type to reduce verbosity and the amount of type parameters.
101
pub trait NodeBuilderConfig {
102
    type Block;
103
    type RuntimeApi;
104
    type ParachainExecutor;
105

            
106
    /// Create a new `NodeBuilder` using the types of this `Config`, along
107
    /// with the parachain `Configuration` and an optional `HwBench`.
108
386
    fn new_builder(
109
386
        parachain_config: &Configuration,
110
386
        hwbench: Option<sc_sysinfo::HwBench>,
111
386
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
112
386
    where
113
386
        Self: Sized,
114
386
        BlockOf<Self>: cumulus_primitives_core::BlockT,
115
386
        ExecutorOf<Self>:
116
386
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
117
386
        RuntimeApiOf<Self>: MinimalRuntimeApi<BlockOf<Self>, ClientOf<Self>>,
118
386
    {
119
386
        NodeBuilder::<Self>::new(parachain_config, hwbench)
120
386
    }
121
}
122

            
123
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
124
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
125
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
126
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
127
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
128
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
129
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
130
pub type ConstructedRuntimeApiOf<T> =
131
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
132
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
133
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
134

            
135
// `Cumulus` and `TxHandler` are types that will change during the life of
136
// a `NodeBuilder` because they are generated and consumed when calling
137
// certain functions, with absence of data represented with `()`. Some
138
// function are implemented only for a given concrete type, which ensure it
139
// can only be called if the required data is available (generated and not yet
140
// consumed).
141
//
142
// While this could be implemented with multiple impl blocks with concrete types,
143
// we use here `core_extensions::TypeIdentity` which allow to express type
144
// identity/equality as a trait bound on each function as it removes the
145
// boilerplate of many impl block with duplicated trait bounds. 2 impl blocks
146
// are still required since Rust can't infer the types in the `new` function
147
// that doesn't take `self`.
148
pub struct NodeBuilder<
149
    T: NodeBuilderConfig,
150
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
151
    // but can only be called with an `import_queue` which can be different in
152
    // each node. For that reason it is a `()` when calling `new`, then the
153
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
154
    // then call `build_cumulus_network` with it to generate the cumulus systems.
155
    SNetwork = (),
156
    // The `TxHandler` is constructed in `build_X_network`
157
    // and is then consumed when calling `spawn_common_tasks`.
158
    STxHandler = (),
159
    // The import queue service is obtained from the import queue in
160
    // `build_cumulus_network` or `build_substrate_network`, which also
161
    // consumes the import queue. Neither of them are clonable, so we need to
162
    // to store the service here to be able to consume it later in
163
    // `start_full_node`.
164
    SImportQueueService = (),
165
> where
166
    BlockOf<T>: cumulus_primitives_core::BlockT,
167
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
168
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
169
{
170
    pub client: Arc<ClientOf<T>>,
171
    pub backend: Arc<BackendOf<T>>,
172
    pub task_manager: TaskManager,
173
    pub keystore_container: KeystoreContainer,
174
    pub transaction_pool: Arc<sc_transaction_pool::FullPool<BlockOf<T>, ClientOf<T>>>,
175
    pub telemetry: Option<Telemetry>,
176
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,
177

            
178
    pub hwbench: Option<sc_sysinfo::HwBench>,
179
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,
180

            
181
    pub network: SNetwork,
182
    pub tx_handler_controller: STxHandler,
183
    pub import_queue_service: SImportQueueService,
184
}
185

            
186
pub struct Network<Block: cumulus_primitives_core::BlockT> {
187
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
188
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
189
    pub start_network: NetworkStarter,
190
    pub sync_service: Arc<SyncingService<Block>>,
191
}
192

            
193
/// Allows to create a parachain-defined executor from a `WasmExecutor`
194
pub trait TanssiExecutorExt {
195
    type HostFun: HostFunctions;
196
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
197
}
198

            
199
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
200
    type HostFun = ParachainHostFunctions;
201

            
202
618
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
203
618
        wasm_executor
204
618
    }
205
}
206

            
207
#[allow(deprecated)]
208
impl<D> TanssiExecutorExt for NativeElseWasmExecutor<D>
209
where
210
    D: NativeExecutionDispatch,
211
{
212
    type HostFun = ExtendedHostFunctions<sp_io::SubstrateHostFunctions, D::ExtendHostFunctions>;
213

            
214
180
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
215
180
        #[allow(deprecated)]
216
180
        NativeElseWasmExecutor::new_with_wasm_executor(wasm_executor)
217
180
    }
218
}
219

            
220
// `new` function doesn't take self, and the Rust compiler cannot infer that
221
// only one type T implements `TypeIdentity`. With thus need a separate impl
222
// block with concrete types `()`.
223
impl<T: NodeBuilderConfig> NodeBuilder<T>
224
where
225
    BlockOf<T>: cumulus_primitives_core::BlockT,
226
    ExecutorOf<T>:
227
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
228
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
229
{
230
    /// Create a new `NodeBuilder` which prepare objects required to launch a
231
    /// node. However it only starts telemetry, and doesn't provide any
232
    /// network-dependent objects (as it requires an import queue, which usually
233
    /// is different for each node).
234
386
    fn new(
235
386
        parachain_config: &Configuration,
236
386
        hwbench: Option<sc_sysinfo::HwBench>,
237
386
    ) -> Result<Self, sc_service::Error> {
238
        // Refactor: old new_partial
239

            
240
386
        let telemetry = parachain_config
241
386
            .telemetry_endpoints
242
386
            .clone()
243
386
            .filter(|x| !x.is_empty())
244
386
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
245
                let worker = TelemetryWorker::new(16)?;
246
                let telemetry = worker.handle().new_telemetry(endpoints);
247
                Ok((worker, telemetry))
248
386
            })
249
386
            .transpose()?;
250

            
251
386
        let heap_pages =
252
386
            parachain_config
253
386
                .default_heap_pages
254
386
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
255
                    extra_pages: h as u32,
256
386
                });
257
386

            
258
386
        // Default runtime_cache_size is 2
259
386
        // For now we can work with this, but it will likely need
260
386
        // to change once we start having runtime_cache_sizes, or
261
386
        // run nodes with the maximum for this value
262
386
        let mut wasm_builder = WasmExecutor::builder()
263
386
            .with_execution_method(parachain_config.wasm_method)
264
386
            .with_onchain_heap_alloc_strategy(heap_pages)
265
386
            .with_offchain_heap_alloc_strategy(heap_pages)
266
386
            .with_max_runtime_instances(parachain_config.max_runtime_instances)
267
386
            .with_runtime_cache_size(parachain_config.runtime_cache_size);
268
386
        if let Some(ref wasmtime_precompiled_path) = parachain_config.wasmtime_precompiled {
269
364
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
270
364
        }
271

            
272
386
        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());
273

            
274
386
        let (client, backend, keystore_container, task_manager) =
275
386
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
276
386
                parachain_config,
277
386
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
278
386
                executor,
279
386
                true,
280
386
            )?;
281
386
        let client = Arc::new(client);
282
386

            
283
386
        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
284
386

            
285
386
        let telemetry = telemetry.map(|(worker, telemetry)| {
286
            task_manager
287
                .spawn_handle()
288
                .spawn("telemetry", None, worker.run());
289
            telemetry
290
386
        });
291
386

            
292
386
        let transaction_pool = sc_transaction_pool::BasicPool::new_full(
293
386
            parachain_config.transaction_pool.clone(),
294
386
            parachain_config.role.is_authority().into(),
295
386
            parachain_config.prometheus_registry(),
296
386
            task_manager.spawn_essential_handle(),
297
386
            client.clone(),
298
386
        );
299
386

            
300
386
        Ok(Self {
301
386
            client,
302
386
            backend,
303
386
            transaction_pool,
304
386
            telemetry,
305
386
            telemetry_worker_handle,
306
386
            task_manager,
307
386
            keystore_container,
308
386
            hwbench,
309
386
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
310
386
            network: TypeIdentity::from_type(()),
311
386
            tx_handler_controller: TypeIdentity::from_type(()),
312
386
            import_queue_service: TypeIdentity::from_type(()),
313
386
        })
314
386
    }
315
}
316

            
317
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
318
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
319
where
320
    BlockOf<T>: cumulus_primitives_core::BlockT,
321
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
322
    RuntimeApiOf<T>: MinimalCumulusRuntimeApi<BlockOf<T>, ClientOf<T>>,
323
{
324
    pub async fn build_relay_chain_interface(
325
        &mut self,
326
        parachain_config: &Configuration,
327
        polkadot_config: Configuration,
328
        collator_options: CollatorOptions,
329
    ) -> sc_service::error::Result<(
330
        Arc<(dyn RelayChainInterface + 'static)>,
331
        Option<CollatorPair>,
332
    )> {
333
        build_relay_chain_interface(
334
            polkadot_config,
335
            parachain_config,
336
            self.telemetry_worker_handle.clone(),
337
            &mut self.task_manager,
338
            collator_options.clone(),
339
            self.hwbench.clone(),
340
        )
341
        .await
342
        .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))
343
    }
344

            
345
    /// Given an import queue, calls `cumulus_client_service::build_network` and
346
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
347
    ///
348
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
349
    /// data.
350
    pub async fn build_cumulus_network<RCInterface, Net>(
351
        self,
352
        parachain_config: &Configuration,
353
        para_id: ParaId,
354
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
355
        relay_chain_interface: RCInterface,
356
    ) -> sc_service::error::Result<
357
        NodeBuilder<
358
            T,
359
            Network<BlockOf<T>>,
360
            TransactionsHandlerController<BlockHashOf<T>>,
361
            ImportQueueServiceOf<T>,
362
        >,
363
    >
364
    where
365
        SNetwork: TypeIdentity<Type = ()>,
366
        STxHandler: TypeIdentity<Type = ()>,
367
        SImportQueueService: TypeIdentity<Type = ()>,
368
        RCInterface: RelayChainInterface + Clone + 'static,
369
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
370
    {
371
        let Self {
372
            client,
373
            backend,
374
            transaction_pool,
375
            telemetry,
376
            telemetry_worker_handle,
377
            task_manager,
378
            keystore_container,
379
            hwbench,
380
            prometheus_registry,
381
            network: _,
382
            tx_handler_controller: _,
383
            import_queue_service: _,
384
        } = self;
385

            
386
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);
387

            
388
        let import_queue_service = import_queue.service();
389
        let spawn_handle = task_manager.spawn_handle();
390

            
391
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
392
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
393
                parachain_config,
394
                client: client.clone(),
395
                transaction_pool: transaction_pool.clone(),
396
                spawn_handle,
397
                import_queue,
398
                para_id,
399
                relay_chain_interface,
400
                net_config,
401
                sybil_resistance_level: CollatorSybilResistance::Resistant,
402
            })
403
            .await?;
404

            
405
        Ok(NodeBuilder {
406
            client,
407
            backend,
408
            transaction_pool,
409
            telemetry,
410
            telemetry_worker_handle,
411
            task_manager,
412
            keystore_container,
413
            hwbench,
414
            prometheus_registry,
415
            network: Network {
416
                network,
417
                system_rpc_tx,
418
                start_network,
419
                sync_service,
420
            },
421
            tx_handler_controller,
422
            import_queue_service,
423
        })
424
    }
425

            
426
    /// Given an import queue, calls `sc_service::build_network` and
427
    /// stores the returned objects in `self.network` and `self.tx_handler_controller`.
428
    ///
429
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
430
    /// data.
431
380
    pub fn build_substrate_network<Net>(
432
380
        self,
433
380
        parachain_config: &Configuration,
434
380
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
435
380
    ) -> sc_service::error::Result<
436
380
        NodeBuilder<
437
380
            T,
438
380
            Network<BlockOf<T>>,
439
380
            TransactionsHandlerController<BlockHashOf<T>>,
440
380
            ImportQueueServiceOf<T>,
441
380
        >,
442
380
    >
443
380
    where
444
380
        SNetwork: TypeIdentity<Type = ()>,
445
380
        STxHandler: TypeIdentity<Type = ()>,
446
380
        SImportQueueService: TypeIdentity<Type = ()>,
447
380
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
448
380
    {
449
380
        let Self {
450
380
            client,
451
380
            backend,
452
380
            transaction_pool,
453
380
            telemetry,
454
380
            telemetry_worker_handle,
455
380
            task_manager,
456
380
            keystore_container,
457
380
            hwbench,
458
380
            prometheus_registry,
459
380
            network: _,
460
380
            tx_handler_controller: _,
461
380
            import_queue_service: _,
462
380
        } = self;
463
380

            
464
380
        let net_config = FullNetworkConfiguration::<_, _, Net>::new(&parachain_config.network);
465
380

            
466
380
        let metrics = Net::register_notification_metrics(
467
380
            parachain_config
468
380
                .prometheus_config
469
380
                .as_ref()
470
380
                .map(|cfg| &cfg.registry),
471
380
        );
472
380

            
473
380
        let import_queue_service = import_queue.service();
474

            
475
380
        let (network, system_rpc_tx, tx_handler_controller, start_network, sync_service) =
476
380
            sc_service::build_network(sc_service::BuildNetworkParams {
477
380
                config: parachain_config,
478
380
                client: client.clone(),
479
380
                transaction_pool: transaction_pool.clone(),
480
380
                spawn_handle: task_manager.spawn_handle(),
481
380
                import_queue,
482
380
                warp_sync_params: None,
483
380
                block_announce_validator_builder: None,
484
380
                net_config,
485
380
                block_relay: None,
486
380
                metrics,
487
380
            })?;
488

            
489
380
        Ok(NodeBuilder {
490
380
            client,
491
380
            backend,
492
380
            transaction_pool,
493
380
            telemetry,
494
380
            telemetry_worker_handle,
495
380
            task_manager,
496
380
            keystore_container,
497
380
            hwbench,
498
380
            prometheus_registry,
499
380
            network: Network {
500
380
                network,
501
380
                system_rpc_tx,
502
380
                start_network,
503
380
                sync_service,
504
380
            },
505
380
            tx_handler_controller,
506
380
            import_queue_service,
507
380
        })
508
380
    }
509

            
510
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
511
    /// node. It consumes `self.tx_handler_controller` in the process, which means
512
    /// it can only be called once, and any other code that would need this
513
    /// controller should interact with it before calling this function.
514
380
    pub fn spawn_common_tasks<TRpc>(
515
380
        self,
516
380
        parachain_config: Configuration,
517
380
        rpc_builder: Box<
518
380
            dyn Fn(
519
380
                DenyUnsafe,
520
380
                SubscriptionTaskExecutor,
521
380
            ) -> Result<RpcModule<TRpc>, sc_service::Error>,
522
380
        >,
523
380
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
524
380
    where
525
380
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
526
380
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
527
380
        BlockHashOf<T>: Unpin,
528
380
        BlockHeaderOf<T>: Unpin,
529
380
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
530
380
            + BlockBuilder<BlockOf<T>>
531
380
            + OffchainWorkerApi<BlockOf<T>>
532
380
            + sp_api::Metadata<BlockOf<T>>
533
380
            + sp_session::SessionKeys<BlockOf<T>>,
534
380
    {
535
380
        let NodeBuilder {
536
380
            client,
537
380
            backend,
538
380
            transaction_pool,
539
380
            mut telemetry,
540
380
            telemetry_worker_handle,
541
380
            mut task_manager,
542
380
            keystore_container,
543
380
            hwbench,
544
380
            prometheus_registry,
545
380
            network,
546
380
            tx_handler_controller,
547
380
            import_queue_service,
548
380
        } = self;
549
380

            
550
380
        let network = TypeIdentity::into_type(network);
551
380
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);
552
380

            
553
380
        let collator = parachain_config.role.is_authority();
554
380

            
555
380
        if parachain_config.offchain_worker.enabled {
556
380
            task_manager.spawn_handle().spawn(
557
380
                "offchain-workers-runner",
558
380
                "offchain-work",
559
380
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
560
380
                    runtime_api_provider: client.clone(),
561
380
                    keystore: Some(keystore_container.keystore()),
562
380
                    offchain_db: backend.offchain_storage(),
563
380
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
564
380
                        transaction_pool.clone(),
565
380
                    )),
566
380
                    network_provider: Arc::new(network.network.clone()),
567
380
                    is_validator: parachain_config.role.is_authority(),
568
380
                    enable_http_requests: false,
569
8460
                    custom_extensions: move |_| vec![],
570
380
                })
571
380
                .run(client.clone(), task_manager.spawn_handle())
572
380
                .boxed(),
573
380
            );
574
380
        }
575

            
576
380
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
577
380
            rpc_builder,
578
380
            client: client.clone(),
579
380
            transaction_pool: transaction_pool.clone(),
580
380
            task_manager: &mut task_manager,
581
380
            config: parachain_config,
582
380
            keystore: keystore_container.keystore(),
583
380
            backend: backend.clone(),
584
380
            network: network.network.clone(),
585
380
            system_rpc_tx: network.system_rpc_tx.clone(),
586
380
            tx_handler_controller,
587
380
            telemetry: telemetry.as_mut(),
588
380
            sync_service: network.sync_service.clone(),
589
380
        })?;
590

            
591
380
        if let Some(hwbench) = &hwbench {
592
            sc_sysinfo::print_hwbench(hwbench);
593
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
594
            // in there and swapping out the requirements for your own are probably a good idea. The
595
            // requirements for a para-chain are dictated by its relay-chain.
596
            if collator {
597
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench) {
598
                    log::warn!(
599
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
600
                        err
601
                    );
602
                }
603
            }
604

            
605
            if let Some(ref mut telemetry) = telemetry {
606
                let telemetry_handle = telemetry.handle();
607
                task_manager.spawn_handle().spawn(
608
                    "telemetry_hwbench",
609
                    None,
610
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
611
                );
612
            }
613
380
        }
614

            
615
380
        Ok(NodeBuilder {
616
380
            client,
617
380
            backend,
618
380
            transaction_pool,
619
380
            telemetry,
620
380
            telemetry_worker_handle,
621
380
            task_manager,
622
380
            keystore_container,
623
380
            hwbench,
624
380
            prometheus_registry,
625
380
            network: TypeIdentity::from_type(network),
626
380
            tx_handler_controller: TypeIdentity::from_type(()),
627
380
            import_queue_service,
628
380
        })
629
380
    }
630

            
631
380
    pub fn install_manual_seal<BI, SC, CIDP>(
632
380
        &mut self,
633
380
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
634
380
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
635
380
    where
636
380
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
637
380
        SC: SelectChain<BlockOf<T>> + 'static,
638
380
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
639
380
    {
640
380
        let ManualSealConfiguration {
641
380
            sealing,
642
380
            soft_deadline,
643
380
            block_import,
644
380
            select_chain,
645
380
            consensus_data_provider,
646
380
            create_inherent_data_providers,
647
380
        } = manual_seal_config;
648
380

            
649
380
        let prometheus_registry = self.prometheus_registry.clone();
650
380

            
651
380
        let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
652
380
            self.task_manager.spawn_handle(),
653
380
            self.client.clone(),
654
380
            self.transaction_pool.clone(),
655
380
            prometheus_registry.as_ref(),
656
380
            self.telemetry.as_ref().map(|x| x.handle()),
657
380
        );
658
380

            
659
380
        let mut command_sink = None;
660

            
661
380
        if let Some(deadline) = soft_deadline {
662
178
            env.set_soft_deadline(deadline);
663
380
        }
664

            
665
380
        let commands_stream: Box<
666
380
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
667
380
        > = match sealing {
668
            Sealing::Instant => {
669
16
                Box::new(
670
16
                    // This bit cribbed from the implementation of instant seal.
671
16
                    self.transaction_pool
672
16
                        .pool()
673
16
                        .validated_pool()
674
16
                        .import_notification_stream()
675
16
                        .map(|_| EngineCommand::SealNewBlock {
676
                            create_empty: false,
677
                            finalize: false,
678
                            parent_hash: None,
679
                            sender: None,
680
16
                        }),
681
16
                )
682
            }
683
            Sealing::Manual => {
684
364
                let (sink, stream) = futures::channel::mpsc::channel(1000);
685
364
                // Keep a reference to the other end of the channel. It goes to the RPC.
686
364
                command_sink = Some(sink);
687
364
                Box::new(stream)
688
            }
689
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
690
                Timer::interval(Duration::from_millis(millis)),
691
                |_| EngineCommand::SealNewBlock {
692
                    create_empty: true,
693
                    finalize: true,
694
                    parent_hash: None,
695
                    sender: None,
696
                },
697
            )),
698
        };
699

            
700
380
        self.task_manager.spawn_essential_handle().spawn_blocking(
701
380
            "authorship_task",
702
380
            Some("block-authoring"),
703
380
            run_manual_seal(ManualSealParams {
704
380
                block_import,
705
380
                env,
706
380
                client: self.client.clone(),
707
380
                pool: self.transaction_pool.clone(),
708
380
                commands_stream,
709
380
                select_chain,
710
380
                consensus_data_provider,
711
380
                create_inherent_data_providers,
712
380
            }),
713
380
        );
714
380

            
715
380
        Ok(command_sink)
716
380
    }
717

            
718
    pub fn start_full_node<RCInterface>(
719
        self,
720
        para_id: ParaId,
721
        relay_chain_interface: RCInterface,
722
        relay_chain_slot_duration: Duration,
723
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
724
    where
725
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
726
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
727
        RCInterface: RelayChainInterface + Clone + 'static,
728
    {
729
        let NodeBuilder {
730
            client,
731
            backend,
732
            transaction_pool,
733
            telemetry,
734
            telemetry_worker_handle,
735
            mut task_manager,
736
            keystore_container,
737
            hwbench,
738
            prometheus_registry,
739
            network,
740
            tx_handler_controller,
741
            import_queue_service,
742
        } = self;
743

            
744
        let network = TypeIdentity::into_type(network);
745
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
746

            
747
        let announce_block = {
748
            let sync_service = network.sync_service.clone();
749
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
750
        };
751

            
752
        let overseer_handle = relay_chain_interface
753
            .overseer_handle()
754
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
755

            
756
        let params = StartFullNodeParams {
757
            client: client.clone(),
758
            announce_block,
759
            task_manager: &mut task_manager,
760
            para_id,
761
            relay_chain_interface,
762
            relay_chain_slot_duration,
763
            import_queue: import_queue_service,
764
            recovery_handle: Box::new(overseer_handle),
765
            sync_service: network.sync_service.clone(),
766
        };
767

            
768
        // TODO: change for async backing
769
        #[allow(deprecated)]
770
        cumulus_client_service::start_full_node(params)?;
771

            
772
        Ok(NodeBuilder {
773
            client,
774
            backend,
775
            transaction_pool,
776
            telemetry,
777
            telemetry_worker_handle,
778
            task_manager,
779
            keystore_container,
780
            hwbench,
781
            prometheus_registry,
782
            network: TypeIdentity::from_type(network),
783
            tx_handler_controller,
784
            import_queue_service: (),
785
        })
786
    }
787

            
788
    pub async fn start_collator<RCInterface>(
789
        self,
790
        para_id: ParaId,
791
        relay_chain_interface: RCInterface,
792
        relay_chain_slot_duration: Duration,
793
        parachain_consensus: ParachainConsensusOf<T>,
794
        collator_key: CollatorPair,
795
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
796
    where
797
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
798
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
799
        RCInterface: RelayChainInterface + Clone + 'static,
800
    {
801
        let NodeBuilder {
802
            client,
803
            backend,
804
            transaction_pool,
805
            telemetry,
806
            telemetry_worker_handle,
807
            mut task_manager,
808
            keystore_container,
809
            hwbench,
810
            prometheus_registry,
811
            network,
812
            tx_handler_controller,
813
            import_queue_service,
814
        } = self;
815

            
816
        let network = TypeIdentity::into_type(network);
817
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
818

            
819
        let spawner = task_manager.spawn_handle();
820
        let announce_block = {
821
            let sync_service = network.sync_service.clone();
822
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
823
        };
824
        let overseer_handle = relay_chain_interface
825
            .overseer_handle()
826
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
827

            
828
        let params = cumulus_client_service::StartCollatorParams {
829
            para_id,
830
            block_status: client.clone(),
831
            announce_block: announce_block.clone(),
832
            client: client.clone(),
833
            task_manager: &mut task_manager,
834
            relay_chain_interface: relay_chain_interface.clone(),
835
            spawner: spawner.clone(),
836
            parachain_consensus,
837
            import_queue: import_queue_service,
838
            collator_key,
839
            relay_chain_slot_duration,
840
            recovery_handle: Box::new(overseer_handle.clone()),
841
            sync_service: network.sync_service.clone(),
842
        };
843

            
844
        // TODO: change for async backing
845
        #[allow(deprecated)]
846
        cumulus_client_service::start_collator(params).await?;
847

            
848
        Ok(NodeBuilder {
849
            client,
850
            backend,
851
            transaction_pool,
852
            telemetry,
853
            telemetry_worker_handle,
854
            task_manager,
855
            keystore_container,
856
            hwbench,
857
            prometheus_registry,
858
            network: TypeIdentity::from_type(network),
859
            tx_handler_controller,
860
            import_queue_service: (),
861
        })
862
    }
863

            
864
    pub fn extract_import_queue_service(
865
        self,
866
    ) -> (
867
        NodeBuilder<T, SNetwork, STxHandler, ()>,
868
        SImportQueueService,
869
    )
870
    where
871
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
872
    {
873
        let NodeBuilder {
874
            client,
875
            backend,
876
            transaction_pool,
877
            telemetry,
878
            telemetry_worker_handle,
879
            task_manager,
880
            keystore_container,
881
            hwbench,
882
            prometheus_registry,
883
            network,
884
            tx_handler_controller,
885
            import_queue_service,
886
        } = self;
887

            
888
        (
889
            NodeBuilder {
890
                client,
891
                backend,
892
                transaction_pool,
893
                telemetry,
894
                telemetry_worker_handle,
895
                task_manager,
896
                keystore_container,
897
                hwbench,
898
                prometheus_registry,
899
                network,
900
                tx_handler_controller,
901
                import_queue_service: (),
902
            },
903
            import_queue_service,
904
        )
905
    }
906

            
907
    pub fn cumulus_client_collator_params_generator(
908
        &self,
909
        para_id: ParaId,
910
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
911
        collator_key: CollatorPair,
912
        parachain_consensus: ParachainConsensusOf<T>,
913
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
914
        BlockOf<T>,
915
        ClientOf<T>,
916
        ClientOf<T>,
917
        SpawnTaskHandle,
918
    > + Send
919
           + Clone
920
           + 'static
921
    where
922
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
923
    {
924
        let network = TypeIdentity::as_type(&self.network);
925

            
926
        let client = self.client.clone();
927
        let announce_block = {
928
            let sync_service = network.sync_service.clone();
929
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
930
        };
931
        let spawner = self.task_manager.spawn_handle();
932

            
933
        move || cumulus_client_collator::StartCollatorParams {
934
            runtime_api: client.clone(),
935
            block_status: client.clone(),
936
            announce_block: announce_block.clone(),
937
            overseer_handle: overseer_handle.clone(),
938
            spawner: spawner.clone(),
939
            para_id,
940
            key: collator_key.clone(),
941
            parachain_consensus: parachain_consensus.clone(),
942
        }
943
    }
944
}
945

            
946
/// Block authoring scheme to be used by the dev service.
947
#[derive(Debug, Copy, Clone)]
948
pub enum Sealing {
949
    /// Author a block immediately upon receiving a transaction into the transaction pool
950
    Instant,
951
    /// Author a block upon receiving an RPC command
952
    Manual,
953
    /// Author blocks at a regular interval specified in milliseconds
954
    Interval(u64),
955
}
956

            
957
impl FromStr for Sealing {
958
    type Err = String;
959

            
960
1200
    fn from_str(s: &str) -> Result<Self, Self::Err> {
961
1200
        Ok(match s {
962
1200
            "instant" => Self::Instant,
963
1092
            "manual" => Self::Manual,
964
            s => {
965
                let millis = s
966
                    .parse::<u64>()
967
                    .map_err(|_| "couldn't decode sealing param")?;
968
                Self::Interval(millis)
969
            }
970
        })
971
1200
    }
972
}
973

            
974
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
975
    pub sealing: Sealing,
976
    pub block_import: BI,
977
    pub soft_deadline: Option<Percent>,
978
    pub select_chain: SC,
979
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = StorageProof>>>,
980
    pub create_inherent_data_providers: CIDP,
981
}