1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    async_channel::Receiver,
19
    async_io::Timer,
20
    core::time::Duration,
21
    core_extensions::TypeIdentity,
22
    cumulus_client_bootnodes::{start_bootnode_tasks, StartBootnodeTasksParams},
23
    cumulus_client_cli::CollatorOptions,
24
    cumulus_client_consensus_common::ParachainConsensus,
25
    cumulus_client_service::{
26
        build_relay_chain_interface, CollatorSybilResistance, ParachainHostFunctions,
27
        StartFullNodeParams,
28
    },
29
    cumulus_primitives_core::ParaId,
30
    cumulus_relay_chain_interface::RelayChainInterface,
31
    frame_benchmarking_cli::SUBSTRATE_REFERENCE_HARDWARE,
32
    futures::{channel::mpsc, FutureExt, Stream, StreamExt},
33
    jsonrpsee::RpcModule,
34
    polkadot_primitives::CollatorPair,
35
    sc_client_api::Backend,
36
    sc_consensus::{import_queue::ImportQueueService, BlockImport, ImportQueue},
37
    sc_consensus_manual_seal::{
38
        run_manual_seal, ConsensusDataProvider, EngineCommand, ManualSealParams,
39
    },
40
    sc_executor::{
41
        sp_wasm_interface::HostFunctions, HeapAllocStrategy, RuntimeVersionOf, WasmExecutor,
42
        DEFAULT_HEAP_ALLOC_STRATEGY,
43
    },
44
    sc_network::{
45
        config::FullNetworkConfiguration, request_responses::IncomingRequest,
46
        service::traits::NetworkService, NetworkBlock,
47
    },
48
    sc_network_sync::SyncingService,
49
    sc_network_transactions::TransactionsHandlerController,
50
    sc_service::{
51
        config::Multiaddr, Configuration, KeystoreContainer, SpawnTaskHandle, TFullBackend,
52
        TFullClient, TaskManager,
53
    },
54
    sc_telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
55
    sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool},
56
    sc_utils::mpsc::TracingUnboundedSender,
57
    sp_api::{ConstructRuntimeApi, StorageProof},
58
    sp_block_builder::BlockBuilder,
59
    sp_consensus::SelectChain,
60
    sp_core::{
61
        traits::{CodeExecutor, SpawnNamed},
62
        H256,
63
    },
64
    sp_inherents::CreateInherentDataProviders,
65
    sp_offchain::OffchainWorkerApi,
66
    sp_runtime::Percent,
67
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
68
    std::{str::FromStr, sync::Arc},
69
};
70

            
71
// Alias trait bundling the minimal runtime APIs every node built through this
// builder needs: transaction validation (`TaggedTransactionQueue`), block
// building, offchain workers, runtime metadata and session-key handling.
tp_traits::alias!(
    pub trait MinimalRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                TaggedTransactionQueue<Block>
                + BlockBuilder<Block> + OffchainWorkerApi<Block>
                + sp_api::Metadata<Block>
                + sp_session::SessionKeys<Block>,
        > + Send + Sync + 'static
);
86

            
87
// Alias trait extending `MinimalRuntimeApi` with the collation-info API that
// cumulus-based (parachain) nodes additionally require.
tp_traits::alias!(
    pub trait MinimalCumulusRuntimeApi<
        Block: (cumulus_primitives_core::BlockT),
        Client: (sp_api::CallApiAt<Block>),
    > :
        MinimalRuntimeApi<Block, Client> +
        ConstructRuntimeApi<
            Block,
            Client,
            RuntimeApi:
                cumulus_primitives_core::CollectCollationInfo<Block>,
        >
);
100

            
101
/// Trait to configure the main types the builder relies on, bundled in a single
/// type to reduce verbosity and the amount of type parameters.
pub trait NodeBuilderConfig {
    /// Block type the node operates on.
    type Block;
    /// Runtime API bundle; must satisfy [`MinimalRuntimeApi`] to use the builder.
    type RuntimeApi;
    /// Executor used to run the runtime Wasm; must implement [`TanssiExecutorExt`].
    type ParachainExecutor;

    /// Create a new `NodeBuilder` using the types of this `Config`, along
    /// with the parachain `Configuration` and an optional `HwBench`.
    ///
    /// Provided method: simply delegates to [`NodeBuilder::new`].
    fn new_builder(
        parachain_config: &Configuration,
        hwbench: Option<sc_sysinfo::HwBench>,
    ) -> Result<NodeBuilder<Self>, sc_service::Error>
    where
        Self: Sized,
        BlockOf<Self>: cumulus_primitives_core::BlockT,
        ExecutorOf<Self>:
            Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
        RuntimeApiOf<Self>: MinimalRuntimeApi<BlockOf<Self>, ClientOf<Self>>,
        BlockHashOf<Self>: Unpin,
    {
        NodeBuilder::<Self>::new(parachain_config, hwbench)
    }
}
125

            
126
// Shorthand aliases projecting concrete types out of a `NodeBuilderConfig`,
// used throughout this file to keep signatures readable.

