1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
18

            
19
use {
20
    container_chain_template_frontier_runtime::{opaque::Block, Hash, RuntimeApi},
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport,
23
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
24
    cumulus_client_service::{prepare_node_config, ParachainHostFunctions},
25
    cumulus_primitives_core::{
26
        relay_chain::well_known_keys as RelayWellKnownKeys, CollectCollationInfo, ParaId,
27
    },
28
    fc_consensus::FrontierBlockImport,
29
    fc_db::DatabaseSource,
30
    fc_rpc_core::types::{FeeHistoryCache, FilterPool},
31
    fc_storage::StorageOverrideHandler,
32
    nimbus_primitives::NimbusId,
33
    node_common::service::node_builder::{
34
        ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing,
35
    },
36
    parity_scale_codec::Encode,
37
    polkadot_parachain_primitives::primitives::HeadData,
38
    polkadot_primitives::UpgradeGoAhead,
39
    sc_consensus::BasicQueue,
40
    sc_executor::WasmExecutor,
41
    sc_network::NetworkBackend,
42
    sc_service::{Configuration, TFullBackend, TFullClient, TaskManager},
43
    sp_api::ProvideRuntimeApi,
44
    sp_blockchain::HeaderBackend,
45
    sp_consensus_slots::{Slot, SlotDuration},
46
    sp_core::{Pair, H256},
47
    std::{
48
        collections::BTreeMap,
49
        sync::{Arc, Mutex},
50
        time::Duration,
51
    },
52
};
53

            
54
/// Wasm executor with the standard cumulus parachain host functions.
type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
/// Full Substrate client specialized to this template's runtime and executor.
type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
/// Full backend (database) type for this chain.
type ParachainBackend = TFullBackend<Block>;
/// Parachain block import wrapping Frontier's block import, so Ethereum
/// block/transaction mappings are maintained as part of the normal
/// parachain import pipeline.
type ParachainBlockImport = TParachainBlockImport<
    Block,
    FrontierBlockImport<Block, Arc<ParachainClient>, ParachainClient>,
    ParachainBackend,
>;
62

            
63
/// Type-level configuration tying together the block type, runtime API and
/// executor consumed by the shared `NodeBuilder` machinery.
pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    type Block = Block;
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ParachainExecutor;
}
69

            
70
160
pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
71
160
    let config_dir = config
72
160
        .base_path
73
160
        .config_dir(config.chain_spec.id())
74
160
        .join("frontier")
75
160
        .join(path);
76

            
77
160
    config_dir
78
160
}
79

            
80
// TODO This is copied from frontier. It should be imported instead after
81
// https://github.com/paritytech/frontier/issues/333 is solved
82
160
pub fn open_frontier_backend<C>(
83
160
    client: Arc<C>,
84
160
    config: &Configuration,
85
160
) -> Result<fc_db::kv::Backend<Block, C>, String>
86
160
where
87
160
    C: sp_blockchain::HeaderBackend<Block>,
88
{
89
160
    fc_db::kv::Backend::<Block, _>::new(
90
160
        client,
91
        &fc_db::kv::DatabaseSettings {
92
160
            source: match config.database {
93
160
                DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
94
160
                    path: frontier_database_dir(config, "db"),
95
160
                    cache_size: 0,
96
160
                },
97
                DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
98
                    path: frontier_database_dir(config, "paritydb"),
99
                },
100
                DatabaseSource::Auto { .. } => DatabaseSource::Auto {
101
                    rocksdb_path: frontier_database_dir(config, "db"),
102
                    paritydb_path: frontier_database_dir(config, "paritydb"),
103
                    cache_size: 0,
104
                },
105
                _ => {
106
                    return Err("Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string())
107
                }
108
            },
109
        },
110
    )
111
160
}
112

            
113
// Mocked clock (milliseconds) shared by the dev-node inherent data providers
// below; advanced by one slot duration for every block that is sealed.
thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });
114

            
115
/// Provide a mock duration starting at 0 in millisecond for timestamp inherent.
116
/// Each call will increment timestamp by slot_duration making Aura think time has passed.
117
struct MockTimestampInherentDataProvider;
118
#[async_trait::async_trait]
119
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
120
    async fn provide_inherent_data(
121
        &self,
122
        inherent_data: &mut sp_inherents::InherentData,
123
6216
    ) -> Result<(), sp_inherents::Error> {
124
3108
        TIMESTAMP.with(|x| {
125
3108
            *x.borrow_mut() += container_chain_template_frontier_runtime::SLOT_DURATION;
126
3108
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
127
3108
        })
128
6216
    }
