// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.

use node_common::timestamp::MockTimestampInherentDataProvider;
use {
    container_chain_template_frontier_runtime::{opaque::Block, Hash, RuntimeApi},
    cumulus_client_cli::CollatorOptions,
    cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport,
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
    cumulus_client_service::{prepare_node_config, ParachainHostFunctions},
    cumulus_primitives_core::{
        relay_chain::well_known_keys as RelayWellKnownKeys, CollectCollationInfo, ParaId,
    },
    fc_consensus::FrontierBlockImport,
    fc_db::DatabaseSource,
    fc_rpc_core::types::{FeeHistoryCache, FilterPool},
    fc_storage::StorageOverrideHandler,
    nimbus_primitives::NimbusId,
    node_common::service::node_builder::{
        ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing,
    },
    parity_scale_codec::Encode,
    polkadot_parachain_primitives::primitives::HeadData,
    polkadot_primitives::UpgradeGoAhead,
    sc_consensus::BasicQueue,
    sc_executor::WasmExecutor,
    sc_network::NetworkBackend,
    sc_service::{Configuration, TFullBackend, TFullClient, TaskManager},
    sp_api::ProvideRuntimeApi,
    sp_blockchain::HeaderBackend,
    sp_consensus_slots::{Slot, SlotDuration},
    sp_core::{Pair, H256},
    std::{
        collections::BTreeMap,
        sync::{Arc, Mutex},
        time::Duration,
    },
};

type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
type ParachainBackend = TFullBackend<Block>;
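// Block import is layered: `FrontierBlockImport` wires the Ethereum block
// mapping into the import pipeline, and Cumulus' `ParachainBlockImport` wraps
// it with parachain-aware import logic (e.g. best-block handling that follows
// the relay chain's view of this parachain).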
type ParachainBlockImport = TParachainBlockImport<
    Block,
    FrontierBlockImport<Block, Arc<ParachainClient>, ParachainClient>,
    ParachainBackend,
>;
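
// Ties the concrete block, runtime API and executor types together so the
// shared `NodeBuilder` machinery from `node_common` can be instantiated for
// this template.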
pub struct NodeConfig;
impl NodeBuilderConfig for NodeConfig {
    type Block = Block;
    type RuntimeApi = RuntimeApi;
    type ParachainExecutor = ParachainExecutor;
}
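
// Polkadot relay chains produce one block per 6-second slot; the dev service
// below advances its mocked relay timestamp by this amount for every sealed
// container block.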
const RELAY_CHAIN_SLOT_DURATION_MILLIS: u64 = 6_000;
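
/// Directory where Frontier keeps its Ethereum-mapping database, e.g.
/// `<base_path>/chains/<chain_id>/frontier/<path>`.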
pub fn frontier_database_dir(config: &Configuration, path: &str) -> std::path::PathBuf {
    config
        .base_path
        .config_dir(config.chain_spec.id())
        .join("frontier")
        .join(path)
}

// TODO This is copied from frontier. It should be imported instead after
// https://github.com/paritytech/frontier/issues/333 is solved
pub fn open_frontier_backend<C>(
    client: Arc<C>,
    config: &Configuration,
) -> Result<fc_db::kv::Backend<Block, C>, String>
where
    C: sp_blockchain::HeaderBackend<Block>,
{
    fc_db::kv::Backend::<Block, _>::new(
        client,
        &fc_db::kv::DatabaseSettings {
            source: match config.database {
                DatabaseSource::RocksDb { .. } => DatabaseSource::RocksDb {
                    path: frontier_database_dir(config, "db"),
                    cache_size: 0,
                },
                DatabaseSource::ParityDb { .. } => DatabaseSource::ParityDb {
                    path: frontier_database_dir(config, "paritydb"),
                },
                DatabaseSource::Auto { .. } => DatabaseSource::Auto {
                    rocksdb_path: frontier_database_dir(config, "db"),
                    paritydb_path: frontier_database_dir(config, "paritydb"),
                    cache_size: 0,
                },
                _ => {
                    return Err("Supported db sources: `rocksdb` | `paritydb` | `auto`".to_string())
                }
            },
        },
    )
}
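
/// Builds the two-layer block import and the Nimbus import queue. Verification
/// only needs a system-time timestamp inherent; the returned
/// `ParachainBlockImport` is also reused by the dev service's manual seal.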
pub fn import_queue(
    parachain_config: &Configuration,
    node_builder: &NodeBuilder<NodeConfig>,
) -> (ParachainBlockImport, BasicQueue<Block>) {
    let frontier_block_import =
        FrontierBlockImport::new(node_builder.client.clone(), node_builder.client.clone());

    // The parachain block import and import queue
    let block_import = cumulus_client_consensus_common::ParachainBlockImport::new(
        frontier_block_import,
        node_builder.backend.clone(),
    );
    let import_queue = nimbus_consensus::import_queue(
        node_builder.client.clone(),
        block_import.clone(),
        move |_, _| async move {
            let time = sp_timestamp::InherentDataProvider::from_system_time();

            Ok((time,))
        },
        &node_builder.task_manager.spawn_essential_handle(),
        parachain_config.prometheus_registry(),
        false,
        false,
    )
    .expect("function never fails");

    (block_import, import_queue)
}

/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
///
/// This is the actual implementation, abstracted over the executor and the runtime API.
#[sc_tracing::logging::prefix_logs_with("Parachain")]
async fn start_node_impl<Net>(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    collator_options: CollatorOptions,
    para_id: ParaId,
    rpc_config: crate::cli::RpcConfig,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)>
where
    Net: NetworkBackend<Block, Hash>,
{
    let parachain_config = prepare_node_config(parachain_config);

    // Create a `NodeBuilder` which helps set up the systems common to parachain nodes.
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;

    // Frontier-specific setup: Ethereum log filter pool, fee history cache,
    // mapping database and storage overrides.
    let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
    let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
    let frontier_backend = fc_db::Backend::KeyValue(
        open_frontier_backend(node_builder.client.clone(), &parachain_config)?.into(),
    );
    let overrides = Arc::new(StorageOverrideHandler::new(node_builder.client.clone()));
    let fee_history_limit = rpc_config.fee_history_limit;

    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
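
    // Only the import queue is used here: the block import half of the pair is
    // for authoring, and this node only follows the chain (note
    // `is_authority: false` in the RPC deps below).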
    let (_, import_queue) = import_queue(&parachain_config, &node_builder);

    // Relay chain interface
    let (relay_chain_interface, _collator_key, start_bootnode_params) = node_builder
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
        .await?;

    // Build the Cumulus network, giving access to network-related services.
    let node_builder = node_builder
        .build_cumulus_network::<_, Net>(
            &parachain_config,
            para_id,
            import_queue,
            relay_chain_interface.clone(),
        )
        .await?;

    let frontier_backend = Arc::new(frontier_backend);

    crate::rpc::spawn_essential_tasks(crate::rpc::SpawnTasksParams {
        task_manager: &node_builder.task_manager,
        client: node_builder.client.clone(),
        substrate_backend: node_builder.backend.clone(),
        frontier_backend: frontier_backend.clone(),
        filter_pool: filter_pool.clone(),
        overrides: overrides.clone(),
        fee_history_limit,
        fee_history_cache: fee_history_cache.clone(),
        sync_service: node_builder.network.sync_service.clone(),
        pubsub_notification_sinks: pubsub_notification_sinks.clone(),
    });

    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        node_builder.task_manager.spawn_handle(),
        overrides.clone(),
        rpc_config.eth_log_block_cache,
        rpc_config.eth_statuses_cache,
        node_builder.prometheus_registry.clone(),
    ));

    let rpc_builder = {
        let client = node_builder.client.clone();
        let pool = node_builder.transaction_pool.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks;
        let network = node_builder.network.network.clone();
        let sync = node_builder.network.sync_service.clone();
        let filter_pool = filter_pool.clone();
        let backend = node_builder.backend.clone();
        let max_past_logs = rpc_config.max_past_logs;
        let max_block_range = rpc_config.max_block_range;
        let overrides = overrides;
        let fee_history_cache = fee_history_cache.clone();
        let block_data_cache = block_data_cache;
        let frontier_backend = frontier_backend.clone();

        Box::new(move |subscription_task_executor| {
            let deps = crate::rpc::FullDeps {
                backend: backend.clone(),
                client: client.clone(),
                filter_pool: filter_pool.clone(),
                frontier_backend: match &*frontier_backend {
                    fc_db::Backend::KeyValue(b) => b.clone(),
                    fc_db::Backend::Sql(b) => b.clone(),
                },
                graph: pool.clone(),
                pool: pool.clone(),
                max_past_logs,
                max_block_range,
                fee_history_limit,
                fee_history_cache: fee_history_cache.clone(),
                network: Arc::new(network.clone()),
                sync: sync.clone(),
                block_data_cache: block_data_cache.clone(),
                overrides: overrides.clone(),
                is_authority: false,
                command_sink: None,
                xcm_senders: None,
            };
            crate::rpc::create_full(
                deps,
                subscription_task_executor,
                pubsub_notification_sinks.clone(),
            )
            .map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
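
    // Must match the relay chain's 6-second slot duration
    // (cf. `RELAY_CHAIN_SLOT_DURATION_MILLIS` above).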
    let relay_chain_slot_duration = Duration::from_secs(6);
    let node_builder = node_builder.start_full_node(
        para_id,
        relay_chain_interface.clone(),
        relay_chain_slot_duration,
        start_bootnode_params,
    )?;

    Ok((node_builder.task_manager, node_builder.client))
}

/// Start a parachain node.
pub async fn start_parachain_node<Net>(
    parachain_config: Configuration,
    polkadot_config: Configuration,
    collator_options: CollatorOptions,
    para_id: ParaId,
    rpc_config: crate::cli::RpcConfig,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)>
where
    Net: NetworkBackend<Block, Hash>,
{
    start_node_impl::<Net>(
        parachain_config,
        polkadot_config,
        collator_options,
        para_id,
        rpc_config,
        hwbench,
    )
    .await
}

/// Helper function to derive an authority id (`NimbusId`) from a dev seed.
fn get_aura_id_from_seed(seed: &str) -> NimbusId {
    sp_core::sr25519::Pair::from_string(&format!("//{}", seed), None)
        .expect("static values are valid; qed")
        .public()
        .into()
}

/// Builds a new development service. This service uses manual seal, and mocks
/// the parachain inherent.
pub async fn start_dev_node(
    parachain_config: Configuration,
    sealing: Sealing,
    rpc_config: crate::cli::RpcConfig,
    para_id: ParaId,
    hwbench: Option<sc_sysinfo::HwBench>,
) -> Result<TaskManager, sc_service::error::Error> {
    // Create a `NodeBuilder` which helps set up the systems common to parachain nodes.
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench)?;

    // Frontier-specific setup: Ethereum log filter pool, fee history cache,
    // mapping database and storage overrides.
    let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
    let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
    let frontier_backend = fc_db::Backend::KeyValue(
        open_frontier_backend(node_builder.client.clone(), &parachain_config)?.into(),
    );
    let overrides = Arc::new(StorageOverrideHandler::new(node_builder.client.clone()));
    let fee_history_limit = rpc_config.fee_history_limit;

    let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
        fc_mapping_sync::EthereumBlockNotification<Block>,
    > = Default::default();
    let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);

    let (parachain_block_import, import_queue) = import_queue(&parachain_config, &node_builder);

    // Build a plain Substrate network (not Cumulus, since this is a dev node
    // that mocks the relay chain).
    let mut node_builder = node_builder
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
            &parachain_config,
            import_queue,
        )?;

    let mut command_sink = None;
    let mut xcm_senders = None;

    if parachain_config.role.is_authority() {
        let client = node_builder.client.clone();
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));

        let authorities = vec![get_aura_id_from_seed("alice")];
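
        // Manual seal: blocks are produced on demand (or on a timer) instead
        // of via relay-chain-driven consensus. The Aura-style consensus data
        // provider adds valid slot digests so the runtime still accepts the
        // sealed blocks.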
        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
            block_import: parachain_block_import,
            sealing,
            soft_deadline: None,
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
            consensus_data_provider: Some(Box::new(
                tc_consensus::ContainerManualSealAuraConsensusDataProvider::new(
                    SlotDuration::from_millis(
                        container_chain_template_frontier_runtime::SLOT_DURATION,
                    ),
                    authorities.clone(),
                ),
            )),
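            // Each sealed block mocks the relay-chain state it would normally
            // see: the parent para head, a relay slot derived from the mocked
            // timestamp (advanced one 6s slot per block), and any queued XCM
            // messages.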
            create_inherent_data_providers: move |block: H256, ()| {
                MockTimestampInherentDataProvider::advance_timestamp(
                    RELAY_CHAIN_SLOT_DURATION_MILLIS,
                );

                let current_para_block = client
                    .number(block)
                    .expect("Header lookup should succeed")
                    .expect("Header passed in as parent should be present in backend.");

                let hash = client
                    .hash(current_para_block.saturating_sub(1))
                    .expect("Hash of the desired block must be present")
                    .expect("Hash of the desired block should exist");

                let para_header = client
                    .expect_header(hash)
                    .expect("Expected parachain header should exist")
                    .encode();

                let para_head_data: Vec<u8> = HeadData(para_header).encode();
                let client_set_aside_for_cidp = client.clone();
                let client_for_xcm = client.clone();
                let authorities_for_cidp = authorities.clone();
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();

                // Get the mocked timestamp
                let timestamp = MockTimestampInherentDataProvider::load();
                // Derive the mocked relay slot from it (slot = timestamp / slot duration)
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                    timestamp.into(),
                    SlotDuration::from_millis(RELAY_CHAIN_SLOT_DURATION_MILLIS),
                );
                let relay_slot = u64::from(*relay_slot);

                let downward_xcm_receiver = downward_xcm_receiver.clone();
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();

                async move {
                    let mocked_authorities_noting =
                        ccp_authorities_noting_inherent::MockAuthoritiesNotingInherentDataProvider {
                            current_para_block,
                            relay_offset: 1000,
                            relay_blocks_per_para_block: 2,
                            orchestrator_para_id: container_chain_template_frontier_runtime::genesis_config_presets::ORCHESTRATOR,
                            container_para_id: para_id,
                            authorities: authorities_for_cidp,
                        };

                    let mut additional_keys = mocked_authorities_noting.get_key_values();
                    additional_keys.append(&mut vec![
                        (para_head_key, para_head_data),
                        (relay_slot_key, Slot::from(relay_slot).encode()),
                    ]);

                    let time = MockTimestampInherentDataProvider;
                    let current_para_head = client_set_aside_for_cidp
                        .header(block)
                        .expect("Header lookup should succeed")
                        .expect("Header passed in as parent should be present in backend.");
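                    // If the runtime reports a pending validation code upgrade,
                    // mock the relay chain's go-ahead signal so the upgrade can
                    // actually be applied on the dev chain.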
                    let should_send_go_ahead = match client_set_aside_for_cidp
                        .runtime_api()
                        .collect_collation_info(block, &current_para_head)
                    {
                        Ok(info) => info.new_validation_code.is_some(),
                        Err(e) => {
                            log::error!("Failed to collect collation info: {:?}", e);
                            false
                        }
                    };
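                    // Mocked relay validation data: a fixed relay offset and two
                    // relay blocks per para block, with the para head and current
                    // slot injected through `additional_key_values`.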
                    let mocked_parachain = MockValidationDataInherentDataProvider {
                        current_para_block,
                        current_para_block_head: None,
                        relay_offset: 1000,
                        relay_blocks_per_para_block: 2,
                        para_blocks_per_relay_epoch: 10,
                        relay_randomness_config: (),
                        xcm_config: MockXcmConfig::new(
                            &*client_for_xcm,
                            block,
                            Default::default(),
                        ),
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
                        additional_key_values: Some(additional_keys),
                        para_id,
                        upgrade_go_ahead: should_send_go_ahead.then(|| {
                            log::info!(
                                "Detected pending validation code, sending go-ahead signal."
                            );
                            UpgradeGoAhead::GoAhead
                        }),
                    };

                    Ok((time, mocked_parachain, mocked_authorities_noting))
                }
            },
        })?;
    }

    let frontier_backend = Arc::new(frontier_backend);

    crate::rpc::spawn_essential_tasks(crate::rpc::SpawnTasksParams {
        task_manager: &node_builder.task_manager,
        client: node_builder.client.clone(),
        substrate_backend: node_builder.backend.clone(),
        frontier_backend: frontier_backend.clone(),
        filter_pool: filter_pool.clone(),
        overrides: overrides.clone(),
        fee_history_limit,
        fee_history_cache: fee_history_cache.clone(),
        sync_service: node_builder.network.sync_service.clone(),
        pubsub_notification_sinks: pubsub_notification_sinks.clone(),
    });

    let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
        node_builder.task_manager.spawn_handle(),
        overrides.clone(),
        rpc_config.eth_log_block_cache,
        rpc_config.eth_statuses_cache,
        node_builder.prometheus_registry.clone(),
    ));

    let rpc_builder = {
        let client = node_builder.client.clone();
        let pool = node_builder.transaction_pool.clone();
        let pubsub_notification_sinks = pubsub_notification_sinks;
        let network = node_builder.network.network.clone();
        let sync = node_builder.network.sync_service.clone();
        let filter_pool = filter_pool;
        let frontier_backend = frontier_backend.clone();
        let backend = node_builder.backend.clone();
        let max_past_logs = rpc_config.max_past_logs;
        let max_block_range = rpc_config.max_block_range;
        let overrides = overrides;
        let block_data_cache = block_data_cache;

        Box::new(move |subscription_task_executor| {
            let deps = crate::rpc::FullDeps {
                backend: backend.clone(),
                client: client.clone(),
                filter_pool: filter_pool.clone(),
                frontier_backend: match &*frontier_backend {
                    fc_db::Backend::KeyValue(b) => b.clone(),
                    fc_db::Backend::Sql(b) => b.clone(),
                },
                graph: pool.clone(),
                pool: pool.clone(),
                max_past_logs,
                max_block_range,
                fee_history_limit,
                fee_history_cache: fee_history_cache.clone(),
                network: network.clone(),
                sync: sync.clone(),
                block_data_cache: block_data_cache.clone(),
                overrides: overrides.clone(),
                is_authority: false,
                command_sink: command_sink.clone(),
                xcm_senders: xcm_senders.clone(),
            };
            crate::rpc::create_full(
                deps,
                subscription_task_executor,
                pubsub_notification_sinks.clone(),
            )
            .map_err(Into::into)
        })
    };

    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;

    log::info!("Development Service Ready");

    Ok(node_builder.task_manager)
}