1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
18

            
19
use {
20
    container_chain_template_simple_runtime::Hash,
21
    container_chain_template_simple_runtime::{opaque::Block, RuntimeApi},
22
    cumulus_client_cli::CollatorOptions,
23
    cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport,
24
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
25
    cumulus_client_service::{prepare_node_config, ParachainHostFunctions},
26
    cumulus_primitives_core::{
27
        relay_chain::well_known_keys as RelayWellKnownKeys, CollectCollationInfo, ParaId,
28
    },
29
    nimbus_primitives::NimbusId,
30
    node_common::service::node_builder::{
31
        ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing,
32
    },
33
    parity_scale_codec::Encode,
34
    polkadot_parachain_primitives::primitives::HeadData,
35
    polkadot_primitives::UpgradeGoAhead,
36
    sc_consensus::BasicQueue,
37
    sc_executor::WasmExecutor,
38
    sc_network::NetworkBackend,
39
    sc_service::{Configuration, TFullBackend, TFullClient, TaskManager},
40
    sp_api::ProvideRuntimeApi,
41
    sp_blockchain::HeaderBackend,
42
    sp_consensus_slots::{Slot, SlotDuration},
43
    sp_core::{Pair, H256},
44
    std::{sync::Arc, time::Duration},
45
};
46

            
47
type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
48
type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
49
type ParachainBackend = TFullBackend<Block>;
50
type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
51

            
52
pub struct NodeConfig;
53
impl NodeBuilderConfig for NodeConfig {
54
    type Block = Block;
55
    type RuntimeApi = RuntimeApi;
56
    type ParachainExecutor = ParachainExecutor;
57
}
58

            
59
thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });
60

            
61
/// Provide a mock duration starting at 0 in millisecond for timestamp inherent.
62
/// Each call will increment timestamp by slot_duration making Aura think time has passed.
63
struct MockTimestampInherentDataProvider;
64
#[async_trait::async_trait]
65
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
66
    async fn provide_inherent_data(
67
        &self,
68
        inherent_data: &mut sp_inherents::InherentData,
69
1200
    ) -> Result<(), sp_inherents::Error> {
70
600
        TIMESTAMP.with(|x| {
71
600
            *x.borrow_mut() += container_chain_template_simple_runtime::SLOT_DURATION;
72
600
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
73
600
        })
74
1200
    }
75

            
76
    async fn try_handle_error(
77
        &self,
78
        _identifier: &sp_inherents::InherentIdentifier,
79
        _error: &[u8],
80
    ) -> Option<Result<(), sp_inherents::Error>> {
81
        // The pallet never reports error.
82
        None
83
    }
84
}
85

            
86
78
pub fn import_queue(
87
78
    parachain_config: &Configuration,
88
78
    node_builder: &NodeBuilder<NodeConfig>,
89
78
) -> (ParachainBlockImport, BasicQueue<Block>) {
90
    // The nimbus import queue ONLY checks the signature correctness
91
    // Any other checks corresponding to the author-correctness should be done
92
    // in the runtime
93
78
    let block_import =
94
78
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
95

            
96
78
    let import_queue = nimbus_consensus::import_queue(
97
78
        node_builder.client.clone(),
98
78
        block_import.clone(),
99
        move |_, _| async move {
100
            let time = sp_timestamp::InherentDataProvider::from_system_time();
101

            
102
            Ok((time,))
103
        },
104
78
        &node_builder.task_manager.spawn_essential_handle(),
105
78
        parachain_config.prometheus_registry(),
106
        false,
107
        false,
108
    )
109
78
    .expect("function never fails");
110

            
111
78
    (block_import, import_queue)
112
78
}
113

            
114
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
115
///
116
/// This is the actual implementation that is abstract over the executor and the runtime api.
117
#[sc_tracing::logging::prefix_logs_with("Parachain")]
118
pub async fn start_parachain_node<Net>(
119
    parachain_config: Configuration,
120
    polkadot_config: Configuration,
121
    collator_options: CollatorOptions,
122
    para_id: ParaId,
123
    hwbench: Option<sc_sysinfo::HwBench>,
124
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)>
125
where
126
    Net: NetworkBackend<Block, Hash>,