/// Block type declared by the config.
pub type BlockOf<T> = <T as NodeBuilderConfig>::Block;
/// Hash type of the config's block.
pub type BlockHashOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Hash;
/// Header type of the config's block.
pub type BlockHeaderOf<T> = <BlockOf<T> as cumulus_primitives_core::BlockT>::Header;
/// Runtime API type declared by the config.
pub type RuntimeApiOf<T> = <T as NodeBuilderConfig>::RuntimeApi;
/// Executor type declared by the config.
pub type ExecutorOf<T> = <T as NodeBuilderConfig>::ParachainExecutor;
/// Full client assembled from the config's block, runtime API and executor.
pub type ClientOf<T> = TFullClient<BlockOf<T>, RuntimeApiOf<T>, ExecutorOf<T>>;
/// Full backend for the config's block.
pub type BackendOf<T> = TFullBackend<BlockOf<T>>;
/// The constructed (instantiated) runtime API of the client.
pub type ConstructedRuntimeApiOf<T> =
    <RuntimeApiOf<T> as ConstructRuntimeApi<BlockOf<T>, ClientOf<T>>>::RuntimeApi;
/// Boxed import-queue service handle for the config's block.
pub type ImportQueueServiceOf<T> = Box<dyn ImportQueueService<BlockOf<T>>>;
/// Boxed parachain consensus for the config's block.
pub type ParachainConsensusOf<T> = Box<dyn ParachainConsensus<BlockOf<T>>>;
137

            
138
// `Cumulus` and `TxHandler` are types that will change during the life of
// a `NodeBuilder` because they are generated and consumed when calling
// certain functions, with absence of data represented with `()`. Some
// functions are implemented only for a given concrete type, which ensures they
// can only be called if the required data is available (generated and not yet
// consumed).
//
// While this could be implemented with multiple impl blocks with concrete types,
// we use here `core_extensions::TypeIdentity` which allows to express type
// identity/equality as a trait bound on each function as it removes the
// boilerplate of many impl blocks with duplicated trait bounds. 2 impl blocks
// are still required since Rust can't infer the types in the `new` function
// that doesn't take `self`.
pub struct NodeBuilder<
    T: NodeBuilderConfig,
    // `(cumulus_client_service/sc_service)::build_network` returns many important systems,
    // but can only be called with an `import_queue` which can be different in
    // each node. For that reason it is a `()` when calling `new`, then the
    // caller create the `import_queue` using systems contained in `NodeBuilder`,
    // then call `build_cumulus_network` with it to generate the cumulus systems.
    SNetwork = (),
    // The `TxHandler` is constructed in `build_X_network`
    // and is then consumed when calling `spawn_common_tasks`.
    STxHandler = (),
    // The import queue service is obtained from the import queue in
    // `build_cumulus_network` or `build_substrate_network`, which also
    // consumes the import queue. Neither of them are clonable, so we need
    // to store the service here to be able to consume it later in
    // `start_full_node`.
    SImportQueueService = (),
> where
    BlockOf<T>: cumulus_primitives_core::BlockT,
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
{
    /// Full client, created in `new`.
    pub client: Arc<ClientOf<T>>,
    /// Database backend, created in `new`.
    pub backend: Arc<BackendOf<T>>,
    /// Task manager all background tasks are spawned on.
    pub task_manager: TaskManager,
    /// Keystore for the node's keys.
    pub keystore_container: KeystoreContainer,
    /// Transaction pool wired to the client.
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<BlockOf<T>, ClientOf<T>>>,
    /// Telemetry instance, if endpoints were configured.
    pub telemetry: Option<Telemetry>,
    /// Handle to the telemetry worker, if telemetry is enabled.
    pub telemetry_worker_handle: Option<TelemetryWorkerHandle>,

    /// Optional hardware benchmark results, printed/reported in `spawn_common_tasks`.
    pub hwbench: Option<sc_sysinfo::HwBench>,
    /// Prometheus registry cloned from the parachain configuration.
    pub prometheus_registry: Option<substrate_prometheus_endpoint::Registry>,

    // Type-state fields: `()` until produced by the corresponding build step,
    // see the comments on the type parameters above.
    pub network: SNetwork,
    pub tx_handler_controller: STxHandler,
    pub import_queue_service: SImportQueueService,
}
188

            
189
pub struct Network<Block: cumulus_primitives_core::BlockT> {
190
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
191
    pub system_rpc_tx: TracingUnboundedSender<sc_rpc::system::Request<Block>>,
192
    pub sync_service: Arc<SyncingService<Block>>,
193
}
194

            
195
/// Allows to create a parachain-defined executor from a `WasmExecutor`
pub trait TanssiExecutorExt {
    /// Host functions the wasm executor exposes to the runtime.
    type HostFun: HostFunctions;
    /// Build the executor from an already configured `WasmExecutor`.
    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self;
}
200

            
201
// Identity impl: a plain `WasmExecutor` with the standard parachain host
// functions is itself the executor, so construction is a pass-through.
impl TanssiExecutorExt for WasmExecutor<ParachainHostFunctions> {
    type HostFun = ParachainHostFunctions;

