1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! Service and ServiceFactory implementation. Specialized wrapper over substrate service.
18

            
19
use {
20
    container_chain_template_simple_runtime::Hash,
21
    container_chain_template_simple_runtime::{opaque::Block, RuntimeApi},
22
    cumulus_client_cli::CollatorOptions,
23
    cumulus_client_consensus_common::ParachainBlockImport as TParachainBlockImport,
24
    cumulus_client_parachain_inherent::{MockValidationDataInherentDataProvider, MockXcmConfig},
25
    cumulus_client_service::{prepare_node_config, ParachainHostFunctions},
26
    cumulus_primitives_core::{
27
        relay_chain::well_known_keys as RelayWellKnownKeys, CollectCollationInfo, ParaId,
28
    },
29
    nimbus_primitives::NimbusId,
30
    node_common::service::{ManualSealConfiguration, NodeBuilder, NodeBuilderConfig, Sealing},
31
    parity_scale_codec::Encode,
32
    polkadot_parachain_primitives::primitives::HeadData,
33
    polkadot_primitives::UpgradeGoAhead,
34
    sc_consensus::BasicQueue,
35
    sc_executor::WasmExecutor,
36
    sc_network::NetworkBackend,
37
    sc_service::{Configuration, TFullBackend, TFullClient, TaskManager},
38
    sp_api::ProvideRuntimeApi,
39
    sp_blockchain::HeaderBackend,
40
    sp_consensus_slots::{Slot, SlotDuration},
41
    sp_core::{Pair, H256},
42
    std::{sync::Arc, time::Duration},
43
};
44

            
45
type ParachainExecutor = WasmExecutor<ParachainHostFunctions>;
46
type ParachainClient = TFullClient<Block, RuntimeApi, ParachainExecutor>;
47
type ParachainBackend = TFullBackend<Block>;
48
type ParachainBlockImport = TParachainBlockImport<Block, Arc<ParachainClient>, ParachainBackend>;
49

            
50
pub struct NodeConfig;
51
impl NodeBuilderConfig for NodeConfig {
52
    type Block = Block;
53
    type RuntimeApi = RuntimeApi;
54
    type ParachainExecutor = ParachainExecutor;
55
}
56

            
57
thread_local!(static TIMESTAMP: std::cell::RefCell<u64> = const { std::cell::RefCell::new(0) });
58

            
59
/// Provide a mock duration starting at 0 in millisecond for timestamp inherent.
60
/// Each call will increment timestamp by slot_duration making Aura think time has passed.
61
struct MockTimestampInherentDataProvider;
62
#[async_trait::async_trait]
63
impl sp_inherents::InherentDataProvider for MockTimestampInherentDataProvider {
64
    async fn provide_inherent_data(
65
        &self,
66
        inherent_data: &mut sp_inherents::InherentData,
67
538
    ) -> Result<(), sp_inherents::Error> {
68
538
        TIMESTAMP.with(|x| {
69
538
            *x.borrow_mut() += container_chain_template_simple_runtime::SLOT_DURATION;
70
538
            inherent_data.put_data(sp_timestamp::INHERENT_IDENTIFIER, &*x.borrow())
71
538
        })
72
1076
    }
73

            
74
    async fn try_handle_error(
75
        &self,
76
        _identifier: &sp_inherents::InherentIdentifier,
77
        _error: &[u8],
78
    ) -> Option<Result<(), sp_inherents::Error>> {
79
        // The pallet never reports error.
80
        None
81
    }
82
}
83

            
84
66
pub fn import_queue(
85
66
    parachain_config: &Configuration,
86
66
    node_builder: &NodeBuilder<NodeConfig>,
87
66
) -> (ParachainBlockImport, BasicQueue<Block>) {
88
66
    // The nimbus import queue ONLY checks the signature correctness
89
66
    // Any other checks corresponding to the author-correctness should be done
90
66
    // in the runtime
91
66
    let block_import =
92
66
        ParachainBlockImport::new(node_builder.client.clone(), node_builder.backend.clone());
93
66

            
94
66
    let import_queue = nimbus_consensus::import_queue(
95
66
        node_builder.client.clone(),
96
66
        block_import.clone(),
97
66
        move |_, _| async move {
98
            let time = sp_timestamp::InherentDataProvider::from_system_time();
99

            
100
            Ok((time,))
101
66
        },
102
66
        &node_builder.task_manager.spawn_essential_handle(),
103
66
        parachain_config.prometheus_registry(),
104
66
        false,
105
66
        false,
106
66
    )
107
66
    .expect("function never fails");
108
66

            
109
66
    (block_import, import_queue)
110
66
}
111

            
112
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
113
///
114
/// This is the actual implementation that is abstract over the executor and the runtime api.
115
#[sc_tracing::logging::prefix_logs_with("Parachain")]
116
pub async fn start_parachain_node<Net>(
117
    parachain_config: Configuration,
118
    polkadot_config: Configuration,
119
    collator_options: CollatorOptions,
120
    para_id: ParaId,
121
    hwbench: Option<sc_sysinfo::HwBench>,
122
) -> sc_service::error::Result<(TaskManager, Arc<ParachainClient>)>
123
where
124
    Net: NetworkBackend<Block, Hash>,
