// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

//! Development Polkadot service. Adapted from the `polkadot_service` crate,
//! with the components that are not required in a dev node removed.
//!
//! The following major changes are made:
//! 1. Removed beefy and grandpa notification service and request response protocols
//! 2. Removed support for parachains, which also eliminated the need to start the overseer and all other subsystems associated with collation + network request/response protocols for the same
//! 3. Removed support for hardware benchmarking
//! 4. Removed authority discovery service
//! 5. Removed spawning of beefy, grandpa and MMR worker
//! 6. Removed rpc extensions for beefy, grandpa and babe and added support for manual seal
//! 7. Removed beefy and grandpa block import from block import pipeline (Babe remains)
//! 8. Using manual seal import queue instead of babe import queue
//! 9. Started manual seal worker
//! 10. If the amount of time passed between two blocks is less than the slot duration, we emulate the passing of time
//!     for babe block import and the runtime by incrementing the timestamp by the slot duration.

use {
    crate::dev_rpcs::{DevApiServer, DevRpc},
    async_io::Timer,
    babe::{BabeBlockImport, BabeLink},
    codec::{Decode, Encode},
    consensus_common::SelectChain,
    cumulus_primitives_core::ParaId,
    dancelight_runtime::RuntimeApi,
    futures::{Stream, StreamExt},
    jsonrpsee::RpcModule,
    manual_container_chains_exclusion_rpc::{
        ManualContainerChainsExclusion, ManualContainerChainsExclusionApiServer,
    },
    node_common::service::node_builder::Sealing,
    polkadot_core_primitives::{AccountId, Balance, Block, Hash, Nonce},
    polkadot_node_core_parachains_inherent::Error as InherentError,
    polkadot_overseer::Handle,
    polkadot_parachain_primitives::primitives::UpwardMessages,
    polkadot_primitives::{
        runtime_api::ParachainHost, BackedCandidate, CandidateCommitments, CandidateDescriptor,
        CollatorPair, CommittedCandidateReceipt, CompactStatement, EncodeAs,
        InherentData as ParachainsInherentData, OccupiedCoreAssumption, SigningContext,
        ValidityAttestation,
    },
    polkadot_rpc::RpcExtension,
    polkadot_service::{
        BlockT, Error, IdentifyVariant, NewFullParams, OverseerGen, SelectRelayChain,
    },
    sc_client_api::{AuxStore, Backend},
    sc_consensus_manual_seal::{
        consensus::babe::BabeConsensusDataProvider,
        rpc::{ManualSeal, ManualSealApiServer},
        run_manual_seal, EngineCommand, ManualSealParams,
    },
    sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY},
    sc_keystore::Keystore,
    sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool},
    service::{Configuration, KeystoreContainer, RpcHandlers, TaskManager},
    sp_api::ProvideRuntimeApi,
    sp_block_builder::BlockBuilder,
    sp_blockchain::{HeaderBackend, HeaderMetadata},
    sp_consensus_aura::{inherents::InherentType as AuraInherentType, AURA_ENGINE_ID},
    sp_consensus_babe::SlotDuration,
    sp_core::{ByteArray, Pair, H256},
    sp_keystore::KeystorePtr,
    sp_runtime::{traits::BlakeTwo256, DigestItem, RuntimeAppPublic},
    std::{cmp::max, ops::Add, sync::Arc, time::Duration},
    telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
};

// We use this key to store whether we want the para inherent mocker to be active
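// The value stored under this key is compared against `true.encode()` in `provide_inherent_data`:
// when it matches, mocked backed candidates are produced, otherwise an empty parachains inherent is used.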
const PARA_INHERENT_SELECTOR_AUX_KEY: &[u8] = b"__DEV_PARA_INHERENT_SELECTOR";

const CONTAINER_CHAINS_EXCLUSION_AUX_KEY: &[u8] = b"__DEV_CONTAINER_CHAINS_EXCLUSION";

pub type FullBackend = service::TFullBackend<Block>;

pub type FullClient = service::TFullClient<
    Block,
    RuntimeApi,
    WasmExecutor<(
        sp_io::SubstrateHostFunctions,
        frame_benchmarking::benchmarking::HostFunctions,
    )>,
>;

pub struct NewFull {
    pub task_manager: TaskManager,
    pub client: Arc<FullClient>,
    pub overseer_handle: Option<Handle>,
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    pub rpc_handlers: RpcHandlers,
    pub backend: Arc<FullBackend>,
}

/// Custom Deps for dev Rpc extension
struct DevDeps<C, P> {
    /// The client instance to use.
    pub client: Arc<C>,
    /// Transaction pool instance.
    pub pool: Arc<P>,
    /// Manual seal command sink
    pub command_sink: Option<futures::channel::mpsc::Sender<EngineCommand<Hash>>>,
    /// Dev rpcs
    pub dev_rpc: Option<DevRpc>,
    /// Channels for manually excluding container chains from producing blocks
    pub container_chain_exclusion_sender: Option<flume::Sender<Vec<ParaId>>>,
}

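/// Assemble the dev RPC extension: the standard system and transaction payment RPCs, plus the
/// manual seal, dev, and container-chains-exclusion RPCs when the corresponding handles are provided.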
fn create_dev_rpc_extension<C, P>(
    DevDeps {
        client,
        pool,
        command_sink: maybe_command_sink,
        dev_rpc: maybe_dev_rpc,
        container_chain_exclusion_sender: maybe_container_chain_exclusion_sender,
    }: DevDeps<C, P>,
) -> Result<RpcExtension, Box<dyn std::error::Error + Send + Sync>>
where
    C: ProvideRuntimeApi<Block>
        + HeaderBackend<Block>
        + AuxStore
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + Send
        + Sync
        + 'static,
    C::Api: substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
    C::Api: pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>,
    C::Api: BlockBuilder<Block>,
    P: TransactionPool + Sync + Send + 'static,
{
    use {
        pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer},
        substrate_frame_rpc_system::{System, SystemApiServer},
    };

    let mut io = RpcModule::new(());
    io.merge(System::new(client.clone(), pool.clone()).into_rpc())?;
    io.merge(TransactionPayment::new(client.clone()).into_rpc())?;

    if let Some(command_sink) = maybe_command_sink {
        io.merge(ManualSeal::new(command_sink).into_rpc())?;
    }

    if let Some(dev_rpc_data) = maybe_dev_rpc {
        io.merge(dev_rpc_data.into_rpc())?;
    }

    if let Some(container_chain_exclusion_message_channel) = maybe_container_chain_exclusion_sender
    {
        io.merge(
            ManualContainerChainsExclusion {
                container_chain_exclusion_message_channel,
            }
            .into_rpc(),
        )?;
    }

    Ok(io)
}

/// We use EmptyParachainsInherentDataProvider to insert an empty parachain inherent in the block
/// to satisfy the runtime
struct EmptyParachainsInherentDataProvider;

/// Copied from polkadot service just so that this code retains the same structure as the
/// polkadot_service crate.
struct Basics {
    task_manager: TaskManager,
    client: Arc<FullClient>,
    backend: Arc<FullBackend>,
    keystore_container: KeystoreContainer,
    telemetry: Option<Telemetry>,
}

impl EmptyParachainsInherentDataProvider {
    pub async fn create<C: HeaderBackend<Block>>(
        client: Arc<C>,
        parent: Hash,
    ) -> Result<ParachainsInherentData, InherentError> {
        let parent_header = match client.header(parent) {
            Ok(Some(h)) => h,
            Ok(None) => return Err(InherentError::ParentHeaderNotFound(parent)),
            Err(err) => return Err(InherentError::Blockchain(err)),
        };

        Ok(ParachainsInherentData {
            bitfields: Vec::new(),
            backed_candidates: Vec::new(),
            disputes: Vec::new(),
            parent_header,
        })
    }
}

/// Creates a new development full node with manual seal
pub fn build_full<OverseerGenerator: OverseerGen>(
    sealing: Sealing,
    config: Configuration,
    mut params: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
    let is_polkadot = config.chain_spec.is_polkadot();

    params.overseer_message_channel_capacity_override = params
        .overseer_message_channel_capacity_override
        .map(move |capacity| {
            if is_polkadot {
                gum::warn!("Channel capacity should _never_ be tampered with on polkadot!");
            }
            capacity
        });

    match config.network.network_backend {
        sc_network::config::NetworkBackendType::Libp2p => {
            new_full::<_, sc_network::NetworkWorker<Block, Hash>>(sealing, config, params)
        }
        sc_network::config::NetworkBackendType::Litep2p => {
            new_full::<_, sc_network::Litep2pNetworkBackend>(sealing, config, params)
        }
    }
}

/// We use MockParachainsInherentDataProvider to insert a parachain inherent with mocked
/// candidates.
/// We detect whether any of the keys in our keystore is assigned to a core and provide
/// a mocked candidate for that core.
struct MockParachainsInherentDataProvider<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> {
    pub client: Arc<C>,
    pub parent: Hash,
    pub keystore: KeystorePtr,
    pub upward_messages_receiver: flume::Receiver<Vec<u8>>,
    pub container_chain_exclusion_receiver: flume::Receiver<Vec<ParaId>>,
}

impl<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> MockParachainsInherentDataProvider<C>
where
    C::Api: ParachainHost<Block>,
    C: AuxStore,
{
    pub fn new(
        client: Arc<C>,
        parent: Hash,
        keystore: KeystorePtr,
        upward_messages_receiver: flume::Receiver<Vec<u8>>,
        container_chain_exclusion_receiver: flume::Receiver<Vec<ParaId>>,
    ) -> Self {
        MockParachainsInherentDataProvider {
            client,
            parent,
            keystore,
            upward_messages_receiver,
            container_chain_exclusion_receiver,
        }
    }

    pub async fn create(
        client: Arc<C>,
        parent: Hash,
        keystore: KeystorePtr,
        upward_messages_receiver: flume::Receiver<Vec<u8>>,
        container_chains_exclusion_receiver: flume::Receiver<Vec<ParaId>>,
    ) -> Result<ParachainsInherentData, InherentError> {
        let parent_header = match client.header(parent) {
            Ok(Some(h)) => h,
            Ok(None) => return Err(InherentError::ParentHeaderNotFound(parent)),
            Err(err) => return Err(InherentError::Blockchain(err)),
        };

        // Strategy:
        // we usually have 1 validator per core, and we usually run with --alice
        // the idea is that at least alice will be assigned to one core
        // if we find in the keystore the validator attached to a particular core,
        // we generate a signature for the parachain assigned to that core
        // To retrieve the validator keys, call the runtime api:

        // the following piece of code predicts whether the validator is assigned to a particular
        // core where a candidate for a parachain needs to be created
        let runtime_api = client.runtime_api();

        // we get all validators

        // we get the current claim queue to know core availability
        let claim_queue = runtime_api.claim_queue(parent).unwrap();

        // we get the validator groups
        let (groups, rotation_info) = runtime_api.validator_groups(parent).unwrap();

        // we calculate rotations since the session start, which will define the core assignment
        // to validators
        let rotations_since_session_start = (parent_header.number
            - rotation_info.session_start_block)
            / rotation_info.group_rotation_frequency;

        // Get all the available keys in the keystore
        let available_keys = keystore
            .keys(polkadot_primitives::PARACHAIN_KEY_TYPE_ID)
            .unwrap();

        // create a slot number identical to the parent block num
        let slot_number = AuraInherentType::from(u64::from(parent_header.number));

        // create a mocked header
        let parachain_mocked_header = sp_runtime::generic::Header::<u32, BlakeTwo256> {
            parent_hash: Default::default(),
            number: parent_header.number,
            state_root: Default::default(),
            extrinsics_root: Default::default(),
            digest: sp_runtime::generic::Digest {
                logs: vec![DigestItem::PreRuntime(AURA_ENGINE_ID, slot_number.encode())],
            },
        };

        // retrieve availability cores
        let availability_cores = runtime_api.availability_cores(parent).unwrap();

        // retrieve current session_idx
        let session_idx = runtime_api.session_index_for_child(parent).unwrap();

        // retrieve all validators
        let all_validators = runtime_api.validators(parent).unwrap();

        // construct full availability bitvec
        let availability_bitvec = availability_bitvec(1, availability_cores.len());

        let signature_ctx = SigningContext {
            parent_hash: parent,
            session_index: session_idx,
        };

        // we generate the availability bitfield sigs
        // TODO: here we assume all validator keys are able to sign with our keystore
        // we need to make sure the key is there before we try to sign
        // this is mostly to indicate that the erasure coding chunks were received by all validators
        let bitfields: Vec<UncheckedSigned<AvailabilityBitfield>> = all_validators
            .iter()
            .enumerate()
            .map(|(i, public)| {
                keystore_sign(
                    &keystore,
                    availability_bitvec.clone(),
                    &signature_ctx,
                    ValidatorIndex(i as u32),
                    public,
                )
                .unwrap()
                .unwrap()
            })
            .collect();

        // generate a random collator pair
        let collator_pair = CollatorPair::generate().0;
        let mut backed_cand: Vec<BackedCandidate<H256>> = vec![];

        let container_chains_exclusion_messages: Vec<Vec<ParaId>> =
            container_chains_exclusion_receiver.drain().collect();
        // If there is a new set of excluded container chains, we update it
        if let Some(mock_excluded_container_chains) = container_chains_exclusion_messages.last() {
            client
                .insert_aux(
                    &[(
                        CONTAINER_CHAINS_EXCLUSION_AUX_KEY,
                        mock_excluded_container_chains.encode().as_slice(),
                    )],
                    &[],
                )
                .expect("Should be able to write to aux storage; qed");
        }
        let new_excluded_container_chains_value = client
            .get_aux(CONTAINER_CHAINS_EXCLUSION_AUX_KEY)
            .expect("Should be able to query aux storage; qed")
            .unwrap_or(Vec::<ParaId>::new().encode());
        let mock_excluded_container_chains: Vec<ParaId> =
            Decode::decode(&mut new_excluded_container_chains_value.as_slice())
                .expect("Vector non-decodable");

        // iterate over every core|para pair
        for (core, para) in claim_queue {
            // allows preventing container chains from producing blocks in dev mode
            let mut para = para.clone();
            para.retain(|x| !mock_excluded_container_chains.contains(x));
            // check which group is assigned to each core
            let group_assigned_to_core =
                core.0 + rotations_since_session_start % groups.len() as u32;
            // check validator indices associated to the core
            let indices_associated_to_core = groups.get(group_assigned_to_core as usize).unwrap();
            for index in indices_associated_to_core {
                // fetch validator keys
                let validator_keys_to_find = all_validators.get(index.0 as usize).unwrap();
                // Iterate keys until we find an eligible one, or run out of candidates.
                for type_public_pair in &available_keys {
                    if let Ok(validator) =
                        polkadot_primitives::ValidatorId::from_slice(type_public_pair)
                    {
                        // if we find the validator in keystore, we try to create a backed cand
                        if validator_keys_to_find == &validator {
                            // we work with the previous included data
                            let mut persisted_validation_data = runtime_api
                                .persisted_validation_data(
                                    parent,
                                    para[0],
                                    OccupiedCoreAssumption::Included,
                                )
                                .unwrap()
                                .unwrap();

                            // if we don't do this we only get a backed candidate every 2 blocks;
                            // we want one every block
                            persisted_validation_data.relay_parent_storage_root =
                                parent_header.state_root;

                            let persisted_validation_data_hash = persisted_validation_data.hash();
                            // retrieve the validation code hash
                            let validation_code_hash = runtime_api
                                .validation_code_hash(
                                    parent,
                                    para[0],
                                    OccupiedCoreAssumption::Included,
                                )
                                .unwrap()
                                .unwrap();
                            let pov_hash = Default::default();
                            // generate a fake collator signature
                            let payload = polkadot_primitives::collator_signature_payload(
                                &parent,
                                &para[0],
                                &persisted_validation_data_hash,
                                &pov_hash,
                                &validation_code_hash,
                            );
                            let collator_signature = collator_pair.sign(&payload);

                            let upward_messages = UpwardMessages::try_from(
                                upward_messages_receiver.drain().collect::<Vec<_>>(),
                            )
                            .expect("create upward messages from raw messages");

                            // generate a candidate with most of the values mocked
                            let candidate = CommittedCandidateReceipt::<H256> {
                                descriptor: CandidateDescriptor::<H256> {
                                    para_id: para[0],
                                    relay_parent: parent,
                                    collator: collator_pair.public(),
                                    persisted_validation_data_hash,
                                    pov_hash,
                                    erasure_root: Default::default(),
                                    signature: collator_signature,
                                    para_head: parachain_mocked_header.clone().hash(),
                                    validation_code_hash,
                                },
                                commitments: CandidateCommitments::<u32> {
                                    upward_messages,
                                    horizontal_messages: Default::default(),
                                    new_validation_code: None,
                                    head_data: parachain_mocked_header.clone().encode().into(),
                                    processed_downward_messages: 0,
                                    hrmp_watermark: parent_header.number,
                                },
                            };
                            let candidate_hash = candidate.hash();
                            let payload = CompactStatement::Valid(candidate_hash);

                            let signature_ctx = SigningContext {
                                parent_hash: parent,
                                session_index: session_idx,
                            };

                            // sign the candidate with the validator key
                            let signature = keystore_sign(
                                &keystore,
                                payload,
                                &signature_ctx,
                                *index,
                                &validator,
                            )
                            .unwrap()
                            .unwrap()
                            .benchmark_signature();

                            // construct a validity vote
                            let validity_votes = vec![ValidityAttestation::Explicit(signature)];

                            // push the candidate
                            backed_cand.push(BackedCandidate::<H256>::new(
                                candidate,
                                validity_votes.clone(),
                                bitvec::bitvec![u8, bitvec::order::Lsb0; 1; indices_associated_to_core.len()],
                                core,
                            ));
                        }
                    }
                }
            }
        }

        Ok(ParachainsInherentData {
            bitfields,
            backed_candidates: backed_cand,
            disputes: Vec::new(),
            parent_header,
        })
    }
}

#[async_trait::async_trait]
impl<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> sp_inherents::InherentDataProvider
    for MockParachainsInherentDataProvider<C>
where
    C::Api: ParachainHost<Block>,
    C: AuxStore,
{
    async fn provide_inherent_data(
        &self,
        dst_inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        // fetch whether the para inherent selector has been set
        let maybe_para_selector = self
            .client
            .get_aux(PARA_INHERENT_SELECTOR_AUX_KEY)
            .expect("Should be able to query aux storage; qed");

        let inherent_data = {
            if let Some(aux) = maybe_para_selector {
                // if it is true, the candidates need to be mocked
                // else, we output the empty parachain inherent data provider
                if aux == true.encode() {
                    MockParachainsInherentDataProvider::create(
                        self.client.clone(),
                        self.parent,
                        self.keystore.clone(),
                        self.upward_messages_receiver.clone(),
                        self.container_chain_exclusion_receiver.clone(),
                    )
                    .await
                    .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
                } else {
                    EmptyParachainsInherentDataProvider::create(self.client.clone(), self.parent)
                        .await
                        .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
                }
            } else {
                EmptyParachainsInherentDataProvider::create(self.client.clone(), self.parent)
                    .await
                    .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
            }
        };

        dst_inherent_data.put_data(
            polkadot_primitives::PARACHAINS_INHERENT_IDENTIFIER,
            &inherent_data,
        )
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // Inherent isn't checked and cannot return any error
        None
    }
}

/// We store the timestamp we last created in aux storage, which enables us to return a timestamp increased by the
/// slot duration from the previous timestamp, or the current timestamp if in reality more time has passed.
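/// For example, with a 6-second slot duration and only 2 seconds of real time elapsed since the
/// previous block, the returned timestamp still advances by the full 6000 ms.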
fn get_next_timestamp(
    client: Arc<FullClient>,
    slot_duration: SlotDuration,
) -> sp_timestamp::InherentDataProvider {
    const TIMESTAMP_AUX_KEY: &[u8] = b"__DEV_TIMESTAMP";

    let maybe_last_timestamp = client
        .get_aux(TIMESTAMP_AUX_KEY)
        .expect("Should be able to query aux storage; qed");
    if let Some(last_timestamp) = maybe_last_timestamp {
        let last_inherent_data = sp_timestamp::InherentType::decode(&mut last_timestamp.as_slice())
            .expect("Timestamp data must be decoded; qed");
        let new_inherent_data: sp_timestamp::InherentType = max(
            last_inherent_data.add(slot_duration.as_millis()),
            sp_timestamp::InherentType::current(),
        );
        client
            .insert_aux(
                &[(TIMESTAMP_AUX_KEY, new_inherent_data.encode().as_slice())],
                &[],
            )
            .expect("Should be able to write to aux storage; qed");
        sp_timestamp::InherentDataProvider::new(new_inherent_data)
    } else {
        let current_timestamp = sp_timestamp::InherentType::current();
        client
            .insert_aux(
                &[(TIMESTAMP_AUX_KEY, current_timestamp.encode().as_slice())],
                &[],
            )
            .expect("Should be able to write to aux storage; qed");
        sp_timestamp::InherentDataProvider::new(current_timestamp)
    }
}

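/// Assemble the dev node: partial components, network, optional offchain workers, the manual seal
/// authorship task (with mocked parachains inherents), and the dev RPC extensions.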
fn new_full<
    OverseerGenerator: OverseerGen,
    Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>,
>(
    sealing: Sealing,
    mut config: Configuration,
    NewFullParams {
        telemetry_worker_handle,
        ..
    }: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
    let role = config.role;

    let basics = new_partial_basics(&mut config, telemetry_worker_handle)?;

    let prometheus_registry = config.prometheus_registry().cloned();

    let keystore = basics.keystore_container.local_keystore();

    let select_chain = SelectRelayChain::new_longest_chain(basics.backend.clone());

    let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> {
        client,
        backend,
        mut task_manager,
        keystore_container,
        select_chain,
        import_queue,
        transaction_pool,
        other: (block_import, babe_link, slot_duration, mut telemetry),
    } = new_partial::<SelectRelayChain<_>>(&mut config, basics, select_chain)?;

    let metrics = Network::register_notification_metrics(
        config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
    );

    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
        &config.network,
        prometheus_registry.clone(),
    );

    // Create channels for mocked parachain candidates.
    let (downward_mock_para_inherent_sender, downward_mock_para_inherent_receiver) =
        flume::bounded::<Vec<u8>>(100);

    let (upward_mock_sender, upward_mock_receiver) = flume::bounded::<Vec<u8>>(100);

    let (network, system_rpc_tx, tx_handler_controller, sync_service) =
        service::build_network(service::BuildNetworkParams {
            config: &config,
            net_config,
            client: client.clone(),
            transaction_pool: transaction_pool.clone(),
            spawn_handle: task_manager.spawn_handle(),
            import_queue,
            block_announce_validator_builder: None,
            warp_sync_config: None,
            block_relay: None,
            metrics,
        })?;

    if config.offchain_worker.enabled {
        use futures::FutureExt;

        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-work",
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(OffchainTransactionPoolFactory::new(
                    transaction_pool.clone(),
                )),
                network_provider: Arc::new(network.clone()),
                is_validator: role.is_authority(),
                enable_http_requests: false,
                custom_extensions: move |_| vec![],
            })?
            .run(client.clone(), task_manager.spawn_handle())
            .boxed(),
        );
    }

    let mut command_sink = None;
    let mut container_chain_exclusion_sender = None;

    if role.is_authority() {
        let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
            task_manager.spawn_handle(),
            client.clone(),
            transaction_pool.clone(),
            prometheus_registry.as_ref(),
            telemetry.as_ref().map(|x| x.handle()),
        );

        let commands_stream: Box<
            dyn Stream<Item = EngineCommand<<Block as BlockT>::Hash>> + Send + Sync + Unpin,
        > = match sealing {
            Sealing::Instant => {
                Box::new(
                    // This bit cribbed from the implementation of instant seal.
                    transaction_pool.import_notification_stream().map(|_| {
                        EngineCommand::SealNewBlock {
                            create_empty: false,
                            finalize: false,
                            parent_hash: None,
                            sender: None,
                        }
                    }),
                )
            }
            Sealing::Manual => {
                let (sink, stream) = futures::channel::mpsc::channel(1000);
                // Keep a reference to the other end of the channel. It goes to the RPC.
                command_sink = Some(sink);
                Box::new(stream)
            }
            Sealing::Interval(millis) => Box::new(StreamExt::map(
                Timer::interval(Duration::from_millis(millis)),
                |_| EngineCommand::SealNewBlock {
                    create_empty: true,
                    finalize: true,
                    parent_hash: None,
                    sender: None,
                },
            )),
        };
        let keystore_clone = keystore.clone();

        let babe_config = babe_link.config();
        let babe_consensus_provider = BabeConsensusDataProvider::new(
            client.clone(),
            keystore,
            babe_link.epoch_changes().clone(),
            babe_config.authorities.clone(),
        )
        .map_err(|babe_error| {
            Error::Consensus(consensus_common::Error::Other(babe_error.into()))
        })?;

        let (mock_container_chains_exclusion_sender, mock_container_chains_exclusion_receiver) =
            flume::bounded::<Vec<ParaId>>(100);
        container_chain_exclusion_sender = Some(mock_container_chains_exclusion_sender);

        // Need to clone it and store it here to avoid moving the `client`
        // variable into the closure below.
        let client_clone = client.clone();

        task_manager.spawn_essential_handle().spawn_blocking(
            "authorship_task",
            Some("block-authoring"),
            run_manual_seal(ManualSealParams {
                block_import,
                env: proposer,
                client: client.clone(),
                pool: transaction_pool.clone(),
                commands_stream,
                select_chain,
                create_inherent_data_providers: move |parent, ()| {
                    let client_clone = client_clone.clone();
                    let keystore = keystore_clone.clone();
                    let downward_mock_para_inherent_receiver = downward_mock_para_inherent_receiver.clone();
                    let upward_mock_receiver = upward_mock_receiver.clone();
                    let mock_container_chains_exclusion_receiver = mock_container_chains_exclusion_receiver.clone();
                    async move {

                        let downward_mock_para_inherent_receiver = downward_mock_para_inherent_receiver.clone();
                        // here we only take the last one
                        let para_inherent_decider_messages: Vec<Vec<u8>> = downward_mock_para_inherent_receiver.drain().collect();

                        let upward_messages_receiver = upward_mock_receiver.clone();

                        // If there is a value to be updated, we update it
                        if let Some(value) = para_inherent_decider_messages.last() {
                            client_clone
                            .insert_aux(
                                &[(PARA_INHERENT_SELECTOR_AUX_KEY, value.as_slice())],
                                &[],
                            )
                            .expect("Should be able to write to aux storage; qed");
                        }

                        let parachain = MockParachainsInherentDataProvider::new(
                            client_clone.clone(),
                            parent,
                            keystore,
                            upward_messages_receiver,
                            mock_container_chains_exclusion_receiver
                        );

                        let timestamp = get_next_timestamp(client_clone, slot_duration);

                        let slot =
                            sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                                *timestamp,
                                slot_duration,
                            );

                        Ok((slot, timestamp, parachain))
                    }
                },
                consensus_data_provider: Some(Box::new(babe_consensus_provider)),
            }),
        );
    }

    let dev_rpc = if role.clone().is_authority() {
        Some(DevRpc {
            mock_para_inherent_channel: downward_mock_para_inherent_sender,
            upward_message_channel: upward_mock_sender,
        })
    } else {
        None
    };

    let rpc_extensions_builder = {
        let client = client.clone();
        let transaction_pool = transaction_pool.clone();

        move |_subscription_executor: polkadot_rpc::SubscriptionTaskExecutor|
            -> Result<RpcExtension, service::Error> {
            let deps = DevDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                command_sink: command_sink.clone(),
                dev_rpc: dev_rpc.clone(),
                container_chain_exclusion_sender: container_chain_exclusion_sender.clone(),
            };

            create_dev_rpc_extension(deps).map_err(Into::into)
        }
    };

    let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
        config,
        backend: backend.clone(),
        client: client.clone(),
        keystore: keystore_container.keystore(),
        network: network.clone(),
        sync_service: sync_service.clone(),
        rpc_builder: Box::new(rpc_extensions_builder),
        transaction_pool: transaction_pool.clone(),
        task_manager: &mut task_manager,
        system_rpc_tx,
        tx_handler_controller,
        telemetry: telemetry.as_mut(),
    })?;

    Ok(NewFull {
        task_manager,
        client,
        overseer_handle: None,
        network,
        sync_service,
        rpc_handlers,
        backend,
    })
}

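/// Build the partial node components; uses the babe block import (for correct epoch data) wrapped
/// in a manual seal import queue.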
fn new_partial<ChainSelection>(
    config: &mut Configuration,
    Basics {
        task_manager,
        backend,
        client,
        keystore_container,
        telemetry,
    }: Basics,
    select_chain: ChainSelection,
) -> Result<
    service::PartialComponents<
        FullClient,
        FullBackend,
        ChainSelection,
        sc_consensus::DefaultImportQueue<Block>,
        sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
        (
            BabeBlockImport<Block, FullClient, Arc<FullClient>>,
            BabeLink<Block>,
            SlotDuration,
            Option<Telemetry>,
        ),
    >,
    Error,
>
where
    ChainSelection: 'static + SelectChain<Block>,
{
    let transaction_pool = sc_transaction_pool::Builder::new(
        task_manager.spawn_essential_handle(),
        client.clone(),
        config.role.is_authority().into(),
    )
    .with_options(config.transaction_pool.clone())
    .with_prometheus(config.prometheus_registry())
    .build();

    // Create the babe block import; this is required to have correct epoch data
    // available for manual seal to produce blocks
    let babe_config = babe::configuration(&*client)?;
    let (babe_block_import, babe_link) =
        babe::block_import(babe_config.clone(), client.clone(), client.clone())?;
    let slot_duration = babe_link.config().slot_duration();

    // Create the manual seal import queue, wrapping the babe block import
    let import_queue = sc_consensus_manual_seal::import_queue(
        Box::new(babe_block_import.clone()),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    );

    Ok(service::PartialComponents {
        client,
        backend,
        task_manager,
        keystore_container,
        select_chain,
        import_queue,
        transaction_pool: transaction_pool.into(),
        other: (babe_block_import, babe_link, slot_duration, telemetry),
    })
}

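/// Create the basic parts shared by the dev node: wasm executor, client, backend, keystore
/// container, task manager and optional telemetry.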
fn new_partial_basics(
    config: &mut Configuration,
    telemetry_worker_handle: Option<TelemetryWorkerHandle>,
) -> Result<Basics, Error> {
    let telemetry = config
        .telemetry_endpoints
        .clone()
        .filter(|x| !x.is_empty())
        .map(move |endpoints| -> Result<_, telemetry::Error> {
            let (worker, mut worker_handle) = if let Some(worker_handle) = telemetry_worker_handle {
                (None, worker_handle)
            } else {
                let worker = TelemetryWorker::new(16)?;
                let worker_handle = worker.handle();
                (Some(worker), worker_handle)
            };
            let telemetry = worker_handle.new_telemetry(endpoints);
            Ok((worker, telemetry))
        })
        .transpose()?;

    let heap_pages = config
        .executor
        .default_heap_pages
        .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
            extra_pages: h as u32,
        });

    let mut wasm_builder = WasmExecutor::builder()
        .with_execution_method(config.executor.wasm_method)
        .with_onchain_heap_alloc_strategy(heap_pages)
        .with_offchain_heap_alloc_strategy(heap_pages)
        .with_max_runtime_instances(config.executor.max_runtime_instances)
        .with_runtime_cache_size(config.executor.runtime_cache_size);
    if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
        wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
    }
    let executor = wasm_builder.build();

    let (client, backend, keystore_container, task_manager) =
        service::new_full_parts::<Block, RuntimeApi, _>(
            config,
            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
            executor,
        )?;
    let client = Arc::new(client);

    let telemetry = telemetry.map(|(worker, telemetry)| {
        if let Some(worker) = worker {
            task_manager.spawn_handle().spawn(
                "telemetry",
                Some("telemetry"),
                Box::pin(worker.run()),
            );
        }
        telemetry
    });

    Ok(Basics {
        task_manager,
        client,
        backend,
        keystore_container,
        telemetry,
    })
}
use {
    polkadot_primitives::{AvailabilityBitfield, UncheckedSigned, ValidatorId, ValidatorIndex},
    sp_keystore::Error as KeystoreError,
};
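/// Sign `payload` in the given signing `context` with the sr25519 parachain validator key `key`
/// from the keystore, wrapping the result together with `validator_index`; returns `Ok(None)` if
/// the key is not present in the keystore.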
fn keystore_sign<H: Encode, Payload: Encode>(
    keystore: &KeystorePtr,
    payload: Payload,
    context: &SigningContext<H>,
    validator_index: ValidatorIndex,
    key: &ValidatorId,
) -> Result<Option<UncheckedSigned<Payload>>, KeystoreError> {
    let data = payload_data(&payload, context);
    let signature = keystore
        .sr25519_sign(ValidatorId::ID, key.as_ref(), &data)?
        .map(|sig| UncheckedSigned::new(payload, validator_index, sig.into()));
    Ok(signature)
}

fn payload_data<H: Encode, Payload: Encode>(
    payload: &Payload,
    context: &SigningContext<H>,
) -> Vec<u8> {
    // equivalent to (`real_payload`, context).encode()
    let mut out = payload.encode_as();
    out.extend(context.encode());
    out
}
/// Create an `AvailabilityBitfield` of size `total_cores`, with the first `used_cores` bits set to
/// true (occupied) and the remaining ones set to false (available).
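/// For example, `availability_bitvec(1, 4)` yields the bits `1000`: only the first core is occupied.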
fn availability_bitvec(used_cores: usize, total_cores: usize) -> AvailabilityBitfield {
    let mut bitfields = bitvec::bitvec![u8, bitvec::order::Lsb0; 0; 0];
    for i in 0..total_cores {
        if i < used_cores {
            bitfields.push(true);
        } else {
            bitfields.push(false)
        }
    }
    bitfields.into()
}