    fn new_with_wasm_executor(wasm_executor: WasmExecutor<Self::HostFun>) -> Self {
        wasm_executor
    }
}
208

            
209
// `new` function doesn't take self, and the Rust compiler cannot infer that
// only one type T implements `TypeIdentity`. We thus need a separate impl
// block with concrete types `()`.
impl<T: NodeBuilderConfig> NodeBuilder<T>
where
    BlockOf<T>: cumulus_primitives_core::BlockT,
    ExecutorOf<T>:
        Clone + CodeExecutor + RuntimeVersionOf + TanssiExecutorExt + Sync + Send + 'static,
    RuntimeApiOf<T>: MinimalRuntimeApi<BlockOf<T>, ClientOf<T>>,
    BlockHashOf<T>: Unpin,
{
    /// Create a new `NodeBuilder` which prepares objects required to launch a
    /// node. However it only starts telemetry, and doesn't provide any
    /// network-dependent objects (as it requires an import queue, which usually
    /// is different for each node).
    fn new(
        parachain_config: &Configuration,
        hwbench: Option<sc_sysinfo::HwBench>,
    ) -> Result<Self, sc_service::Error> {
        // Refactor: old new_partial

        // Only set telemetry up when endpoints are configured and non-empty;
        // the worker is kept alongside its handle until it is spawned below.
        let telemetry = parachain_config
            .telemetry_endpoints
            .clone()
            .filter(|x| !x.is_empty())
            .map(|endpoints| -> Result<_, sc_telemetry::Error> {
                let worker = TelemetryWorker::new(16)?;
                let telemetry = worker.handle().new_telemetry(endpoints);
                Ok((worker, telemetry))
            })
            .transpose()?;

        // Use a static heap allocation strategy when the config pins a page
        // count, otherwise fall back to the executor's default.
        let heap_pages =
            parachain_config
                .executor
                .default_heap_pages
                .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
                    extra_pages: h as u32,
                });

        // Default runtime_cache_size is 2
        // For now we can work with this, but it will likely need
        // to change once we start having runtime_cache_sizes, or
        // run nodes with the maximum for this value
        let mut wasm_builder = WasmExecutor::builder()
            .with_execution_method(parachain_config.executor.wasm_method)
            .with_onchain_heap_alloc_strategy(heap_pages)
            .with_offchain_heap_alloc_strategy(heap_pages)
            .with_max_runtime_instances(parachain_config.executor.max_runtime_instances)
            .with_runtime_cache_size(parachain_config.executor.runtime_cache_size);
        if let Some(ref wasmtime_precompiled_path) = parachain_config.executor.wasmtime_precompiled
        {
            wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
        }

        let executor = ExecutorOf::<T>::new_with_wasm_executor(wasm_builder.build());

        let (client, backend, keystore_container, task_manager) =
            sc_service::new_full_parts_record_import::<BlockOf<T>, RuntimeApiOf<T>, _>(
                parachain_config,
                telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
                executor,
                true,
            )?;
        let client = Arc::new(client);

        let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());

        // Spawn the telemetry worker and keep only the telemetry instance.
        let telemetry = telemetry.map(|(worker, telemetry)| {
            task_manager
                .spawn_handle()
                .spawn("telemetry", None, worker.run());
            telemetry
        });

        let transaction_pool = sc_transaction_pool::Builder::new(
            task_manager.spawn_essential_handle(),
            client.clone(),
            parachain_config.role.is_authority().into(),
        )
        .with_options(parachain_config.transaction_pool.clone())
        .with_prometheus(parachain_config.prometheus_registry())
        .build();

        Ok(Self {
            client,
            backend,
            transaction_pool: transaction_pool.into(),
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry: parachain_config.prometheus_registry().cloned(),
            // Network-related slots start empty (`()`); they are produced
            // later by `build_cumulus_network` / `build_substrate_network`.
            network: TypeIdentity::from_type(()),
            tx_handler_controller: TypeIdentity::from_type(()),
            import_queue_service: TypeIdentity::from_type(()),
        })
    }
}
309

            
310
/// Parameters collected while building the relay chain interface, presumably
/// consumed by `start_bootnode_tasks` (imported above) to run the parachain
/// bootnode tasks — confirm against the caller.
#[derive(Clone)]
pub struct StartBootnodeParams {
    /// Fork id from the relay chain spec, if any.
    pub relay_chain_fork_id: Option<String>,
    /// Fork id from the parachain chain spec, if any.
    pub parachain_fork_id: Option<String>,
    /// Mirrors `network.allow_non_globals_in_dht` from the parachain config.
    pub advertise_non_global_ips: bool,
    /// Public addresses from the parachain network configuration.
    pub parachain_public_addresses: Vec<Multiaddr>,
    /// Handle to the relay chain network service.
    pub relay_chain_network: Arc<dyn NetworkService>,
    /// Receiver of incoming paranode requests, produced by
    /// `build_relay_chain_interface`.
    pub paranode_rx: Receiver<IncomingRequest>,
    /// From `CollatorOptions::embedded_dht_bootnode`.
    pub embedded_dht_bootnode: bool,
    /// From `CollatorOptions::dht_bootnode_discovery`.
    pub dht_bootnode_discovery: bool,
}
321

            
322
impl<T: NodeBuilderConfig, SNetwork, STxHandler, SImportQueueService>
323
    NodeBuilder<T, SNetwork, STxHandler, SImportQueueService>