125
{
126
    let parachain_config = prepare_node_config(parachain_config);
127

            
128
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
129
    let mut node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;
130

            
131
    let (_, import_queue) = import_queue(&parachain_config, &node_builder);
132

            
133
    // Relay chain interface
134
    let (relay_chain_interface, _collator_key) = node_builder
135
        .build_relay_chain_interface(&parachain_config, polkadot_config, collator_options.clone())
136
        .await?;
137

            
138
    // Build cumulus network, allowing to access network-related services.
139
    let node_builder = node_builder
140
        .build_cumulus_network::<_, Net>(
141
            &parachain_config,
142
            para_id,
143
            import_queue,
144
            relay_chain_interface.clone(),
145
        )
146
        .await?;
147

            
148
    let rpc_builder = {
149
        let client = node_builder.client.clone();
150
        let transaction_pool = node_builder.transaction_pool.clone();
151

            
152
        Box::new(move |_| {
153
            let deps = crate::rpc::FullDeps {
154
                client: client.clone(),
155
                pool: transaction_pool.clone(),
156
                command_sink: None,
157
                xcm_senders: None,
158
            };
159

            
160
            crate::rpc::create_full(deps).map_err(Into::into)
161
        })
162
    };
163

            
164
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
165

            
166
    let relay_chain_slot_duration = Duration::from_secs(6);
167
    let node_builder = node_builder.start_full_node(
168
        para_id,
169
        relay_chain_interface.clone(),
170
        relay_chain_slot_duration,
171
    )?;
172

            
173
    Ok((node_builder.task_manager, node_builder.client))
174
}
175

            
176
/// Helper function to generate a crypto pair from seed
177
66
fn get_aura_id_from_seed(seed: &str) -> NimbusId {
178
66
    sp_core::sr25519::Pair::from_string(&format!("//{}", seed), None)
179
66
        .expect("static values are valid; qed")
180
66
        .public()
181
66
        .into()
182
66
}
183

            
184
/// Start a node with the given parachain `Configuration` and relay chain `Configuration`.
185
///
186
/// This is the actual implementation that is abstract over the executor and the runtime api.
187
#[sc_tracing::logging::prefix_logs_with("Parachain Dev Node")]
188
pub async fn start_dev_node(
189
    parachain_config: Configuration,
190
    sealing: Sealing,
191
    para_id: ParaId,
192
    hwbench: Option<sc_sysinfo::HwBench>,
193
) -> sc_service::error::Result<TaskManager> {
194
    let parachain_config = prepare_node_config(parachain_config);
195

            
196
    // Create a `NodeBuilder` which helps setup parachain nodes common systems.
197
    let node_builder = NodeConfig::new_builder(&parachain_config, hwbench.clone())?;
198

            
199
    let (parachain_block_import, import_queue) = import_queue(&parachain_config, &node_builder);
200

            
201
    // Build a Substrate Network. (not cumulus since it is a dev node, it mocks
202
    // the relaychain)
203
    let mut node_builder = node_builder
204
        .build_substrate_network::<sc_network::NetworkWorker<_, _>>(
205
            &parachain_config,
206
            import_queue,
207
        )?;
208

            
209
    let mut command_sink = None;
210
    let mut xcm_senders = None;
211

            
212
    if parachain_config.role.is_authority() {
213
        let client = node_builder.client.clone();
214
        let (downward_xcm_sender, downward_xcm_receiver) = flume::bounded::<Vec<u8>>(100);
215
        let (hrmp_xcm_sender, hrmp_xcm_receiver) = flume::bounded::<(ParaId, Vec<u8>)>(100);
216
        xcm_senders = Some((downward_xcm_sender, hrmp_xcm_sender));
217

            
218
        let authorities = vec![get_aura_id_from_seed("alice")];
219

            
220
        command_sink = node_builder.install_manual_seal(ManualSealConfiguration {
221
            block_import: parachain_block_import,
222
            sealing,
223
            soft_deadline: None,
224
            select_chain: sc_consensus::LongestChain::new(node_builder.backend.clone()),
225
            consensus_data_provider: Some(Box::new(
226
                tc_consensus::ContainerManualSealAuraConsensusDataProvider::new(
227
                    SlotDuration::from_millis(
228
                        container_chain_template_simple_runtime::SLOT_DURATION,
229
                    ),
230
                    authorities.clone(),
231
                ),
232
            )),
233
538
            create_inherent_data_providers: move |block: H256, ()| {
234
538
                let current_para_block = client
235
538
                    .number(block)
236
538
                    .expect("Header lookup should succeed")
237
538
                    .expect("Header passed in as parent should be present in backend.");
238
538

            
239
538
                let hash = client
240
538
                    .hash(current_para_block.saturating_sub(1))
241
538
                    .expect("Hash of the desired block must be present")
242
538
                    .expect("Hash of the desired block should exist");
243
538

            
244
538
                let para_header = client
245
538
                    .expect_header(hash)
246
538
                    .expect("Expected parachain header should exist")
247
538
                    .encode();
248
538

            
249
538
                let para_head_data: Vec<u8> = HeadData(para_header).encode();
250
538
                let client_set_aside_for_cidp = client.clone();
251
538
                let client_for_xcm = client.clone();
252
538
                let authorities_for_cidp = authorities.clone();
253
538
                let para_head_key = RelayWellKnownKeys::para_head(para_id);
254
538
                let relay_slot_key = RelayWellKnownKeys::CURRENT_SLOT.to_vec();
255
538
                let slot_duration = container_chain_template_simple_runtime::SLOT_DURATION;
256
538

            
257
538
                let mut timestamp = 0u64;
258
538
                TIMESTAMP.with(|x| {
259
538
                    timestamp = x.clone().take();
260
538
                });
261
538

            
262
538
                timestamp += slot_duration;
263
538

            
264
538
                let relay_slot = sp_consensus_aura::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
265
538
						timestamp.into(),
266
538
						SlotDuration::from_millis(slot_duration),
267
538
                    );
268
538
                let relay_slot = u64::from(*relay_slot);
269
538

            
270
538
                let downward_xcm_receiver = downward_xcm_receiver.clone();
271
538
                let hrmp_xcm_receiver = hrmp_xcm_receiver.clone();
272

            
273
538
                async move {
274
538
                    let mocked_authorities_noting =
275
538
                        ccp_authorities_noting_inherent::MockAuthoritiesNotingInherentDataProvider {
276
538
                            current_para_block,
277
538
                            relay_offset: 1000,
278
538
                            relay_blocks_per_para_block: 2,
279
538
                            orchestrator_para_id: crate::chain_spec::ORCHESTRATOR,
280
538
                            container_para_id: para_id,
281
538
                            authorities: authorities_for_cidp
282
538
                    };
283
538

            
284
538
                    let mut additional_keys = mocked_authorities_noting.get_key_values();
285
538
                    additional_keys.append(&mut vec![(para_head_key, para_head_data), (relay_slot_key, Slot::from(relay_slot).encode())]);
286
538

            
287
538
                    let time = MockTimestampInherentDataProvider;
288
538
                    let current_para_head = client_set_aside_for_cidp
289
538
                            .header(block)
290
538
                            .expect("Header lookup should succeed")
291
538
                            .expect("Header passed in as parent should be present in backend.");
292
538
                    let should_send_go_ahead = match client_set_aside_for_cidp
293
538
                            .runtime_api()
294
538
                            .collect_collation_info(block, &current_para_head)
295
                    {
296
538
                            Ok(info) => info.new_validation_code.is_some(),
297
                            Err(e) => {
298
                                    log::error!("Failed to collect collation info: {:?}", e);
299
                                    false
300
                            },
301
                    };
302

            
303
538
                    let mocked_parachain = MockValidationDataInherentDataProvider {
304
538
                        current_para_block,
305
538
                        current_para_block_head: None,
306
538
                        relay_offset: 1000,
307
538
                        relay_blocks_per_para_block: 2,
308
538
                        // TODO: Recheck
309
538
                        para_blocks_per_relay_epoch: 10,
310
538
                        relay_randomness_config: (),
311
538
                        xcm_config: MockXcmConfig::new(
312
538
                            &*client_for_xcm,
313
538
                            block,
314
538
                            Default::default(),
315
538
                        ),
316
538
                        raw_downward_messages: downward_xcm_receiver.drain().collect(),
317
538
                        raw_horizontal_messages: hrmp_xcm_receiver.drain().collect(),
318
538
                        additional_key_values: Some(additional_keys),
319
538
                        para_id,
320
538
                        upgrade_go_ahead: should_send_go_ahead.then(|| {
321
2
                            log::info!(
322
2
                                "Detected pending validation code, sending go-ahead signal."
323
                            );
324
2
                            UpgradeGoAhead::GoAhead
325
538
                        }),
326
538
                    };
327
538

            
328
538
                    Ok((time, mocked_parachain, mocked_authorities_noting))
329
538
                }
330
538
            },
331
        })?;
332
    }
333

            
334
    let rpc_builder = {
335
        let client = node_builder.client.clone();
336
        let transaction_pool = node_builder.transaction_pool.clone();
337

            
338
132
        Box::new(move |_| {
339
132
            let deps = crate::rpc::FullDeps {
340
132
                client: client.clone(),
341
132
                pool: transaction_pool.clone(),
342
132
                command_sink: command_sink.clone(),
343
132
                xcm_senders: xcm_senders.clone(),
344
132
            };
345
132

            
346
132
            crate::rpc::create_full(deps).map_err(Into::into)
347
132
        })
348
    };
349

            
350
    let node_builder = node_builder.spawn_common_tasks(parachain_config, rpc_builder)?;
351

            
352
    log::info!("Development Service Ready");
353

            
354
    Ok(node_builder.task_manager)
355
}