// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

#[allow(deprecated)]
use {
    container_chain_template_frontier_runtime::{opaque::Block, RuntimeApi},
    cumulus_client_cli::CollatorOptions,
    cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport,
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
    cumulus_client_service::{prepare_node_config, ParachainHostFunctions},
    cumulus_primitives_core::{relay_chain::well_known_keys as RelayWellKnownKeys, ParaId},
    fc_consensus::FrontierBlockImport,
    fc_db::DatabaseSource,
    fc_rpc_core::types::{FeeHistoryCache, FilterPool},
    fc_storage::StorageOverrideHandler,
    nimbus_primitives::NimbusId,
    node_common::service::{ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing},
    parity_scale_codec::Encode,
    polkadot_parachain_primitives::primitives::HeadData,
    sc_consensus::BasicQueue,
    sc_executor::WasmExecutor,
    sc_service::{Configuration, TFullBackend, TFullClient, TaskManager},
    sp_blockchain::HeaderBackend,
    sp_consensus_slots::{Slot, SlotDuration},
    sp_core::{Pair, H256},
    std::{
        collections::BTreeMap,
        sync::{Arc, Mutex},
        time::Duration,
    },
};

type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
type ParachainBackend = TFullBackend<Block>;
type ParachainBlockImport = TParachainBlockImport<
    Block,
    FrontierBlockImport<Block, Arc<ParachainClient>, ParachainClient>,
    ParachainBackend,
>;
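// Note: in the alias above, `FrontierBlockImport` is nested inside the
// parachain block import, so Ethereum-aware import logic runs as part of the
// parachain import pipeline (see `import_queue` below).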

pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    type Block = Block;
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ParachainExecutor;
}

pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
    config
        .base_path
        .config_dir(config.chain_spec.id())
        .join("frontier")
        .join(path)
}
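
// Illustrative example (assuming the standard Substrate base-path layout,
// which is decided by `BasePath::config_dir`, not by this file):
// `frontier_database_dir(&config, "db")` resolves to roughly
// `<base_path>/chains/<chain_id>/frontier/db`.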

// TODO: This is copied from Frontier. It should be imported instead once
// https://github.com/paritytech/frontier/issues/333 is solved.
pub fn open_frontier_backend<C>(
    client: Arc<C>,
    config: &Configuration,
) -> Result<fc_db::kv::Backend<Block, C>, String>
where
    C: sp_blockchain::HeaderBackend<Block>,
{
    fc_db::kv::Backend::<Block, _>::new(
        client,
        &fc_db::kv::DatabaseSettings {
            source: match config.database {
                DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
                    path: frontier_database_dir(config, "db"),
                    cache_size: 0,
                },
                DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
                    path: frontier_database_dir(config, "paritydb"),
                },
                DatabaseSource::Auto { .. } => DatabaseSource::Auto {
                    rocksdb_path: frontier_database_dir(config, "db"),
                    paritydb_path: frontier_database_dir(config, "paritydb"),
                    cache_size: 0,
                },
                _ => {
                    return Err("Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string())
                }
            },
        },
    )
}

thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });
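// This counter backs `MockTimestampInherentDataProvider` below: per-thread
// state that persists across the blocks sealed by the dev service.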

/// Provides a mock timestamp, starting at 0 milliseconds, for the timestamp inherent.
/// Each call increments the timestamp by `SLOT_DURATION`, making Aura think time has passed.
struct MockTimestampInherentDataProvider;
#[async_trait::async_trait]
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
    async fn provide_inherent_data(
        &self,
        inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        TIMESTAMP.with(|x| {
            *x.borrow_mut() += container_chain_template_frontier_runtime::SLOT_DURATION;
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
        })
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // The pallet never reports an error.
        None
    }
}
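
// Illustrative effect (not additional runtime logic): sealing three blocks in
// a row makes this provider report SLOT_DURATION, 2 * SLOT_DURATION and
// 3 * SLOT_DURATION, so each block falls in a fresh Aura slot even though no
// wall-clock time has passed.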

pub fn import_queue(
    parachain_config: &Configuration,
    node_builder: &NodeBuilder<NodeConfig>,
) -> (ParachainBlockImport, BasicQueue<Block>) {
    // Wrap the client in a Frontier (Ethereum-aware) block import.
    let frontier_block_import =
        FrontierBlockImport::new(node_builder.client.clone(), node_builder.client.clone());

    // The parachain block import and import queue
    let block_import = cumulus_client_consensus_common::ParachainBlockImport::new(
        frontier_block_import,
        node_builder.backend.clone(),
    );
    let import_queue = nimbus_consensus::import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        move |_, _| async move {
            let time = sp_timestamp::InherentDataProvider::from_system_time();

            Ok((time,))
        },
        &node_builder.task_manager.spawn_essential_handle(),
        parachain_config.prometheus_registry(),
        false,
    )
    .expect("function never fails");

    (block_import, import_queue)
}
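
// Both service entry points below use `import_queue`: `start_node_impl`
// discards the returned block import and keeps only the queue, while
// `start_dev_node` also hands the block import to manual seal.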

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation that is abstracted over the executor and the runtime API.
#[sc_tracing::logging::prefix_logs_with("Parachain")]
async fn start_node_impl(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    collator_options: CollatorOptions,
    para_id: ParaId,
    rpc_config: crate::cli::RpcConfig,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    let parachain_config = prepare_node_config(parachain_config);

    // Create a `NodeBuilder` which helps set up the parachain node's common systems.
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;

    // Frontier-specific setup.
    let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
    let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
    let frontier_backend = fc_db::Backend::KeyValue(
        open_frontier_backend(node_builder.client.clone(), &parachain_config)?.into(),
    );
    let overrides = Arc::new(StorageOverrideHandler::new(node_builder.client.clone()));
    let fee_history_limit = rpc_config.fee_history_limit;

    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

    // Only the import queue is needed here; the block import is discarded.
    let (_, import_queue) = import_queue(&parachain_config, &node_builder);

    // Relay chain interface
    let (relay_chain_interface, _collator_key) = node_builder
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
        .await?;

    // Build the Cumulus network, giving access to network-related services.
    let node_builder = node_builder
        .build_cumulus_network::<_, sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let frontier_backend = Arc::new(frontier_backend);

    crate::rpc::spawn_essential_tasks(crate::rpc::SpawnTasksParams {
        task_manager: &node_builder.task_manager,
        client: node_builder.client.clone(),
        substrate_backend: node_builder.backend.clone(),
        frontier_backend: frontier_backend.clone(),
        filter_pool: filter_pool.clone(),
        overrides: overrides.clone(),
        fee_history_limit,
        fee_history_cache: fee_history_cache.clone(),
        sync_service: node_builder.network.sync_service.clone(),
        pubsub_notification_sinks: pubsub_notification_sinks.clone(),
    });

    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        node_builder.task_manager.spawn_handle(),
        overrides.clone(),
        rpc_config.eth_log_block_cache,
        rpc_config.eth_statuses_cache,
        node_builder.prometheus_registry.clone(),
    ));

    let rpc_builder = {
        let client = node_builder.client.clone();
        let pool = node_builder.transaction_pool.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks;
        let network = node_builder.network.network.clone();
        let sync = node_builder.network.sync_service.clone();
        let filter_pool = filter_pool.clone();
        let backend = node_builder.backend.clone();
        let max_past_logs = rpc_config.max_past_logs;
        let overrides = overrides;
        let fee_history_cache = fee_history_cache.clone();
        let block_data_cache = block_data_cache;
        let frontier_backend = frontier_backend.clone();

        Box::new(move |deny_unsafe, subscription_task_executor| {
            let deps = crate::rpc::FullDeps {
                backend: backend.clone(),
                client: client.clone(),
                deny_unsafe,
                filter_pool: filter_pool.clone(),
                frontier_backend: match &*frontier_backend {
                    fc_db::Backend::KeyValue(b) => b.clone(),
                    fc_db::Backend::Sql(b) => b.clone(),
                },
                graph: pool.pool().clone(),
                pool: pool.clone(),
                max_past_logs,
                fee_history_limit,
                fee_history_cache: fee_history_cache.clone(),
                network: Arc::new(network.clone()),
                sync: sync.clone(),
                block_data_cache: block_data_cache.clone(),
                overrides: overrides.clone(),
                is_authority: false,
                command_sink: None,
                xcm_senders: None,
            };
            crate::rpc::create_full(
                deps,
                subscription_task_executor,
                pubsub_notification_sinks.clone(),
            )
            .map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    // Polkadot-family relay chains use 6-second slots.
    let relay_chain_slot_duration = Duration::from_secs(6);
    let node_builder = node_builder.start_full_node(
        para_id,
        relay_chain_interface.clone(),
        relay_chain_slot_duration,
    )?;

    node_builder.network.start_network.start_network();

    Ok((node_builder.task_manager, node_builder.client))
}

/// Start a parachain node.
pub async fn start_parachain_node(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    collator_options: CollatorOptions,
    para_id: ParaId,
    rpc_config: crate::cli::RpcConfig,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)> {
    start_node_impl(
        parachain_config,
        polkadot_config,
        collator_options,
        para_id,
        rpc_config,
        hwbench,
    )
    .await
}

/// Helper function to generate a crypto pair from a seed and return its public
/// key as a `NimbusId`.
fn get_aura_id_from_seed(seed: &str) -> NimbusId {
    sp_core::sr25519::Pair::from_string(&format!("//{}", seed), None)
        .expect("static values are valid; qed")
        .public()
        .into()
}
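
// Illustrative usage: `start_dev_node` below derives Alice's collator identity
// with `get_aura_id_from_seed("alice")`, which expands the seed to the SURI
// `//alice` before deriving the sr25519 public key.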

/// Builds a new development service. This service uses manual seal and mocks
/// the parachain inherent.
pub async fn start_dev_node(
    parachain_config: Configuration,
    sealing: Sealing,
    rpc_config: crate::cli::RpcConfig,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, sc_service::error::Error> {
    // TODO: Not present before; is this wanted, or was it forgotten?
    // let parachain_config = prepare_node_config(parachain_config);

    // Create a `NodeBuilder` which helps set up the parachain node's common systems.
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench)?;

    // Frontier-specific setup.
    let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
    let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
    let frontier_backend = fc_db::Backend::KeyValue(
        open_frontier_backend(node_builder.client.clone(), &parachain_config)?.into(),
    );
    let overrides = Arc::new(StorageOverrideHandler::new(node_builder.client.clone()));
    let fee_history_limit = rpc_config.fee_history_limit;

    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

    let (parachain_block_import, import_queue) = import_queue(&parachain_config, &node_builder);

    // Build a plain Substrate network (not a Cumulus one): this is a dev node,
    // and the relay chain is mocked.
    let mut node_builder = node_builder
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            import_queue,
        )?;

    let mut command_sink = None;
    let mut xcm_senders = None;

    if parachain_config.role.is_authority() {
        let client = node_builder.client.clone();
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));

        let authorities = vec![get_aura_id_from_seed("alice")];

        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
            block_import: parachain_block_import,
            sealing,
            soft_deadline: None,
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
            consensus_data_provider: Some(Box::new(
                tc_consensus::ContainerManualSealAuraConsensusDataProvider::new(
                    SlotDuration::from_millis(
                        container_chain_template_frontier_runtime::SLOT_DURATION,
                    ),
                    authorities.clone(),
                ),
            )),
            create_inherent_data_providers: move |block: H256, ()| {
                let current_para_block = client
                    .number(block)
                    .expect("Header lookup should succeed")
                    .expect("Header passed in as parent should be present in backend.");

                let hash = client
                    .hash(current_para_block.saturating_sub(1))
                    .expect("Hash of the desired block must be present")
                    .expect("Hash of the desired block should exist");

                let para_header = client
                    .expect_header(hash)
                    .expect("Expected parachain header should exist")
                    .encode();

                let para_head_data: Vec<u8> = HeadData(para_header).encode();
                let client_for_xcm = client.clone();
                let authorities_for_cidp = authorities.clone();
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();
                let slot_duration = container_chain_template_frontier_runtime::SLOT_DURATION;

                // Read the current mocked timestamp (without consuming it) and
                // advance it by one slot locally: this predicts the value the
                // timestamp inherent will report for this block.
                let mut timestamp = 0u64;
                TIMESTAMP.with(|x| {
                    timestamp = x.clone().take();
                });

                timestamp += slot_duration;

                // Derive the relay slot from the mocked timestamp so the mocked
                // relay chain appears to advance in lockstep with the parachain.
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                    timestamp.into(),
                    SlotDuration::from_millis(slot_duration),
                );
                let relay_slot = u64::from(*relay_slot);

                let downward_xcm_receiver = downward_xcm_receiver.clone();
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

                async move {
                    let mocked_authorities_noting =
                        ccp_authorities_noting_inherent::MockAuthoritiesNotingInherentDataProvider {
                            current_para_block,
                            relay_offset: 1000,
                            relay_blocks_per_para_block: 2,
                            orchestrator_para_id: crate::chain_spec::ORCHESTRATOR,
                            container_para_id: para_id,
                            authorities: authorities_for_cidp,
                        };

                    let mut additional_keys = mocked_authorities_noting.get_key_values();
                    additional_keys.append(&mut vec![
                        (para_head_key, para_head_data),
                        (relay_slot_key, Slot::from(relay_slot).encode()),
                    ]);

                    let time = MockTimestampInherentDataProvider;
                    let mocked_parachain = MockValidationDataInherentDataProvider {
                        current_para_block,
                        current_para_block_head: None,
                        relay_offset: 1000,
                        relay_blocks_per_para_block: 2,
                        // TODO: Recheck
                        para_blocks_per_relay_epoch: 10,
                        relay_randomness_config: (),
                        xcm_config: MockXcmConfig::new(
                            &*client_for_xcm,
                            block,
                            Default::default(),
                        ),
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
                        additional_key_values: Some(additional_keys),
                        para_id,
                    };

                    Ok((time, mocked_parachain, mocked_authorities_noting))
                }
            },
        })?;
    }

    let frontier_backend = Arc::new(frontier_backend);

    crate::rpc::spawn_essential_tasks(crate::rpc::SpawnTasksParams {
        task_manager: &node_builder.task_manager,
        client: node_builder.client.clone(),
        substrate_backend: node_builder.backend.clone(),
        frontier_backend: frontier_backend.clone(),
        filter_pool: filter_pool.clone(),
        overrides: overrides.clone(),
        fee_history_limit,
        fee_history_cache: fee_history_cache.clone(),
        sync_service: node_builder.network.sync_service.clone(),
        pubsub_notification_sinks: pubsub_notification_sinks.clone(),
    });

    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        node_builder.task_manager.spawn_handle(),
        overrides.clone(),
        rpc_config.eth_log_block_cache,
        rpc_config.eth_statuses_cache,
        node_builder.prometheus_registry.clone(),
    ));

    let rpc_builder = {
        let client = node_builder.client.clone();
        let pool = node_builder.transaction_pool.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks;
        let network = node_builder.network.network.clone();
        let sync = node_builder.network.sync_service.clone();
        let filter_pool = filter_pool;
        let frontier_backend = frontier_backend.clone();
        let backend = node_builder.backend.clone();
        let max_past_logs = rpc_config.max_past_logs;
        let overrides = overrides;
        let block_data_cache = block_data_cache;

        Box::new(move |deny_unsafe, subscription_task_executor| {
            let deps = crate::rpc::FullDeps {
                backend: backend.clone(),
                client: client.clone(),
                deny_unsafe,
                filter_pool: filter_pool.clone(),
                frontier_backend: match &*frontier_backend {
                    fc_db::Backend::KeyValue(b) => b.clone(),
                    fc_db::Backend::Sql(b) => b.clone(),
                },
                graph: pool.pool().clone(),
                pool: pool.clone(),
                max_past_logs,
                fee_history_limit,
                fee_history_cache: fee_history_cache.clone(),
                network: network.clone(),
                sync: sync.clone(),
                block_data_cache: block_data_cache.clone(),
                overrides: overrides.clone(),
                is_authority: false,
                command_sink: command_sink.clone(),
                xcm_senders: xcm_senders.clone(),
            };
            crate::rpc::create_full(
                deps,
                subscription_task_executor,
                pubsub_notification_sinks.clone(),
            )
            .map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    log::info!("Development Service Ready");

    node_builder.network.start_network.start_network();
    Ok(node_builder.task_manager)
}