324
where
325
    BlockOf<T>: cumulus_primitives_core::BlockT,
326
    ExecutorOf<T>: Clone + CodeExecutor + RuntimeVersionOf + Sync + Send + 'static,
327
    RuntimeApiOf<T>: MinimalCumulusRuntimeApi<BlockOf<T>, ClientOf<T>>,
328
{
329
    /// Build the relay chain interface via
    /// `cumulus_client_service::build_relay_chain_interface`, returning it
    /// along with the collator key (if any) and the parameters later needed
    /// to start the bootnode tasks.
    pub async fn build_relay_chain_interface(
        &mut self,
        parachain_config: &Configuration,
        polkadot_config: Configuration,
        collator_options: CollatorOptions,
    ) -> sc_service::error::Result<(
        Arc<(dyn RelayChainInterface + 'static)>,
        Option<CollatorPair>,
        StartBootnodeParams,
    )> {
        // Optional fork ids coming from the relay/parachain chain specs.
        let relay_chain_fork_id = polkadot_config
            .chain_spec
            .fork_id()
            .map(ToString::to_string);
        let parachain_fork_id = parachain_config
            .chain_spec
            .fork_id()
            .map(ToString::to_string);
        let advertise_non_global_ips = parachain_config.network.allow_non_globals_in_dht;
        let parachain_public_addresses = parachain_config.network.public_addresses.clone();

        let (relay_chain_interface, collator_key, relay_chain_network, paranode_rx) =
            build_relay_chain_interface(
                polkadot_config,
                parachain_config,
                self.telemetry_worker_handle.clone(),
                &mut self.task_manager,
                collator_options.clone(),
                self.hwbench.clone(),
            )
            .await
            // Wrap the relay-chain error into a generic service error.
            .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;

        // Bundle everything needed to later start the bootnode tasks.
        let start_bootnode_params = StartBootnodeParams {
            relay_chain_fork_id,
            parachain_fork_id,
            advertise_non_global_ips,
            parachain_public_addresses,
            relay_chain_network,
            paranode_rx,
            embedded_dht_bootnode: collator_options.embedded_dht_bootnode,
            dht_bootnode_discovery: collator_options.dht_bootnode_discovery,
        };

        Ok((relay_chain_interface, collator_key, start_bootnode_params))
    }
375

            
376
    /// Given an import queue, calls [`cumulus_client_service::build_network`] and
    /// stores the returned objects in the returned builder's `network` and
    /// `tx_handler_controller` slots.
    ///
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
    /// data.
    pub async fn build_cumulus_network<RCInterface, Net>(
        self,
        parachain_config: &Configuration,
        para_id: ParaId,
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
        relay_chain_interface: RCInterface,
    ) -> sc_service::error::Result<
        NodeBuilder<
            T,
            Network<BlockOf<T>>,
            TransactionsHandlerController<BlockHashOf<T>>,
            ImportQueueServiceOf<T>,
        >,
    >
    where
        // `()` slots guarantee the network data was not generated yet.
        SNetwork: TypeIdentity<Type = ()>,
        STxHandler: TypeIdentity<Type = ()>,
        SImportQueueService: TypeIdentity<Type = ()>,
        RCInterface: RelayChainInterface + Clone + 'static,
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
    {
        let Self {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: _,
            tx_handler_controller: _,
            import_queue_service: _,
        } = self;

        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
            &parachain_config.network,
            prometheus_registry.clone(),
        );

        // Grab the service handle before `build_network` consumes the queue.
        let import_queue_service = import_queue.service();
        let spawn_handle = task_manager.spawn_handle();

        let metrics = Net::register_notification_metrics(
            parachain_config
                .prometheus_config
                .as_ref()
                .map(|config| &config.registry),
        );

        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
            cumulus_client_service::build_network(cumulus_client_service::BuildNetworkParams {
                parachain_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle,
                import_queue,
                para_id,
                relay_chain_interface,
                net_config,
                sybil_resistance_level: CollatorSybilResistance::Resistant,
                metrics,
            })
            .await?;

        // Same builder, with the network slots now filled.
        Ok(NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: Network {
                network,
                system_rpc_tx,
                sync_service,
            },
            tx_handler_controller,
            import_queue_service,
        })
    }