129

            
130
    async fn try_handle_error(
131
        &self,
132
        _identifier: &sp_inherents::InherentIdentifier,
133
        _error: &[u8],
134
    ) -> Option<Result<(), sp_inherents::Error>> {
135
        // The pallet never reports error.
136
        None
137
    }
138
}
139

            
140
160
pub fn import_queue(
141
160
    parachain_config: &Configuration,
142
160
    node_builder: &NodeBuilder<NodeConfig>,
143
160
) -> (ParachainBlockImport, BasicQueue<Block>) {
144
160
    let frontier_block_import =
145
160
        FrontierBlockImport::new(node_builder.client.clone(), node_builder.client.clone());
146

            
147
    // The parachain block import and import queue
148
160
    let block_import = cumulus_client_consensus_common::ParachainBlockImport::new(
149
160
        frontier_block_import,
150
160
        node_builder.backend.clone(),
151
    );
152
160
    let import_queue = nimbus_consensus::import_queue(
153
160
        node_builder.client.clone(),
154
160
        block_import.clone(),
155
        move |_, _| async move {
156
            let time = sp_timestamp::InherentDataProvider::from_system_time();
157

            
158
            Ok((time,))
159
        },
160
160
        &node_builder.task_manager.spawn_essential_handle(),
161
160
        parachain_config.prometheus_registry(),
162
        false,
163
        false,
164
    )
165
160
    .expect("function never fails");
166

            
167
160
    (block_import, import_queue)
168
160
}
169

            
170
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstract over the executor and the runtime api.
///
/// Wires up, in order: the common node builder, the Frontier backends/caches,
/// the import queue, the relay chain interface, the cumulus network, the
/// Frontier background tasks, the Ethereum-enabled RPC, and finally the full
/// node itself. Returns the task manager plus a handle to the client.
#[sc_tracing::logging::prefix_logs_with("Parachain")]
async fn start_node_impl<Net>(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    collator_options: CollatorOptions,
    para_id: ParaId,
    rpc_config: crate::cli::RpcConfig,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)>
