// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

//! Development Polkadot service. Adapted from the `polkadot_service` crate,
//! with unnecessary components that are not required in a dev node removed.
//!
//! The following major changes are made:
//! 1. Removed beefy and grandpa notification service and request response protocols
//! 2. Removed support for parachains, which also eliminated the need to start the overseer and all other subsystems associated with collation + network request/response protocols for the same
//! 3. Removed support for hardware benchmarking
//! 4. Removed authority discovery service
//! 5. Removed spawning of beefy, grandpa and MMR workers
//! 6. Removed rpc extensions for beefy, grandpa and babe and added support for manual seal
//! 7. Removed beefy and grandpa block import from block import pipeline (Babe remains)
//! 8. Using manual seal import queue instead of babe import queue
//! 9. Started manual seal worker
//! 10. If the amount of time passed between two blocks is less than the slot duration, we emulate the passing of time for babe block import and the runtime
//!     by incrementing the timestamp by the slot duration.
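//!
//! With `Sealing::Manual`, block production is driven externally through the manual
//! seal RPC that `create_dev_rpc_extension` merges into the node's RPC module. A
//! minimal sketch of sealing one block (assuming `jsonrpsee`'s HTTP client feature and
//! the default RPC address; the parameters are `create_empty`, `finalize` and
//! `parent_hash`):
//!
//! ```ignore
//! use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
//!
//! async fn seal_one_block() -> Result<(), Box<dyn std::error::Error>> {
//!     let client = HttpClientBuilder::default().build("http://127.0.0.1:9944")?;
//!     // Ask the manual seal worker to author and finalize one block on top of the best block.
//!     let _created: serde_json::Value = client
//!         .request("engine_createBlock", rpc_params![true, true, Option::<String>::None])
//!         .await?;
//!     Ok(())
//! }
//! ```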

use {
    crate::dev_rpcs::{DevApiServer, DevRpc},
    async_io::Timer,
    babe::{BabeBlockImport, BabeLink},
    codec::{Decode, Encode},
    consensus_common::SelectChain,
    dancelight_runtime::RuntimeApi,
    futures::{Stream, StreamExt},
    jsonrpsee::RpcModule,
    node_common::service::Sealing,
    polkadot_core_primitives::{AccountId, Balance, Block, Hash, Nonce},
    polkadot_node_core_parachains_inherent::Error as InherentError,
    polkadot_overseer::Handle,
    polkadot_parachain_primitives::primitives::UpwardMessages,
    polkadot_primitives::{
        runtime_api::ParachainHost, BackedCandidate, CandidateCommitments, CandidateDescriptor,
        CollatorPair, CommittedCandidateReceipt, CompactStatement, EncodeAs,
        InherentData as ParachainsInherentData, OccupiedCoreAssumption, SigningContext,
        ValidityAttestation,
    },
    polkadot_rpc::RpcExtension,
    polkadot_service::{
        BlockT, Error, IdentifyVariant, NewFullParams, OverseerGen, SelectRelayChain,
    },
    sc_client_api::{AuxStore, Backend},
    sc_consensus_manual_seal::{
        consensus::babe::BabeConsensusDataProvider,
        rpc::{ManualSeal, ManualSealApiServer},
        run_manual_seal, EngineCommand, ManualSealParams,
    },
    sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY},
    sc_keystore::Keystore,
    sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool},
    service::{Configuration, KeystoreContainer, RpcHandlers, TaskManager},
    sp_api::ProvideRuntimeApi,
    sp_block_builder::BlockBuilder,
    sp_blockchain::{HeaderBackend, HeaderMetadata},
    sp_consensus_aura::{inherents::InherentType as AuraInherentType, AURA_ENGINE_ID},
    sp_consensus_babe::SlotDuration,
    sp_core::{ByteArray, Pair, H256},
    sp_keystore::KeystorePtr,
    sp_runtime::{traits::BlakeTwo256, DigestItem, RuntimeAppPublic},
    std::{cmp::max, ops::Add, sync::Arc, time::Duration},
    telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
};

// We use this key to store whether we want the para inherent mocker to be active
const PARA_INHERENT_SELECTOR_AUX_KEY: &[u8] = b"__DEV_PARA_INHERENT_SELECTOR";
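
// The selector is toggled at runtime through the dev RPC: the latest value received over
// the mock para inherent channel is written verbatim into aux storage under this key
// (see `create_inherent_data_providers` in `new_full` below). Conceptually:
//
//     client.insert_aux(&[(PARA_INHERENT_SELECTOR_AUX_KEY, true.encode().as_slice())], &[])?;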

pub type FullBackend = service::TFullBackend<Block>;

pub type FullClient = service::TFullClient<
    Block,
    RuntimeApi,
    WasmExecutor<(
        sp_io::SubstrateHostFunctions,
        frame_benchmarking::benchmarking::HostFunctions,
    )>,
>;

pub struct NewFull {
    pub task_manager: TaskManager,
    pub client: Arc<FullClient>,
    pub overseer_handle: Option<Handle>,
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    pub rpc_handlers: RpcHandlers,
    pub backend: Arc<FullBackend>,
}

/// Custom deps for the dev RPC extension
struct DevDeps<C, P> {
    /// The client instance to use.
    pub client: Arc<C>,
    /// Transaction pool instance.
    pub pool: Arc<P>,
    /// Manual seal command sink
    pub command_sink: Option<futures::channel::mpsc::Sender<EngineCommand<Hash>>>,
    /// Dev rpcs
    pub dev_rpc: Option<DevRpc>,
}

fn create_dev_rpc_extension<C, P>(
    DevDeps {
        client,
        pool,
        command_sink: maybe_command_sink,
        dev_rpc: maybe_dev_rpc,
    }: DevDeps<C, P>,
) -> Result<RpcExtension, Box<dyn std::error::Error + Send + Sync>>
where
    C: ProvideRuntimeApi<Block>
        + HeaderBackend<Block>
        + AuxStore
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + Send
        + Sync
        + 'static,
    C::Api: substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
    C::Api: pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>,
    C::Api: BlockBuilder<Block>,
    P: TransactionPool + Sync + Send + 'static,
{
    use {
        pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer},
        substrate_frame_rpc_system::{System, SystemApiServer},
    };

    let mut io = RpcModule::new(());
    io.merge(System::new(client.clone(), pool.clone()).into_rpc())?;
    io.merge(TransactionPayment::new(client.clone()).into_rpc())?;

    if let Some(command_sink) = maybe_command_sink {
        io.merge(ManualSeal::new(command_sink).into_rpc())?;
    }

    if let Some(dev_rpc_data) = maybe_dev_rpc {
        io.merge(dev_rpc_data.into_rpc())?;
    }

    Ok(io)
}

/// We use EmptyParachainsInherentDataProvider to insert an empty parachain inherent in the block
/// to satisfy the runtime
struct EmptyParachainsInherentDataProvider;

/// Copied from polkadot service just so that this code retains the same structure as
/// the `polkadot_service` crate.
struct Basics {
    task_manager: TaskManager,
    client: Arc<FullClient>,
    backend: Arc<FullBackend>,
    keystore_container: KeystoreContainer,
    telemetry: Option<Telemetry>,
}

impl EmptyParachainsInherentDataProvider {
    pub async fn create<C: HeaderBackend<Block>>(
        client: Arc<C>,
        parent: Hash,
    ) -> Result<ParachainsInherentData, InherentError> {
        let parent_header = match client.header(parent) {
            Ok(Some(h)) => h,
            Ok(None) => return Err(InherentError::ParentHeaderNotFound(parent)),
            Err(err) => return Err(InherentError::Blockchain(err)),
        };

        Ok(ParachainsInherentData {
            bitfields: Vec::new(),
            backed_candidates: Vec::new(),
            disputes: Vec::new(),
            parent_header,
        })
    }
}

/// Creates a new development full node with manual seal.
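///
/// Block production is controlled by `sealing`: on every new transaction for
/// `Sealing::Instant`, on demand through the manual seal RPC for `Sealing::Manual`,
/// or on a fixed interval (in milliseconds) for `Sealing::Interval`.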
pub fn build_full<OverseerGenerator: OverseerGen>(
    sealing: Sealing,
    config: Configuration,
    mut params: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
    let is_polkadot = config.chain_spec.is_polkadot();

    params.overseer_message_channel_capacity_override = params
        .overseer_message_channel_capacity_override
        .map(move |capacity| {
            if is_polkadot {
                gum::warn!("Channel capacity should _never_ be tampered with on polkadot!");
            }
            capacity
        });

    match config.network.network_backend {
        sc_network::config::NetworkBackendType::Libp2p => {
            new_full::<_, sc_network::NetworkWorker<Block, Hash>>(sealing, config, params)
        }
        sc_network::config::NetworkBackendType::Litep2p => {
            new_full::<_, sc_network::Litep2pNetworkBackend>(sealing, config, params)
        }
    }
}

/// We use MockParachainsInherentDataProvider to insert a parachain inherent with mocked
/// candidates.
/// We detect whether any of the keys in our keystore is assigned to a core and provide
/// a mocked candidate for that core.
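/// The fully mocked inherent is only produced when the para inherent selector aux key
/// holds an encoded `true`; otherwise an empty parachains inherent is provided instead
/// (see the `InherentDataProvider` impl below).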
struct MockParachainsInherentDataProvider<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> {
    pub client: Arc<C>,
    pub parent: Hash,
    pub keystore: KeystorePtr,
    pub upward_messages_receiver: flume::Receiver<Vec<u8>>,
}

impl<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> MockParachainsInherentDataProvider<C>
where
    C::Api: ParachainHost<Block>,
    C: AuxStore,
{
    pub fn new(
        client: Arc<C>,
        parent: Hash,
        keystore: KeystorePtr,
        upward_messages_receiver: flume::Receiver<Vec<u8>>,
    ) -> Self {
        MockParachainsInherentDataProvider {
            client,
            parent,
            keystore,
            upward_messages_receiver,
        }
    }

    pub async fn create(
        client: Arc<C>,
        parent: Hash,
        keystore: KeystorePtr,
        upward_messages_receiver: flume::Receiver<Vec<u8>>,
    ) -> Result<ParachainsInherentData, InherentError> {
        let parent_header = match client.header(parent) {
            Ok(Some(h)) => h,
            Ok(None) => return Err(InherentError::ParentHeaderNotFound(parent)),
            Err(err) => return Err(InherentError::Blockchain(err)),
        };

        // Strategy:
        // we usually have 1 validator per core, and we usually run with --alice
        // the idea is that at least alice will be assigned to one core
        // if we find in the keystore the validator attached to a particular core,
        // we generate a signature for the parachain assigned to that core
        // To retrieve the validator keys, call the runtime API:

        // the following piece of code predicts whether the validator is assigned to a particular
        // core where a candidate for a parachain needs to be created
        let runtime_api = client.runtime_api();

        // we get all validators

        // we get the current claim queue to know core availability
        let claim_queue = runtime_api.claim_queue(parent).unwrap();

        // we get the validator groups
        let (groups, rotation_info) = runtime_api.validator_groups(parent).unwrap();

        // we calculate rotations since the session start, which will define the core assignment
        // to validators
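        // e.g. with `session_start_block = 100` and `group_rotation_frequency = 10`,
        // a parent at block 123 gives `rotations_since_session_start = (123 - 100) / 10 = 2`.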
        let rotations_since_session_start = (parent_header.number
            - rotation_info.session_start_block)
            / rotation_info.group_rotation_frequency;

        // Get all the available keys in the keystore
        let available_keys = keystore
            .keys(polkadot_primitives::PARACHAIN_KEY_TYPE_ID)
            .unwrap();

        // create a slot number identical to the parent block num
        let slot_number = AuraInherentType::from(u64::from(parent_header.number));

        // create a mocked header
        let parachain_mocked_header = sp_runtime::generic::Header::<u32, BlakeTwo256> {
            parent_hash: Default::default(),
            number: parent_header.number,
            state_root: Default::default(),
            extrinsics_root: Default::default(),
            digest: sp_runtime::generic::Digest {
                logs: vec![DigestItem::PreRuntime(AURA_ENGINE_ID, slot_number.encode())],
            },
        };

        // retrieve availability cores
        let availability_cores = runtime_api.availability_cores(parent).unwrap();

        // retrieve current session_idx
        let session_idx = runtime_api.session_index_for_child(parent).unwrap();

        // retrieve all validators
        let all_validators = runtime_api.validators(parent).unwrap();

        // construct full availability bitvec
        let availability_bitvec = availability_bitvec(1, availability_cores.len());

        let signature_ctx = SigningContext {
            parent_hash: parent,
            session_index: session_idx,
        };

        // we generate the availability bitfield sigs
        // TODO: here we assume all validator keys are able to sign with our keystore
        // we need to make sure the key is there before we try to sign
        // this is mostly to indicate that the erasure coding chunks were received by all validators
        let bitfields: Vec<UncheckedSigned<AvailabilityBitfield>> = all_validators
            .iter()
            .enumerate()
            .map(|(i, public)| {
                keystore_sign(
                    &keystore,
                    availability_bitvec.clone(),
                    &signature_ctx,
                    ValidatorIndex(i as u32),
                    &public,
                )
                .unwrap()
                .unwrap()
            })
            .collect();

        // generate a random collator pair
        let collator_pair = CollatorPair::generate().0;
        let mut backed_cand: Vec<BackedCandidate<H256>> = vec![];

        // iterate over every core|para pair
        for (core, para) in claim_queue {
            // check which group is assigned to each core
            let group_assigned_to_core =
                core.0 + rotations_since_session_start % groups.len() as u32;
            // check validator indices associated to the core
            let indices_associated_to_core = groups.get(group_assigned_to_core as usize).unwrap();
            for index in indices_associated_to_core {
                // fetch validator keys
                let validator_keys_to_find = all_validators.get(index.0 as usize).unwrap();
                // Iterate keys until we find an eligible one, or run out of candidates.
                for type_public_pair in &available_keys {
                    if let Ok(validator) =
                        polkadot_primitives::ValidatorId::from_slice(&type_public_pair)
                    {
                        // if we find the validator in the keystore, we try to create a backed candidate
                        if validator_keys_to_find == &validator {
                            // we work with the previously included data
                            let mut persisted_validation_data = runtime_api
                                .persisted_validation_data(
                                    parent,
                                    para[0],
                                    OccupiedCoreAssumption::Included,
                                )
                                .unwrap()
                                .unwrap();

                            // if we don't do this we have a backed candidate every 2 blocks
                            // TODO: figure out why
                            persisted_validation_data.relay_parent_storage_root =
                                parent_header.state_root;

                            let persisted_validation_data_hash = persisted_validation_data.hash();
                            // retrieve the validation code hash
                            let validation_code_hash = runtime_api
                                .validation_code_hash(
                                    parent,
                                    para[0],
                                    OccupiedCoreAssumption::Included,
                                )
                                .unwrap()
                                .unwrap();
                            let pov_hash = Default::default();
                            // generate a fake collator signature
                            let payload = polkadot_primitives::collator_signature_payload(
                                &parent,
                                &para[0],
                                &persisted_validation_data_hash,
                                &pov_hash,
                                &validation_code_hash,
                            );
                            let collator_signature = collator_pair.sign(&payload);

                            let upward_messages = UpwardMessages::try_from(
                                upward_messages_receiver.drain().collect::<Vec<_>>(),
                            )
                            .expect("create upward messages from raw messages");

                            // generate a candidate with most of the values mocked
                            let candidate = CommittedCandidateReceipt::<H256> {
                                descriptor: CandidateDescriptor::<H256> {
                                    para_id: para[0],
                                    relay_parent: parent,
                                    collator: collator_pair.public(),
                                    persisted_validation_data_hash,
                                    pov_hash,
                                    erasure_root: Default::default(),
                                    signature: collator_signature,
                                    para_head: parachain_mocked_header.clone().hash(),
                                    validation_code_hash,
                                },
                                commitments: CandidateCommitments::<u32> {
                                    upward_messages,
                                    horizontal_messages: Default::default(),
                                    new_validation_code: None,
                                    head_data: parachain_mocked_header.clone().encode().into(),
                                    processed_downward_messages: 0,
                                    hrmp_watermark: parent_header.number,
                                },
                            };
                            let candidate_hash = candidate.hash();
                            let payload = CompactStatement::Valid(candidate_hash);

                            let signature_ctx = SigningContext {
                                parent_hash: parent,
                                session_index: session_idx,
                            };

                            // sign the candidate with the validator key
                            let signature = keystore_sign(
                                &keystore,
                                payload,
                                &signature_ctx,
                                *index,
                                &validator,
                            )
                            .unwrap()
                            .unwrap()
                            .benchmark_signature();

                            // construct a validity vote
                            let validity_votes = vec![ValidityAttestation::Explicit(signature)];

                            // push the candidate
                            backed_cand.push(BackedCandidate::<H256>::new(
                                candidate,
                                validity_votes.clone(),
                                bitvec::bitvec![u8, bitvec::order::Lsb0; 1; indices_associated_to_core.len()],
                                Some(core),
                            ));
                        }
                    }
                }
            }
        }

        Ok(ParachainsInherentData {
            bitfields,
            backed_candidates: backed_cand,
            disputes: Vec::new(),
            parent_header,
        })
    }
}

#[async_trait::async_trait]
impl<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> sp_inherents::InherentDataProvider
    for MockParachainsInherentDataProvider<C>
where
    C::Api: ParachainHost<Block>,
    C: AuxStore,
{
    async fn provide_inherent_data(
        &self,
        dst_inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        // fetch whether the para inherent selector has been set
        let maybe_para_selector = self
            .client
            .get_aux(PARA_INHERENT_SELECTOR_AUX_KEY)
            .expect("Should be able to query aux storage; qed");

        let inherent_data = {
            if let Some(aux) = maybe_para_selector {
                // if it is true, the candidates need to be mocked
                // else, we output the empty parachain inherent data provider
                if aux == true.encode() {
                    MockParachainsInherentDataProvider::create(
                        self.client.clone(),
                        self.parent,
                        self.keystore.clone(),
                        self.upward_messages_receiver.clone(),
                    )
                    .await
                    .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
                } else {
                    EmptyParachainsInherentDataProvider::create(self.client.clone(), self.parent)
                        .await
                        .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
                }
            } else {
                EmptyParachainsInherentDataProvider::create(self.client.clone(), self.parent)
                    .await
                    .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
            }
        };

        dst_inherent_data.put_data(
            polkadot_primitives::PARACHAINS_INHERENT_IDENTIFIER,
            &inherent_data,
        )
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // Inherent isn't checked and cannot return any error
        None
    }
}

/// We store the last timestamp we created in the aux storage, which enables us to return a timestamp
/// increased by the slot duration from the previous one, or the current timestamp if more time has actually passed.
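/// In other words, `next_timestamp = max(previous_timestamp + slot_duration, now)`.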
fn get_next_timestamp(
    client: Arc<FullClient>,
    slot_duration: SlotDuration,
) -> sp_timestamp::InherentDataProvider {
    const TIMESTAMP_AUX_KEY: &[u8] = b"__DEV_TIMESTAMP";

    let maybe_last_timestamp = client
        .get_aux(TIMESTAMP_AUX_KEY)
        .expect("Should be able to query aux storage; qed");
    if let Some(last_timestamp) = maybe_last_timestamp {
        let last_inherent_data = sp_timestamp::InherentType::decode(&mut last_timestamp.as_slice())
            .expect("Timestamp data must be decoded; qed");
        let new_inherent_data: sp_timestamp::InherentType = max(
            last_inherent_data.add(slot_duration.as_millis()),
            sp_timestamp::InherentType::current(),
        );
        client
            .insert_aux(
                &[(TIMESTAMP_AUX_KEY, new_inherent_data.encode().as_slice())],
                &[],
            )
            .expect("Should be able to write to aux storage; qed");
        sp_timestamp::InherentDataProvider::new(new_inherent_data)
    } else {
        let current_timestamp = sp_timestamp::InherentType::current();
        client
            .insert_aux(
                &[(TIMESTAMP_AUX_KEY, current_timestamp.encode().as_slice())],
                &[],
            )
            .expect("Should be able to write to aux storage; qed");
        sp_timestamp::InherentDataProvider::new(current_timestamp)
    }
}

fn new_full<
    OverseerGenerator: OverseerGen,
    Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>,
>(
    sealing: Sealing,
    mut config: Configuration,
    NewFullParams {
        telemetry_worker_handle,
        ..
    }: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
    let role = config.role;

    let basics = new_partial_basics(&mut config, telemetry_worker_handle)?;

    let prometheus_registry = config.prometheus_registry().cloned();

    let keystore = basics.keystore_container.local_keystore();

    let select_chain = SelectRelayChain::new_longest_chain(basics.backend.clone());

    let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> {
        client,
        backend,
        mut task_manager,
        keystore_container,
        select_chain,
        import_queue,
        transaction_pool,
        other: (block_import, babe_link, slot_duration, mut telemetry),
    } = new_partial::<SelectRelayChain<_>>(&mut config, basics, select_chain)?;

    let metrics = Network::register_notification_metrics(
        config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
    );

    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
        &config.network,
        prometheus_registry.clone(),
    );

    // Create channels for mocked parachain candidates.
    let (downward_mock_para_inherent_sender, downward_mock_para_inherent_receiver) =
        flume::bounded::<Vec<u8>>(100);

    let (upward_mock_sender, upward_mock_receiver) = flume::bounded::<Vec<u8>>(100);

    let (network, system_rpc_tx, tx_handler_controller, network_starter, sync_service) =
        service::build_network(service::BuildNetworkParams {
            config: &config,
            net_config,
            client: client.clone(),
            transaction_pool: transaction_pool.clone(),
            spawn_handle: task_manager.spawn_handle(),
            import_queue,
            block_announce_validator_builder: None,
            warp_sync_config: None,
            block_relay: None,
            metrics,
        })?;

    if config.offchain_worker.enabled {
        use futures::FutureExt;

        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-work",
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(OffchainTransactionPoolFactory::new(
                    transaction_pool.clone(),
                )),
                network_provider: Arc::new(network.clone()),
                is_validator: role.is_authority(),
                enable_http_requests: false,
                custom_extensions: move |_| vec![],
            })?
            .run(client.clone(), task_manager.spawn_handle())
            .boxed(),
        );
    }

    let mut command_sink = None;

    if role.is_authority() {
        let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
            task_manager.spawn_handle(),
            client.clone(),
            transaction_pool.clone(),
            prometheus_registry.as_ref(),
            telemetry.as_ref().map(|x| x.handle()),
        );

        let commands_stream: Box<
            dyn Stream<Item = EngineCommand<<Block as BlockT>::Hash>> + Send + Sync + Unpin,
        > = match sealing {
            Sealing::Instant => {
                Box::new(
                    // This bit cribbed from the implementation of instant seal.
                    transaction_pool.import_notification_stream().map(|_| {
                        EngineCommand::SealNewBlock {
                            create_empty: false,
                            finalize: false,
                            parent_hash: None,
                            sender: None,
                        }
                    }),
                )
            }
            Sealing::Manual => {
                let (sink, stream) = futures::channel::mpsc::channel(1000);
                // Keep a reference to the other end of the channel. It goes to the RPC.
                command_sink = Some(sink);
                Box::new(stream)
            }
            Sealing::Interval(millis) => Box::new(StreamExt::map(
                Timer::interval(Duration::from_millis(millis)),
                |_| EngineCommand::SealNewBlock {
                    create_empty: true,
                    finalize: true,
                    parent_hash: None,
                    sender: None,
                },
            )),
        };
        let keystore_clone = keystore.clone();

        let babe_config = babe_link.config();
        let babe_consensus_provider = BabeConsensusDataProvider::new(
            client.clone(),
            keystore,
            babe_link.epoch_changes().clone(),
            babe_config.authorities.clone(),
        )
        .map_err(|babe_error| {
            Error::Consensus(consensus_common::Error::Other(babe_error.into()))
        })?;

        // Need to clone it and store it here to avoid moving the `client`
        // variable into the closure below.
        let client_clone = client.clone();

        task_manager.spawn_essential_handle().spawn_blocking(
            "authorship_task",
            Some("block-authoring"),
            run_manual_seal(ManualSealParams {
                block_import,
                env: proposer,
                client: client.clone(),
                pool: transaction_pool.clone(),
                commands_stream,
                select_chain,
                create_inherent_data_providers: move |parent, ()| {
                    let client_clone = client_clone.clone();
                    let keystore = keystore_clone.clone();
                    let downward_mock_para_inherent_receiver = downward_mock_para_inherent_receiver.clone();
                    let upward_mock_receiver = upward_mock_receiver.clone();
                    async move {

                        let downward_mock_para_inherent_receiver = downward_mock_para_inherent_receiver.clone();
                        // here we only take the last one
                        let para_inherent_decider_messages: Vec<Vec<u8>> = downward_mock_para_inherent_receiver.drain().collect();

                        let upward_messages_receiver = upward_mock_receiver.clone();

                        // If there is a value to be updated, we update it
                        if let Some(value) = para_inherent_decider_messages.last() {
                            client_clone
                            .insert_aux(
                                &[(PARA_INHERENT_SELECTOR_AUX_KEY, value.as_slice())],
                                &[],
                            )
                            .expect("Should be able to write to aux storage; qed");
                        }

                        let parachain = MockParachainsInherentDataProvider::new(
                            client_clone.clone(),
                            parent,
                            keystore,
                            upward_messages_receiver,
                        );

                        let timestamp = get_next_timestamp(client_clone, slot_duration);

                        let slot =
                            sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                                *timestamp,
                                slot_duration,
                            );

                        Ok((slot, timestamp, parachain))
                    }
                },
                consensus_data_provider: Some(Box::new(babe_consensus_provider)),
            }),
        );
    }

    let dev_rpc = if role.clone().is_authority() {
        Some(DevRpc {
            mock_para_inherent_channel: downward_mock_para_inherent_sender,
            upward_message_channel: upward_mock_sender,
        })
    } else {
        None
    };

    let rpc_extensions_builder = {
        let client = client.clone();
        let transaction_pool = transaction_pool.clone();

        move |_subscription_executor: polkadot_rpc::SubscriptionTaskExecutor|
            -> Result<RpcExtension, service::Error> {
            let deps = DevDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                command_sink: command_sink.clone(),
                dev_rpc: dev_rpc.clone(),
            };

            create_dev_rpc_extension(deps).map_err(Into::into)
        }
    };

    let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
        config,
        backend: backend.clone(),
        client: client.clone(),
        keystore: keystore_container.keystore(),
        network: network.clone(),
        sync_service: sync_service.clone(),
        rpc_builder: Box::new(rpc_extensions_builder),
        transaction_pool: transaction_pool.clone(),
        task_manager: &mut task_manager,
        system_rpc_tx,
        tx_handler_controller,
        telemetry: telemetry.as_mut(),
    })?;

    network_starter.start_network();

    Ok(NewFull {
        task_manager,
        client,
        overseer_handle: None,
        network,
        sync_service,
        rpc_handlers,
        backend,
    })
}

fn new_partial<ChainSelection>(
    config: &mut Configuration,
    Basics {
        task_manager,
        backend,
        client,
        keystore_container,
        telemetry,
    }: Basics,
    select_chain: ChainSelection,
) -> Result<
    service::PartialComponents<
        FullClient,
        FullBackend,
        ChainSelection,
        sc_consensus::DefaultImportQueue<Block>,
        sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
        (
            BabeBlockImport<Block, FullClient, Arc<FullClient>>,
            BabeLink<Block>,
            SlotDuration,
            Option<Telemetry>,
        ),
    >,
    Error,
>
where
    ChainSelection: 'static + SelectChain<Block>,
{
    let transaction_pool = sc_transaction_pool::Builder::new(
        task_manager.spawn_essential_handle(),
        client.clone(),
        config.role.is_authority().into(),
    )
    .with_options(config.transaction_pool.clone())
    .with_prometheus(config.prometheus_registry())
    .build();

    // Create the babe block import; this is required to have correct epoch data
    // available for manual seal to produce blocks
    let babe_config = babe::configuration(&*client)?;
    let (babe_block_import, babe_link) =
        babe::block_import(babe_config.clone(), client.clone(), client.clone())?;
    let slot_duration = babe_link.config().slot_duration();

    // Create the manual seal import queue, wrapping the babe block import
    let import_queue = sc_consensus_manual_seal::import_queue(
        Box::new(babe_block_import.clone()),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    );

    Ok(service::PartialComponents {
        client,
        backend,
        task_manager,
        keystore_container,
        select_chain,
        import_queue,
        transaction_pool: transaction_pool.into(),
        other: (babe_block_import, babe_link, slot_duration, telemetry),
    })
}

fn new_partial_basics(
    config: &mut Configuration,
    telemetry_worker_handle: Option<TelemetryWorkerHandle>,
) -> Result<Basics, Error> {
    let telemetry = config
        .telemetry_endpoints
        .clone()
        .filter(|x| !x.is_empty())
        .map(move |endpoints| -> Result<_, telemetry::Error> {
            let (worker, mut worker_handle) = if let Some(worker_handle) = telemetry_worker_handle {
                (None, worker_handle)
            } else {
                let worker = TelemetryWorker::new(16)?;
                let worker_handle = worker.handle();
                (Some(worker), worker_handle)
            };
            let telemetry = worker_handle.new_telemetry(endpoints);
            Ok((worker, telemetry))
        })
        .transpose()?;

    let heap_pages = config
        .executor
        .default_heap_pages
        .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
            extra_pages: h as u32,
        });

    let mut wasm_builder = WasmExecutor::builder()
        .with_execution_method(config.executor.wasm_method)
        .with_onchain_heap_alloc_strategy(heap_pages)
        .with_offchain_heap_alloc_strategy(heap_pages)
        .with_max_runtime_instances(config.executor.max_runtime_instances)
        .with_runtime_cache_size(config.executor.runtime_cache_size);
    if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
        wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
    }
    let executor = wasm_builder.build();

    let (client, backend, keystore_container, task_manager) =
        service::new_full_parts::<Block, RuntimeApi, _>(
            config,
            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
            executor,
        )?;
    let client = Arc::new(client);

    let telemetry = telemetry.map(|(worker, telemetry)| {
        if let Some(worker) = worker {
            task_manager.spawn_handle().spawn(
                "telemetry",
                Some("telemetry"),
                Box::pin(worker.run()),
            );
        }
        telemetry
    });

    Ok(Basics {
        task_manager,
        client,
        backend,
        keystore_container,
        telemetry,
    })
}

use {
    polkadot_primitives::{AvailabilityBitfield, UncheckedSigned, ValidatorId, ValidatorIndex},
    sp_keystore::Error as KeystoreError,
};
fn keystore_sign<H: Encode, Payload: Encode>(
    keystore: &KeystorePtr,
    payload: Payload,
    context: &SigningContext<H>,
    validator_index: ValidatorIndex,
    key: &ValidatorId,
) -> Result<Option<UncheckedSigned<Payload>>, KeystoreError> {
    let data = payload_data(&payload, context);
    let signature = keystore
        .sr25519_sign(ValidatorId::ID, key.as_ref(), &data)?
        .map(|sig| UncheckedSigned::new(payload, validator_index, sig.into()));
    Ok(signature)
}

fn payload_data<H: Encode, Payload: Encode>(
    payload: &Payload,
    context: &SigningContext<H>,
) -> Vec<u8> {
    // equivalent to (`real_payload`, context).encode()
    let mut out = payload.encode_as();
    out.extend(context.encode());
    out
}

/// Create an `AvailabilityBitfield` with size `total_cores`. The first `used_cores` bits are
/// set to true (occupied), and the remaining to false (available).
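///
/// For example, `availability_bitvec(1, 4)` yields the bits `[1, 0, 0, 0]`
/// (least significant bit first), i.e. only the first core is marked as occupied.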
fn availability_bitvec(used_cores: usize, total_cores: usize) -> AvailabilityBitfield {
    let mut bitfields = bitvec::bitvec![u8, bitvec::order::Lsb0; 0; 0];
    for i in 0..total_cores {
        if i < used_cores {
            bitfields.push(true);
        } else {
            bitfields.push(false)
        }
    }

    bitfields.into()
}