466

            
467
    /// Given an import queue, calls `sc_service::build_network` and
    /// stores the returned objects in the returned builder's `network` and
    /// `tx_handler_controller` slots.
    ///
    /// Can only be called once on a `NodeBuilder` that doesn't have yet network
    /// data.
    pub fn build_substrate_network<Net>(
        self,
        parachain_config: &Configuration,
        import_queue: impl ImportQueue<BlockOf<T>> + 'static,
    ) -> sc_service::error::Result<
        NodeBuilder<
            T,
            Network<BlockOf<T>>,
            TransactionsHandlerController<BlockHashOf<T>>,
            ImportQueueServiceOf<T>,
        >,
    >
    where
        // `()` slots guarantee the network data was not generated yet.
        SNetwork: TypeIdentity<Type = ()>,
        STxHandler: TypeIdentity<Type = ()>,
        SImportQueueService: TypeIdentity<Type = ()>,
        Net: sc_network::service::traits::NetworkBackend<BlockOf<T>, BlockHashOf<T>>,
    {
        let Self {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: _,
            tx_handler_controller: _,
            import_queue_service: _,
        } = self;

        let net_config = FullNetworkConfiguration::<_, _, Net>::new(
            &parachain_config.network,
            prometheus_registry.clone(),
        );

        let metrics = Net::register_notification_metrics(
            parachain_config
                .prometheus_config
                .as_ref()
                .map(|cfg| &cfg.registry),
        );

        // Grab the service handle before `build_network` consumes the queue.
        let import_queue_service = import_queue.service();

        let (network, system_rpc_tx, tx_handler_controller, sync_service) =
            sc_service::build_network(sc_service::BuildNetworkParams {
                config: parachain_config,
                client: client.clone(),
                transaction_pool: transaction_pool.clone(),
                spawn_handle: task_manager.spawn_handle(),
                import_queue,
                warp_sync_config: None,
                block_announce_validator_builder: None,
                net_config,
                block_relay: None,
                metrics,
            })?;

        // Same builder, with the network slots now filled.
        Ok(NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: Network {
                network,
                system_rpc_tx,
                sync_service,
            },
            tx_handler_controller,
            import_queue_service,
        })
    }
552

            
553
    /// Given an `rpc_builder`, spawns the common tasks of a Substrate
    /// node (offchain workers, RPC, hwbench reporting). It consumes
    /// `self.tx_handler_controller` in the process, which means
    /// it can only be called once, and any other code that would need this
    /// controller should interact with it before calling this function.
    pub fn spawn_common_tasks<TRpc>(
        self,
        parachain_config: Configuration,
        rpc_builder: Box<
            dyn Fn(Arc<(dyn SpawnNamed + 'static)>) -> Result<RpcModule<TRpc>, sc_service::Error>,
        >,
    ) -> sc_service::error::Result<NodeBuilder<T, Network<BlockOf<T>>, (), SImportQueueService>>
    where
        // Requires network data to have been generated, and the tx handler
        // controller to not yet have been consumed.
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
        STxHandler: TypeIdentity<Type = TransactionsHandlerController<BlockHashOf<T>>>,
        BlockHashOf<T>: Unpin,
        BlockHeaderOf<T>: Unpin,
        ConstructedRuntimeApiOf<T>: TaggedTransactionQueue<BlockOf<T>>
            + BlockBuilder<BlockOf<T>>
            + OffchainWorkerApi<BlockOf<T>>
            + sp_api::Metadata<BlockOf<T>>
            + sp_session::SessionKeys<BlockOf<T>>,
    {
        let NodeBuilder {
            client,
            backend,
            transaction_pool,
            mut telemetry,
            telemetry_worker_handle,
            mut task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network,
            tx_handler_controller,
            import_queue_service,
        } = self;

        // Convert the type-state slots to their concrete types.
        let network = TypeIdentity::into_type(network);
        let tx_handler_controller = TypeIdentity::into_type(tx_handler_controller);

        let collator = parachain_config.role.is_authority();

        if parachain_config.offchain_worker.enabled {
            task_manager.spawn_handle().spawn(
                "offchain-workers-runner",
                "offchain-work",
                sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                    runtime_api_provider: client.clone(),
                    keystore: Some(keystore_container.keystore()),
                    offchain_db: backend.offchain_storage(),
                    transaction_pool: Some(OffchainTransactionPoolFactory::new(
                        transaction_pool.clone(),
                    )),
                    network_provider: Arc::new(network.network.clone()),
                    is_validator: parachain_config.role.is_authority(),
                    enable_http_requests: false,
                    custom_extensions: move |_| vec![],
                })?
                .run(client.clone(), task_manager.spawn_handle())
                .boxed(),
            );
        }

        // Spawns RPC servers and misc. common node tasks; the returned RPC
        // handlers are intentionally unused here.
        let _rpc_handlers = sc_service::spawn_tasks(sc_service::SpawnTasksParams {
            rpc_builder,
            client: client.clone(),
            transaction_pool: transaction_pool.clone(),
            task_manager: &mut task_manager,
            config: parachain_config,
            keystore: keystore_container.keystore(),
            backend: backend.clone(),
            network: network.network.clone(),
            system_rpc_tx: network.system_rpc_tx.clone(),
            tx_handler_controller,
            telemetry: telemetry.as_mut(),
            sync_service: network.sync_service.clone(),
        })?;

        if let Some(hwbench) = &hwbench {
            sc_sysinfo::print_hwbench(hwbench);
            // Here you can check whether the hardware meets your chains' requirements. Putting a link
            // in there and swapping out the requirements for your own are probably a good idea. The
            // requirements for a para-chain are dictated by its relay-chain.
            if collator {
                if let Err(err) = SUBSTRATE_REFERENCE_HARDWARE.check_hardware(hwbench, false) {
                    log::warn!(
                        "⚠️  The hardware does not meet the minimal requirements {} for role 'Authority'.",
                        err
                    );
                }
            }

            if let Some(ref mut telemetry) = telemetry {
                let telemetry_handle = telemetry.handle();
                task_manager.spawn_handle().spawn(
                    "telemetry_hwbench",
                    None,
                    sc_sysinfo::initialize_hwbench_telemetry(telemetry_handle, hwbench.clone()),
                );
            }
        }

        // Same builder, with the tx handler controller now consumed (`()`).
        Ok(NodeBuilder {
            client,
            backend,
            transaction_pool,
            telemetry,
            telemetry_worker_handle,
            task_manager,
            keystore_container,
            hwbench,
            prometheus_registry,
            network: TypeIdentity::from_type(network),
            tx_handler_controller: TypeIdentity::from_type(()),
            import_queue_service,
        })
    }