where
    Net: NetworkBackend<Block, Hash>,
{
    let parachain_config = prepare_node_config(parachain_config);

    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;

    // Frontier specific stuff: eth log filter pool, fee history cache and the
    // key-value backend holding the Ethereum <-> Substrate block mappings.
    let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
    let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
    let frontier_backend = fc_db::Backend::KeyValue(
        open_frontier_backend(node_builder.client.clone(), &parachain_config)?.into(),
    );
    let overrides = Arc::new(StorageOverrideHandler::new(node_builder.client.clone()));
    let fee_history_limit = rpc_config.fee_history_limit;

    // Sinks used to notify `eth_subscribe`-style subscribers of new blocks.
    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

    // Only the queue is needed here; the block import handle is discarded.
    let (_, import_queue) = import_queue(&parachain_config, &node_builder);

    // Relay chain interface
    let (relay_chain_interface, _collator_key) = node_builder
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
        .await?;

    // Build cumulus network, allowing to access network-related services.
    let node_builder = node_builder
        .build_cumulus_network::<_, Net>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let frontier_backend = Arc::new(frontier_backend);

    // Spawn the Frontier background tasks (mapping sync, filter pool
    // maintenance, fee history population, ...).
    crate::rpc::spawn_essential_tasks(crate::rpc::SpawnTasksParams {
        task_manager: &node_builder.task_manager,
        client: node_builder.client.clone(),
        substrate_backend: node_builder.backend.clone(),
        frontier_backend: frontier_backend.clone(),
        filter_pool: filter_pool.clone(),
        overrides: overrides.clone(),
        fee_history_limit,
        fee_history_cache: fee_history_cache.clone(),
        sync_service: node_builder.network.sync_service.clone(),
        pubsub_notification_sinks: pubsub_notification_sinks.clone(),
    });

    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        node_builder.task_manager.spawn_handle(),
        overrides.clone(),
        rpc_config.eth_log_block_cache,
        rpc_config.eth_statuses_cache,
        node_builder.prometheus_registry.clone(),
    ));

    // Closure invoked per RPC connection; captures clones of everything the
    // Ethereum-compatible RPC needs.
    let rpc_builder = {
        let client = node_builder.client.clone();
        let pool = node_builder.transaction_pool.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks;
        let network = node_builder.network.network.clone();
        let sync = node_builder.network.sync_service.clone();
        let filter_pool = filter_pool.clone();
        let backend = node_builder.backend.clone();
        let max_past_logs = rpc_config.max_past_logs;
        let max_block_range = rpc_config.max_block_range;
        let overrides = overrides;
        let fee_history_cache = fee_history_cache.clone();
        let block_data_cache = block_data_cache;
        let frontier_backend = frontier_backend.clone();

        Box::new(move |subscription_task_executor| {
            let deps = crate::rpc::FullDeps {
                backend: backend.clone(),
                client: client.clone(),
                filter_pool: filter_pool.clone(),
                frontier_backend: match &*frontier_backend {
                    fc_db::Backend::KeyValue(b) => b.clone(),
                    fc_db::Backend::Sql(b) => b.clone(),
                },
                graph: pool.clone(),
                pool: pool.clone(),
                max_past_logs,
                max_block_range,
                fee_history_limit,
                fee_history_cache: fee_history_cache.clone(),
                network: Arc::new(network.clone()),
                sync: sync.clone(),
                block_data_cache: block_data_cache.clone(),
                overrides: overrides.clone(),
                // No manual-seal command sink / mocked XCM channels outside of
                // the dev node.
                is_authority: false,
                command_sink: None,
                xcm_senders: None,
            };
            crate::rpc::create_full(
                deps,
                subscription_task_executor,
                pubsub_notification_sinks.clone(),
            )
            .map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    // NOTE(review): 6s relay slot duration is hard-coded here — confirm it
    // matches the target relay chain.
    let relay_chain_slot_duration = Duration::from_secs(6);
    let node_builder = node_builder.start_full_node(
        para_id,
        relay_chain_interface.clone(),
        relay_chain_slot_duration,
    )?;

    Ok((node_builder.task_manager, node_builder.client))
}
302

            
303
/// Start a parachain node.
304
pub async fn start_parachain_node<Net>(
305
    parachain_config: Configuration,
306
    polkadot_config: Configuration,
307
    collator_options: CollatorOptions,
308
    para_id: ParaId,
309
    rpc_config: crate::cli::RpcConfig,
310
    hwbench: Option<sc_sysinfo::HwBench>,
311
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)>
312
where
313
    Net: NetworkBackend<Block, Hash>,
314
{
315
    start_node_impl::<Net>(
316
        parachain_config,
317
        polkadot_config,
318
        collator_options,
319
        para_id,
320
        rpc_config,
321
        hwbench,
322
    )
323
    .await
324
}
325

            
326
/// Helper function to generate a crypto pair from seed
327
160
fn get_aura_id_from_seed(seed: &str) -> NimbusId {
328
160
    sp_core::sr25519::Pair::from_string(&format!("//{}", seed), None)
329
160
        .expect("static values are valid; qed")
330
160
        .public()
331
160
        .into()
332
160
}
333

            
334
/// Builds a new development service. This service uses manual seal, and mocks
/// the parachain inherent.
///
/// Unlike [`start_node_impl`] there is no relay chain: a plain Substrate
/// network is built and all relay-provided data (validation data, relay slot,
/// para head, orchestrator authorities, XCM messages) is mocked per block.
pub async fn start_dev_node(
    parachain_config: Configuration,
    sealing: Sealing,
    rpc_config: crate::cli::RpcConfig,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, sc_service::error::Error> {
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench)?;

    // Frontier specific stuff: eth log filter pool, fee history cache and the
    // key-value backend holding the Ethereum <-> Substrate block mappings.
    let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
    let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
    let frontier_backend = fc_db::Backend::KeyValue(
        open_frontier_backend(node_builder.client.clone(), &parachain_config)?.into(),
    );
    let overrides = Arc::new(StorageOverrideHandler::new(node_builder.client.clone()));
    let fee_history_limit = rpc_config.fee_history_limit;

    // Sinks used to notify `eth_subscribe`-style subscribers of new blocks.
    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

    // Keep the block import handle too: manual seal needs it below.
    let (parachain_block_import, import_queue) = import_queue(&parachain_config, &node_builder);

    // Build a Substrate Network. (not cumulus since it is a dev node, it mocks
    // the relaychain)
    let mut node_builder = node_builder
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            import_queue,
        )?;

    // Populated only when running as an authority (i.e. when blocks are
    // actually authored); both are handed to the RPC layer below.
    let mut command_sink = None;
    let mut xcm_senders = None;

    if parachain_config.role.is_authority() {
        let client = node_builder.client.clone();
        // Mocked XCM channels: the RPC side holds the senders, the inherent
        // provider below drains the receivers into each block.
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));

        // Single hard-coded dev authority.
        let authorities = vec![get_aura_id_from_seed("alice")];

        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
            block_import: parachain_block_import,
            sealing,
            soft_deadline: None,
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
            consensus_data_provider: Some(Box::new(
                tc_consensus::ContainerManualSealAuraConsensusDataProvider::new(
                    SlotDuration::from_millis(
                        container_chain_template_frontier_runtime::SLOT_DURATION,
                    ),
                    authorities.clone(),
                ),
            )),
            // Builds the mocked inherents for every sealed block: timestamp,
            // validation data (with relay storage proof entries) and the
            // orchestrator authorities-noting data.
            create_inherent_data_providers: move |block: H256, ()| {
                let current_para_block = client
                    .number(block)
                    .expect("Header lookup should succeed")
                    .expect("Header passed in as parent should be present in backend.");

                // Header of the previous para block, used as the para head
                // injected into the mocked relay state below.
                let hash = client
                    .hash(current_para_block.saturating_sub(1))
                    .expect("Hash of the desired block must be present")
                    .expect("Hash of the desired block should exist");

                let para_header = client
                    .expect_header(hash)
                    .expect("Expected parachain header should exist")
                    .encode();

                let para_head_data: Vec<u8> = HeadData(para_header).encode();
                let client_set_aside_for_cidp = client.clone();
                let client_for_xcm = client.clone();
                let authorities_for_cidp = authorities.clone();
                // Well-known relay storage keys that the runtime expects to
                // find in the (mocked) relay state proof.
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();
                let slot_duration = container_chain_template_frontier_runtime::SLOT_DURATION;

                // Read the mocked clock; `MockTimestampInherentDataProvider`
                // will advance it by one slot when the inherent is provided,
                // so pre-add the same slot here to derive the matching slot.
                let mut timestamp = 0u64;
                TIMESTAMP.with(|x| {
                    timestamp = *x.borrow();
                });

                timestamp += slot_duration;

                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
						timestamp.into(),
						SlotDuration::from_millis(slot_duration),
                    );
                let relay_slot = u64::from(*relay_slot);

                let downward_xcm_receiver = downward_xcm_receiver.clone();
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

                async move {
                    let mocked_authorities_noting =
                        ccp_authorities_noting_inherent::MockAuthoritiesNotingInherentDataProvider {
                            current_para_block,
                            relay_offset: 1000,
                            relay_blocks_per_para_block: 2,
                            orchestrator_para_id: crate::chain_spec::ORCHESTRATOR,
                            container_para_id: para_id,
                            authorities: authorities_for_cidp
                    };

                    // Merge the authorities-noting entries with the para head
                    // and relay slot keys into one mocked relay state proof.
                    let mut additional_keys = mocked_authorities_noting.get_key_values();
                    additional_keys.append(&mut vec![(para_head_key, para_head_data), (relay_slot_key, Slot::from(relay_slot).encode())]);

                    let time = MockTimestampInherentDataProvider;
                    let current_para_head = client_set_aside_for_cidp
                            .header(block)
                            .expect("Header lookup should succeed")
                            .expect("Header passed in as parent should be present in backend.");
                    // If the parent scheduled a runtime upgrade, mock the
                    // relay's go-ahead signal so the upgrade can be applied.
                    let should_send_go_ahead = match client_set_aside_for_cidp
                            .runtime_api()
                            .collect_collation_info(block, &current_para_head)
                    {
                            Ok(info) => info.new_validation_code.is_some(),
                            Err(e) => {
                                    log::error!("Failed to collect collation info: {:?}", e);
                                    false
                            },
                    };
                    let mocked_parachain = MockValidationDataInherentDataProvider {
                        current_para_block,
                        current_para_block_head: None,
                        relay_offset: 1000,
                        relay_blocks_per_para_block: 2,
                        para_blocks_per_relay_epoch: 10,
                        relay_randomness_config: (),
                        xcm_config: MockXcmConfig::new(
                            &*client_for_xcm,
                            block,
                            Default::default(),
                        ),
                        // Feed any XCM messages queued through the RPC-held
                        // senders into this block.
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
                        additional_key_values: Some(additional_keys),
                        para_id,
                        upgrade_go_ahead: should_send_go_ahead.then(|| {
                            log::info!(
                                "Detected pending validation code, sending go-ahead signal."
                            );
                            UpgradeGoAhead::GoAhead
                        }),
                    };

                    Ok((time, mocked_parachain, mocked_authorities_noting))
                }
            },
        })?;
    }

    let frontier_backend = Arc::new(frontier_backend);

    // Spawn the Frontier background tasks (mapping sync, filter pool
    // maintenance, fee history population, ...).
    crate::rpc::spawn_essential_tasks(crate::rpc::SpawnTasksParams {
        task_manager: &node_builder.task_manager,
        client: node_builder.client.clone(),
        substrate_backend: node_builder.backend.clone(),
        frontier_backend: frontier_backend.clone(),
        filter_pool: filter_pool.clone(),
        overrides: overrides.clone(),
        fee_history_limit,
        fee_history_cache: fee_history_cache.clone(),
        sync_service: node_builder.network.sync_service.clone(),
        pubsub_notification_sinks: pubsub_notification_sinks.clone(),
    });

    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        node_builder.task_manager.spawn_handle(),
        overrides.clone(),
        rpc_config.eth_log_block_cache,
        rpc_config.eth_statuses_cache,
        node_builder.prometheus_registry.clone(),
    ));

    // Closure invoked per RPC connection; captures clones of everything the
    // Ethereum-compatible RPC needs, plus the manual-seal command sink and
    // mocked XCM senders so blocks/messages can be driven over RPC.
    let rpc_builder = {
        let client = node_builder.client.clone();
        let pool = node_builder.transaction_pool.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks;
        let network = node_builder.network.network.clone();
        let sync = node_builder.network.sync_service.clone();
        let filter_pool = filter_pool;
        let frontier_backend = frontier_backend.clone();
        let backend = node_builder.backend.clone();
        let max_past_logs = rpc_config.max_past_logs;
        let max_block_range = rpc_config.max_block_range;
        let overrides = overrides;
        let block_data_cache = block_data_cache;

        Box::new(move |subscription_task_executor| {
            let deps = crate::rpc::FullDeps {
                backend: backend.clone(),
                client: client.clone(),
                filter_pool: filter_pool.clone(),
                frontier_backend: match &*frontier_backend {
                    fc_db::Backend::KeyValue(b) => b.clone(),
                    fc_db::Backend::Sql(b) => b.clone(),
                },
                graph: pool.clone(),
                pool: pool.clone(),
                max_past_logs,
                max_block_range,
                fee_history_limit,
                fee_history_cache: fee_history_cache.clone(),
                network: network.clone(),
                sync: sync.clone(),
                block_data_cache: block_data_cache.clone(),
                overrides: overrides.clone(),
                is_authority: false,
                command_sink: command_sink.clone(),
                xcm_senders: xcm_senders.clone(),
            };
            crate::rpc::create_full(
                deps,
                subscription_task_executor,
                pubsub_notification_sinks.clone(),
            )
            .map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    log::info!("Development Service Ready");

    Ok(node_builder.task_manager)
}