127
{
128
    let parachain_config = prepare_node_config(parachain_config);
129

            
130
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
131
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;
132

            
133
    let (_, import_queue) = import_queue(&parachain_config, &node_builder);
134

            
135
    // Relay chain interface
136
    let (relay_chain_interface, _collator_key, start_bootnode_params) = node_builder
137
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
138
        .await?;
139

            
140
    // Build cumulus network, allowing to access network-related services.
141
    let node_builder = node_builder
142
        .build_cumulus_network::<_, Net>(
143
            &parachain_config,
144
            para_id,
145
            import_queue,
146
            relay_chain_interface.clone(),
147
        )
148
        .await?;
149

            
150
    let rpc_builder = {
151
        let client = node_builder.client.clone();
152
        let transaction_pool = node_builder.transaction_pool.clone();
153

            
154
        Box::new(move |_| {
155
            let deps = crate::rpc::FullDeps {
156
                client: client.clone(),
157
                pool: transaction_pool.clone(),
158
                command_sink: None,
159
                xcm_senders: None,
160
            };
161

            
162
            crate::rpc::create_full(deps).map_err(Into::into)
163
        })
164
    };
165

            
166
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
167

            
168
    let relay_chain_slot_duration = Duration::from_secs(6);
169
    let node_builder = node_builder.start_full_node(
170
        para_id,
171
        relay_chain_interface.clone(),
172
        relay_chain_slot_duration,
173
        start_bootnode_params,
174
    )?;
175

            
176
    Ok((node_builder.task_manager, node_builder.client))
177
}
178

            
179
/// Helper function to generate a crypto pair from seed
180
78
fn get_aura_id_from_seed(seed: &str) -> NimbusId {
181
78
    sp_core::sr25519::Pair::from_string(&format!("//{}", seed), None)
182
78
        .expect("static values are valid; qed")
183
78
        .public()
184
78
        .into()
185
78
}
186

            
187
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
188
///
189
/// This is the actual implementation that is abstract over the executor and the runtime api.
190
#[sc_tracing::logging::prefix_logs_with("Parachain Dev Node")]
191
pub async fn start_dev_node(
192
    parachain_config: Configuration,
193
    sealing: Sealing,
194
    para_id: ParaId,
195
    hwbench: Option<sc_sysinfo::HwBench>,
196
) -> sc_service::error::Result<TaskManager> {
197
    let parachain_config = prepare_node_config(parachain_config);
198

            
199
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
200
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;
201

            
202
    let (parachain_block_import, import_queue) = import_queue(&parachain_config, &node_builder);
203

            
204
    // Build a Substrate Network. (not cumulus since it is a dev node, it mocks
205
    // the relaychain)
206
    let mut node_builder = node_builder
207
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
208
            &parachain_config,
209
            import_queue,
210
        )?;
211

            
212
    let mut command_sink = None;
213
    let mut xcm_senders = None;
214

            
215
    if parachain_config.role.is_authority() {
216
        let client = node_builder.client.clone();
217
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
218
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
219
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));
220

            
221
        let authorities = vec![get_aura_id_from_seed("alice")];
222

            
223
        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
224
            block_import: parachain_block_import,
225
            sealing,
226
            soft_deadline: None,
227
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
228
            consensus_data_provider: Some(Box::new(
229
                tc_consensus::ContainerManualSealAuraConsensusDataProvider::new(
230
                    SlotDuration::from_millis(
231
                        container_chain_template_simple_runtime::SLOT_DURATION,
232
                    ),
233
                    authorities.clone(),
234
                ),
235
            )),
236
600
            create_inherent_data_providers: move |block: H256, ()| {
237
600
                let current_para_block = client
238
600
                    .number(block)
239
600
                    .expect("Header lookup should succeed")
240
600
                    .expect("Header passed in as parent should be present in backend.");
241

            
242
600
                let hash = client
243
600
                    .hash(current_para_block.saturating_sub(1))
244
600
                    .expect("Hash of the desired block must be present")
245
600
                    .expect("Hash of the desired block should exist");
246

            
247
600
                let para_header = client
248
600
                    .expect_header(hash)
249
600
                    .expect("Expected parachain header should exist")
250
600
                    .encode();
251

            
252
600
                let para_head_data: Vec<u8> = HeadData(para_header).encode();
253
600
                let client_set_aside_for_cidp = client.clone();
254
600
                let client_for_xcm = client.clone();
255
600
                let authorities_for_cidp = authorities.clone();
256
600
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
257
600
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();
258
600
                let slot_duration = container_chain_template_simple_runtime::SLOT_DURATION;
259

            
260
600
                let mut timestamp = 0u64;
261
600
                TIMESTAMP.with(|x| {
262
600
                    timestamp = *x.borrow();
263
600
                });
264

            
265
600
                timestamp += slot_duration;
266

            
267
600
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
268
600
                    timestamp.into(),
269
600
                    SlotDuration::from_millis(slot_duration),
270
                );