670

            
671
436
    /// Install a manual-seal authoring task according to `manual_seal_config`.
    ///
    /// Returns `Some(command_sink)` only for `Sealing::Manual` (the sink is
    /// meant to be handed to the RPC so blocks can be sealed on demand);
    /// `Instant` and `Interval` sealing drive themselves and return `None`.
    pub fn install_manual_seal<BI, SC, CIDP>(
        &mut self,
        manual_seal_config: ManualSealConfiguration<BlockOf<T>, BI, SC, CIDP>,
    ) -> sc_service::error::Result<Option<mpsc::Sender<EngineCommand<BlockHashOf<T>>>>>
    where
        BI: BlockImport<BlockOf<T>, Error = sp_consensus::Error> + Send + Sync + 'static,
        SC: SelectChain<BlockOf<T>> + 'static,
        CIDP: CreateInherentDataProviders<BlockOf<T>, ()> + 'static,
    {
        let ManualSealConfiguration {
            sealing,
            soft_deadline,
            block_import,
            select_chain,
            consensus_data_provider,
            create_inherent_data_providers,
        } = manual_seal_config;

        let prometheus_registry = self.prometheus_registry.clone();

        // Proposer factory with proof recording enabled.
        let mut env = sc_basic_authorship::ProposerFactory::with_proof_recording(
            self.task_manager.spawn_handle(),
            self.client.clone(),
            self.transaction_pool.clone(),
            prometheus_registry.as_ref(),
            self.telemetry.as_ref().map(|x| x.handle()),
        );

        let mut command_sink = None;

        if let Some(deadline) = soft_deadline {
            env.set_soft_deadline(deadline);
        }

        // Source of seal commands, depending on the sealing mode.
        let commands_stream: Box<
            dyn Stream<Item = EngineCommand<BlockHashOf<T>>> + Send + Sync + Unpin,
        > = match sealing {
            Sealing::Instant => {
                // Seal a new block on every transaction pool import.
                Box::new(
                    // This bit cribbed from the implementation of instant seal.
                    self.transaction_pool.import_notification_stream().map(|_| {
                        EngineCommand::SealNewBlock {
                            create_empty: false,
                            finalize: false,
                            parent_hash: None,
                            sender: None,
                        }
                    }),
                )
            }
            Sealing::Manual => {
                // Commands come from an external channel.
                let (sink, stream) = futures::channel::mpsc::channel(1000);
                // Keep a reference to the other end of the channel. It goes to the RPC.
                command_sink = Some(sink);
                Box::new(stream)
            }
            // Seal (possibly empty) blocks on a fixed timer.
            Sealing::Interval(millis) => Box::new(futures::StreamExt::map(
                Timer::interval(Duration::from_millis(millis)),
                |_| EngineCommand::SealNewBlock {
                    create_empty: true,
                    finalize: true,
                    parent_hash: None,
                    sender: None,
                },
            )),
        };

        self.task_manager.spawn_essential_handle().spawn_blocking(
            "authorship_task",
            Some("block-authoring"),
            run_manual_seal(ManualSealParams {
                block_import,
                env,
                client: self.client.clone(),
                pool: self.transaction_pool.clone(),
                commands_stream,
                select_chain,
                consensus_data_provider,
                create_inherent_data_providers,
            }),
        );

        Ok(command_sink)
    }
755

            
756
    pub fn start_full_node<RCInterface>(
757
        self,
758
        para_id: ParaId,
759
        relay_chain_interface: RCInterface,
760
        relay_chain_slot_duration: Duration,
761
        start_bootnode_params: StartBootnodeParams,
762
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
763
    where
764
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
765
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
766
        RCInterface: RelayChainInterface + Clone + 'static,
767
        RCInterface: TypeIdentity<Type = Arc<dyn RelayChainInterface + 'static>>,
768
        BlockHashOf<T>: TypeIdentity<Type = H256>,
769
    {
770
        let NodeBuilder {
771
            client,
772
            backend,
773
            transaction_pool,
774
            telemetry,
775
            telemetry_worker_handle,
776
            mut task_manager,
777
            keystore_container,
778
            hwbench,
779
            prometheus_registry,
780
            network,
781
            tx_handler_controller,
782
            import_queue_service,
783
        } = self;
784

            
785
        let network = TypeIdentity::into_type(network);
786
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
787

            
788
        let announce_block = {
789
            let sync_service = network.sync_service.clone();
790
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
791
        };
792

            
793
        let overseer_handle = relay_chain_interface
794
            .overseer_handle()
795
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
796

            
797
        let params = StartFullNodeParams {
798
            client: client.clone(),
799
            announce_block,
800
            task_manager: &mut task_manager,
801
            para_id,
802
            relay_chain_interface: relay_chain_interface.clone(),
803
            relay_chain_slot_duration,
804
            import_queue: import_queue_service,
805
            recovery_handle: Box::new(overseer_handle),
806
            sync_service: network.sync_service.clone(),
807
            prometheus_registry: prometheus_registry.as_ref(),
808
        };
809

            
810
        // TODO: change for async backing
811
        // TODO: to fix deprecation warning, we only need to change
812
        // `start_full_node` to `start_relay_chain_tasks`
813
        #[allow(deprecated)]
814
        cumulus_client_service::start_full_node(params)?;
815

            
816
        let StartBootnodeParams {
817
            relay_chain_fork_id,
818
            parachain_fork_id,
819
            advertise_non_global_ips,
820
            parachain_public_addresses,
821
            relay_chain_network,
822
            paranode_rx,
823
            embedded_dht_bootnode,
824
            dht_bootnode_discovery,
825
        } = start_bootnode_params;
826

            
827
        // Advertise parachain bootnode address in relay chain DHT
828
        start_bootnode_tasks(StartBootnodeTasksParams {
829
            embedded_dht_bootnode,
830
            dht_bootnode_discovery,
831
            para_id,
832
            task_manager: &mut task_manager,
833
            relay_chain_interface: TypeIdentity::into_type(relay_chain_interface),
834
            relay_chain_fork_id,
835
            relay_chain_network,
836
            request_receiver: paranode_rx,
837
            parachain_network: network.network.clone(),
838
            advertise_non_global_ips,
839
            parachain_genesis_hash: TypeIdentity::into_type(client.chain_info().genesis_hash),
840
            parachain_fork_id,
841
            parachain_public_addresses,
842
        });
843

            
844
        Ok(NodeBuilder {
845
            client,
846
            backend,
847
            transaction_pool,
848
            telemetry,
849
            telemetry_worker_handle,
850
            task_manager,
851
            keystore_container,
852
            hwbench,
853
            prometheus_registry,
854
            network: TypeIdentity::from_type(network),
855
            tx_handler_controller,
856
            import_queue_service: (),
857
        })
858
    }
859

            
860
    pub async fn start_collator<RCInterface>(
861
        self,
862
        para_id: ParaId,
863
        relay_chain_interface: RCInterface,
864
        relay_chain_slot_duration: Duration,
865
        parachain_consensus: ParachainConsensusOf<T>,
866
        collator_key: CollatorPair,
867
    ) -> sc_service::error::Result<NodeBuilder<T, SNetwork, STxHandler, ()>>
868
    where
869
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
870
        SImportQueueService: TypeIdentity<Type = ImportQueueServiceOf<T>>,
871
        RCInterface: RelayChainInterface + Clone + 'static,
872
    {
873
        let NodeBuilder {
874
            client,
875
            backend,
876
            transaction_pool,
877
            telemetry,
878
            telemetry_worker_handle,
879
            mut task_manager,
880
            keystore_container,
881
            hwbench,
882
            prometheus_registry,
883
            network,
884
            tx_handler_controller,
885
            import_queue_service,
886
        } = self;
887

            
888
        let network = TypeIdentity::into_type(network);
889
        let import_queue_service = TypeIdentity::into_type(import_queue_service);
890

            
891
        let spawner = task_manager.spawn_handle();
892
        let announce_block = {
893
            let sync_service = network.sync_service.clone();
894
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
895
        };
896
        let overseer_handle = relay_chain_interface
897
            .overseer_handle()
898
            .map_err(|e| sc_service::Error::Application(Box::new(e)))?;
899

            
900
        let params = cumulus_client_service::StartCollatorParams {
901
            para_id,
902
            block_status: client.clone(),
903
            announce_block: announce_block.clone(),
904
            client: client.clone(),
905
            task_manager: &mut task_manager,
906
            relay_chain_interface: relay_chain_interface.clone(),
907
            spawner: spawner.clone(),
908
            parachain_consensus,
909
            import_queue: import_queue_service,
910
            collator_key,
911
            relay_chain_slot_duration,
912
            recovery_handle: Box::new(overseer_handle.clone()),
913
            sync_service: network.sync_service.clone(),
914
            prometheus_registry: prometheus_registry.as_ref(),
915
        };
916

            
917
        // TODO: change for async backing
918
        #[allow(deprecated)]
919
        cumulus_client_service::start_collator(params).await?;
920

            
921
        Ok(NodeBuilder {
922
            client,
923
            backend,
924
            transaction_pool,
925
            telemetry,
926
            telemetry_worker_handle,
927
            task_manager,
928
            keystore_container,
929
            hwbench,
930
            prometheus_registry,
931
            network: TypeIdentity::from_type(network),
932
            tx_handler_controller,
933
            import_queue_service: (),
934
        })
935
    }
936

            
937
    pub fn extract_import_queue_service(
938
        self,
939
    ) -> (
940
        NodeBuilder<T, SNetwork, STxHandler, ()>,
941
        SImportQueueService,
942
    )
943
    where
944
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
945
    {
946
        let NodeBuilder {
947
            client,
948
            backend,
949
            transaction_pool,
950
            telemetry,
951
            telemetry_worker_handle,
952
            task_manager,
953
            keystore_container,
954
            hwbench,
955
            prometheus_registry,
956
            network,
957
            tx_handler_controller,
958
            import_queue_service,
959
        } = self;
960

            
961
        (
962
            NodeBuilder {
963
                client,
964
                backend,
965
                transaction_pool,
966
                telemetry,
967
                telemetry_worker_handle,
968
                task_manager,
969
                keystore_container,
970
                hwbench,
971
                prometheus_registry,
972
                network,
973
                tx_handler_controller,
974
                import_queue_service: (),
975
            },
976
            import_queue_service,
977
        )
978
    }
979

            
980
    pub fn cumulus_client_collator_params_generator(
981
        &self,
982
        para_id: ParaId,
983
        overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
984
        collator_key: CollatorPair,
985
        parachain_consensus: ParachainConsensusOf<T>,
986
    ) -> impl Fn() -> cumulus_client_collator::StartCollatorParams<
987
        BlockOf<T>,
988
        ClientOf<T>,
989
        ClientOf<T>,
990
        SpawnTaskHandle,
991
    > + Send
992
           + Clone
993
           + 'static
994
    where
995
        SNetwork: TypeIdentity<Type = Network<BlockOf<T>>>,
996
    {
997
        let network = TypeIdentity::as_type(&self.network);
998

            
999
        let client = self.client.clone();
        let announce_block = {
            let sync_service = network.sync_service.clone();
            Arc::new(move |hash, data| sync_service.announce_block(hash, data))
        };
        let spawner = self.task_manager.spawn_handle();
        move || cumulus_client_collator::StartCollatorParams {
            runtime_api: client.clone(),
            block_status: client.clone(),
            announce_block: announce_block.clone(),
            overseer_handle: overseer_handle.clone(),
            spawner: spawner.clone(),
            para_id,
            key: collator_key.clone(),
            parachain_consensus: parachain_consensus.clone(),
        }
    }
}
/// Block authoring scheme to be used by the dev service.
#[derive(Debug, Copy, Clone)]
pub enum Sealing {
    /// Author a block immediately upon receiving a transaction into the transaction pool
    Instant,
    /// Author a block upon receiving an RPC command
    Manual,
    /// Author blocks at a regular interval specified in milliseconds
    Interval(u64),
}