271
600
                let relay_slot = u64::from(*relay_slot);
272

            
273
600
                let downward_xcm_receiver = downward_xcm_receiver.clone();
274
600
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
275

            
276
600
                async move {
277
600
                    let mocked_authorities_noting =
278
600
                        ccp_authorities_noting_inherent::MockAuthoritiesNotingInherentDataProvider {
279
600
                            current_para_block,
280
600
                            relay_offset: 1000,
281
600
                            relay_blocks_per_para_block: 2,
282
600
                            orchestrator_para_id: container_chain_template_simple_runtime::genesis_config_presets::ORCHESTRATOR,
283
600
                            container_para_id: para_id,
284
600
                            authorities: authorities_for_cidp,
285
600
                        };
286

            
287
600
                    let mut additional_keys = mocked_authorities_noting.get_key_values();
288
600
                    additional_keys.append(&mut vec![(para_head_key, para_head_data), (relay_slot_key, Slot::from(relay_slot).encode())]);
289

            
290
600
                    let time = MockTimestampInherentDataProvider;
291
600
                    let current_para_head = client_set_aside_for_cidp
292
600
                        .header(block)
293
600
                        .expect("Header lookup should succeed")
294
600
                        .expect("Header passed in as parent should be present in backend.");
295
600
                    let should_send_go_ahead = match client_set_aside_for_cidp
296
600
                        .runtime_api()
297
600
                        .collect_collation_info(block, &current_para_head)
298
                    {
299
600
                        Ok(info) => info.new_validation_code.is_some(),
300
                        Err(e) => {
301
                            log::error!("Failed to collect collation info: {:?}", e);
302
                            false
303
                        }
304
                    };
305

            
306
600
                    let mocked_parachain = MockValidationDataInherentDataProvider {
307
600
                        current_para_block,
308
600
                        current_para_block_head: None,
309
                        relay_offset: 1000,
310
                        relay_blocks_per_para_block: 2,
311
                        // TODO: Recheck
312
                        para_blocks_per_relay_epoch: 10,
313
600
                        relay_randomness_config: (),
314
600
                        xcm_config: MockXcmConfig::new(
315
600
                            &*client_for_xcm,
316
600
                            block,
317
600
                            Default::default(),
318
                        ),
319
600
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
320
600
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
321
600
                        additional_key_values: Some(additional_keys),
322
600
                        para_id,
323
600
                        upgrade_go_ahead: should_send_go_ahead.then(|| {
324
2
                            log::info!(
325
2
                                "Detected pending validation code, sending go-ahead signal."
326
                            );
327
2
                            UpgradeGoAhead::GoAhead
328
2
                        }),
329
                    };
330

            
331
600
                    Ok((time, mocked_parachain, mocked_authorities_noting))
332
600
                }
333
600
            },
334
        })?;
335
    }
336

            
337
    let rpc_builder = {
338
        let client = node_builder.client.clone();
339
        let transaction_pool = node_builder.transaction_pool.clone();
340

            
341
156
        Box::new(move |_| {
342
156
            let deps = crate::rpc::FullDeps {
343
156
                client: client.clone(),
344
156
                pool: transaction_pool.clone(),
345
156
                command_sink: command_sink.clone(),
346
156
                xcm_senders: xcm_senders.clone(),
347
156
            };
348

            
349
156
            crate::rpc::create_full(deps).map_err(Into::into)
350
156
        })
351
    };
352

            
353
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
354

            
355
    log::info!("Development Service Ready");
356

            
357
    Ok(node_builder.task_manager)
358
}