impl FromStr for Sealing {
    type Err = String;

    /// Parses `"instant"`, `"manual"`, or a millisecond count (e.g. `"6000"`)
    /// into the corresponding sealing mode; any other input is an error.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        Ok(match s {
            "instant" => Self::Instant,
            "manual" => Self::Manual,
            s => {
                let millis = s
                    .parse::<u64>()
                    .map_err(|_| "couldn't decode sealing param")?;
                Self::Interval(millis)
            }
        })
    }
}
/// Configuration consumed by `install_manual_seal` to set up dev-mode block
/// authoring (instant, RPC-driven, or interval sealing).
pub struct ManualSealConfiguration<B, BI, SC, CIDP> {
    /// Trigger mode deciding when new blocks are sealed.
    pub sealing: Sealing,
    /// Block import handle the sealed blocks are imported through.
    pub block_import: BI,
    /// Optional soft deadline applied to the proposer via `set_soft_deadline`.
    pub soft_deadline: Option<Percent>,
    /// Chain-selection strategy passed to the manual-seal task.
    pub select_chain: SC,
    /// Optional provider of extra consensus data (proof-recording variant).
    pub consensus_data_provider: Option<Box<dyn ConsensusDataProvider<B, Proof = StorageProof>>>,
    /// Factory for inherent data providers used when authoring each block.
    pub create_inherent_data_providers: CIDP,
}