// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

//! Development Polkadot service. Adapted from the `polkadot_service` crate,
//! with unnecessary components that are not required in a dev node removed.
//!
//! The following major changes were made:
//! 1. Removed the beefy and grandpa notification services and request/response protocols
//! 2. Removed support for parachains, which also eliminated the need to start the overseer and all other subsystems associated with collation and the corresponding network request/response protocols
//! 3. Removed support for hardware benchmarking
//! 4. Removed the authority discovery service
//! 5. Removed spawning of the beefy, grandpa and MMR workers
//! 6. Removed the rpc extensions for beefy, grandpa and babe, and added support for manual seal
//! 7. Removed the beefy and grandpa block imports from the block import pipeline (Babe remains)
//! 8. Using the manual seal import queue instead of the babe import queue
//! 9. Started the manual seal worker
//! 10. If the amount of time passed between two blocks is less than the slot duration, we emulate the passing of time for the babe block import and the runtime
//!     by incrementing the timestamp by the slot duration.

use {
    crate::dev_rpcs::{DevApiServer, DevRpc},
    async_io::Timer,
    babe::{BabeBlockImport, BabeLink},
    codec::{Decode, Encode},
    consensus_common::SelectChain,
    cumulus_primitives_core::ParaId,
    dancelight_runtime::RuntimeApi,
    futures::{Stream, StreamExt},
    jsonrpsee::RpcModule,
    manual_container_chains_exclusion_rpc::{
        ManualContainerChainsExclusion, ManualContainerChainsExclusionApiServer,
    },
    node_common::service::Sealing,
    polkadot_core_primitives::{AccountId, Balance, Block, Hash, Nonce},
    polkadot_node_core_parachains_inherent::Error as InherentError,
    polkadot_overseer::Handle,
    polkadot_parachain_primitives::primitives::UpwardMessages,
    polkadot_primitives::{
        runtime_api::ParachainHost, BackedCandidate, CandidateCommitments, CandidateDescriptor,
        CollatorPair, CommittedCandidateReceipt, CompactStatement, EncodeAs,
        InherentData as ParachainsInherentData, OccupiedCoreAssumption, SigningContext,
        ValidityAttestation,
    },
    polkadot_rpc::RpcExtension,
    polkadot_service::{
        BlockT, Error, IdentifyVariant, NewFullParams, OverseerGen, SelectRelayChain,
    },
    sc_client_api::{AuxStore, Backend},
    sc_consensus_manual_seal::{
        consensus::babe::BabeConsensusDataProvider,
        rpc::{ManualSeal, ManualSealApiServer},
        run_manual_seal, EngineCommand, ManualSealParams,
    },
    sc_executor::{HeapAllocStrategy, WasmExecutor, DEFAULT_HEAP_ALLOC_STRATEGY},
    sc_keystore::Keystore,
    sc_transaction_pool_api::{OffchainTransactionPoolFactory, TransactionPool},
    service::{Configuration, KeystoreContainer, RpcHandlers, TaskManager},
    sp_api::ProvideRuntimeApi,
    sp_block_builder::BlockBuilder,
    sp_blockchain::{HeaderBackend, HeaderMetadata},
    sp_consensus_aura::{inherents::InherentType as AuraInherentType, AURA_ENGINE_ID},
    sp_consensus_babe::SlotDuration,
    sp_core::{ByteArray, Pair, H256},
    sp_keystore::KeystorePtr,
    sp_runtime::{traits::BlakeTwo256, DigestItem, RuntimeAppPublic},
    std::{cmp::max, ops::Add, sync::Arc, time::Duration},
    telemetry::{Telemetry, TelemetryWorker, TelemetryWorkerHandle},
};

// We use this key to store whether we want the para inherent mocker to be active
const PARA_INHERENT_SELECTOR_AUX_KEY: &[u8] = b"__DEV_PARA_INHERENT_SELECTOR";

// We use this key to store the list of container chains manually excluded from producing blocks
const CONTAINER_CHAINS_EXCLUSION_AUX_KEY: &[u8] = b"__DEV_CONTAINER_CHAINS_EXCLUSION";

pub type FullBackend = service::TFullBackend<Block>;

pub type FullClient = service::TFullClient<
    Block,
    RuntimeApi,
    WasmExecutor<(
        sp_io::SubstrateHostFunctions,
        frame_benchmarking::benchmarking::HostFunctions,
    )>,
>;

pub struct NewFull {
    pub task_manager: TaskManager,
    pub client: Arc<FullClient>,
    pub overseer_handle: Option<Handle>,
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
    pub sync_service: Arc<sc_network_sync::SyncingService<Block>>,
    pub rpc_handlers: RpcHandlers,
    pub backend: Arc<FullBackend>,
}

/// Custom deps for the dev RPC extension
struct DevDeps<C, P> {
    /// The client instance to use.
    pub client: Arc<C>,
    /// Transaction pool instance.
    pub pool: Arc<P>,
    /// Manual seal command sink
    pub command_sink: Option<futures::channel::mpsc::Sender<EngineCommand<Hash>>>,
    /// Dev RPCs
    pub dev_rpc: Option<DevRpc>,
    /// Channel for manually excluding container chains from producing blocks
    pub container_chain_exclusion_sender: Option<flume::Sender<Vec<ParaId>>>,
}

fn create_dev_rpc_extension<C, P>(
    DevDeps {
        client,
        pool,
        command_sink: maybe_command_sink,
        dev_rpc: maybe_dev_rpc,
        container_chain_exclusion_sender: maybe_container_chain_exclusion_sender,
    }: DevDeps<C, P>,
) -> Result<RpcExtension, Box<dyn std::error::Error + Send + Sync>>
where
    C: ProvideRuntimeApi<Block>
        + HeaderBackend<Block>
        + AuxStore
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + Send
        + Sync
        + 'static,
    C::Api: substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Nonce>,
    C::Api: pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi<Block, Balance>,
    C::Api: BlockBuilder<Block>,
    P: TransactionPool + Sync + Send + 'static,
{
    use {
        pallet_transaction_payment_rpc::{TransactionPayment, TransactionPaymentApiServer},
        substrate_frame_rpc_system::{System, SystemApiServer},
    };

    let mut io = RpcModule::new(());
    io.merge(System::new(client.clone(), pool.clone()).into_rpc())?;
    io.merge(TransactionPayment::new(client.clone()).into_rpc())?;

    if let Some(command_sink) = maybe_command_sink {
        io.merge(ManualSeal::new(command_sink).into_rpc())?;
    }

    if let Some(dev_rpc_data) = maybe_dev_rpc {
        io.merge(dev_rpc_data.into_rpc())?;
    }

    if let Some(container_chain_exclusion_message_channel) = maybe_container_chain_exclusion_sender
    {
        io.merge(
            ManualContainerChainsExclusion {
                container_chain_exclusion_message_channel,
            }
            .into_rpc(),
        )?;
    }

    Ok(io)
}

/// We use EmptyParachainsInherentDataProvider to insert an empty parachain inherent in the block
/// to satisfy the runtime
struct EmptyParachainsInherentDataProvider;

/// Copied from the polkadot service just so that this code retains the same structure as
/// the `polkadot_service` crate.
struct Basics {
    task_manager: TaskManager,
    client: Arc<FullClient>,
    backend: Arc<FullBackend>,
    keystore_container: KeystoreContainer,
    telemetry: Option<Telemetry>,
}

impl EmptyParachainsInherentDataProvider {
    pub async fn create<C: HeaderBackend<Block>>(
        client: Arc<C>,
        parent: Hash,
    ) -> Result<ParachainsInherentData, InherentError> {
        let parent_header = match client.header(parent) {
            Ok(Some(h)) => h,
            Ok(None) => return Err(InherentError::ParentHeaderNotFound(parent)),
            Err(err) => return Err(InherentError::Blockchain(err)),
        };

        Ok(ParachainsInherentData {
            bitfields: Vec::new(),
            backed_candidates: Vec::new(),
            disputes: Vec::new(),
            parent_header,
        })
    }
}

/// Creates a new development full node with manual seal
pub fn build_full<OverseerGenerator: OverseerGen>(
    sealing: Sealing,
    config: Configuration,
    mut params: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
    let is_polkadot = config.chain_spec.is_polkadot();

    params.overseer_message_channel_capacity_override = params
        .overseer_message_channel_capacity_override
        .map(move |capacity| {
            if is_polkadot {
                gum::warn!("Channel capacity should _never_ be tampered with on polkadot!");
            }
            capacity
        });

    match config
        .network
        .network_backend
        .unwrap_or(sc_network::config::NetworkBackendType::Libp2p)
    {
        sc_network::config::NetworkBackendType::Libp2p => {
            new_full::<_, sc_network::NetworkWorker<Block, Hash>>(sealing, config, params)
        }
        sc_network::config::NetworkBackendType::Litep2p => {
            new_full::<_, sc_network::Litep2pNetworkBackend>(sealing, config, params)
        }
    }
}

/// We use MockParachainsInherentDataProvider to insert a parachain inherent with mocked
/// candidates.
/// We detect whether any of the keys in our keystore is assigned to a core and, if so, provide
/// a mocked candidate for that core.
struct MockParachainsInherentDataProvider<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> {
    pub client: Arc<C>,
    pub parent: Hash,
    pub keystore: KeystorePtr,
    pub upward_messages_receiver: flume::Receiver<Vec<u8>>,
    pub container_chain_exclusion_receiver: flume::Receiver<Vec<ParaId>>,
}

impl<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> MockParachainsInherentDataProvider<C>
where
    C::Api: ParachainHost<Block>,
    C: AuxStore,
{
    pub fn new(
        client: Arc<C>,
        parent: Hash,
        keystore: KeystorePtr,
        upward_messages_receiver: flume::Receiver<Vec<u8>>,
        container_chain_exclusion_receiver: flume::Receiver<Vec<ParaId>>,
    ) -> Self {
        MockParachainsInherentDataProvider {
            client,
            parent,
            keystore,
            upward_messages_receiver,
            container_chain_exclusion_receiver,
        }
    }

    pub async fn create(
        client: Arc<C>,
        parent: Hash,
        keystore: KeystorePtr,
        upward_messages_receiver: flume::Receiver<Vec<u8>>,
        container_chains_exclusion_receiver: flume::Receiver<Vec<ParaId>>,
    ) -> Result<ParachainsInherentData, InherentError> {
        let parent_header = match client.header(parent) {
            Ok(Some(h)) => h,
            Ok(None) => return Err(InherentError::ParentHeaderNotFound(parent)),
            Err(err) => return Err(InherentError::Blockchain(err)),
        };

        // Strategy:
        // we usually have 1 validator per core, and we usually run with --alice
        // the idea is that at least alice will be assigned to one core
        // if we find in the keystore the validator attached to a particular core,
        // we generate a signature for the parachain assigned to that core
        // To retrieve the validator keys, we call the runtime API:

        // the following piece of code predicts whether the validator is assigned to a particular
        // core where a candidate for a parachain needs to be created
        let runtime_api = client.runtime_api();

        // we get all validators

        // we get the current claim queue to know core availability
        let claim_queue = runtime_api.claim_queue(parent).unwrap();

        // we get the validator groups
        let (groups, rotation_info) = runtime_api.validator_groups(parent).unwrap();

        // we calculate the rotations since session start, which will define the assignment
        // of cores to validators
        let rotations_since_session_start = (parent_header.number
            - rotation_info.session_start_block)
            / rotation_info.group_rotation_frequency;

        // Get all the available keys in the keystore
        let available_keys = keystore
            .keys(polkadot_primitives::PARACHAIN_KEY_TYPE_ID)
            .unwrap();

        // create a slot number identical to the parent block number
        let slot_number = AuraInherentType::from(u64::from(parent_header.number));

        // create a mocked header
        let parachain_mocked_header = sp_runtime::generic::Header::<u32, BlakeTwo256> {
            parent_hash: Default::default(),
            number: parent_header.number,
            state_root: Default::default(),
            extrinsics_root: Default::default(),
            digest: sp_runtime::generic::Digest {
                logs: vec![DigestItem::PreRuntime(AURA_ENGINE_ID, slot_number.encode())],
            },
        };

        // retrieve availability cores
        let availability_cores = runtime_api.availability_cores(parent).unwrap();

        // retrieve the current session index
        let session_idx = runtime_api.session_index_for_child(parent).unwrap();

        // retrieve all validators
        let all_validators = runtime_api.validators(parent).unwrap();

        // construct a full availability bitvec
        let availability_bitvec = availability_bitvec(1, availability_cores.len());

        let signature_ctx = SigningContext {
            parent_hash: parent,
            session_index: session_idx,
        };

        // we generate the availability bitfield sigs
        // TODO: here we assume all validator keys are able to sign with our keystore
        // we need to make sure the key is there before we try to sign
        // this is mostly to indicate that the erasure coding chunks were received by all validators
        let bitfields: Vec<UncheckedSigned<AvailabilityBitfield>> = all_validators
            .iter()
            .enumerate()
            .map(|(i, public)| {
                keystore_sign(
                    &keystore,
                    availability_bitvec.clone(),
                    &signature_ctx,
                    ValidatorIndex(i as u32),
                    &public,
                )
                .unwrap()
                .unwrap()
            })
            .collect();

        // generate a random collator pair
        let collator_pair = CollatorPair::generate().0;
        let mut backed_cand: Vec<BackedCandidate<H256>> = vec![];

        let container_chains_exclusion_messages: Vec<Vec<ParaId>> =
            container_chains_exclusion_receiver.drain().collect();
        // If there is a new set of excluded container chains, we update it
        if let Some(mock_excluded_container_chains) = container_chains_exclusion_messages.last() {
            client
                .insert_aux(
                    &[(
                        CONTAINER_CHAINS_EXCLUSION_AUX_KEY,
                        mock_excluded_container_chains.encode().as_slice(),
                    )],
                    &[],
                )
                .expect("Should be able to write to aux storage; qed");
        }
        let new_excluded_container_chains_value = client
            .get_aux(CONTAINER_CHAINS_EXCLUSION_AUX_KEY)
            .expect("Should be able to query aux storage; qed")
            .unwrap_or(Vec::<ParaId>::new().encode());
        let mock_excluded_container_chains: Vec<ParaId> =
            Decode::decode(&mut new_excluded_container_chains_value.as_slice())
                .expect("Vector non-decodable");

        // iterate over every core|para pair
        for (core, para) in claim_queue {
            // allows preventing container chains from producing blocks in dev mode
            let mut para = para.clone();
            para.retain(|x| !mock_excluded_container_chains.contains(x));
            // check which group is assigned to each core
            let group_assigned_to_core =
                core.0 + rotations_since_session_start % groups.len() as u32;
            // check the validator indices associated to the core
            let indices_associated_to_core = groups.get(group_assigned_to_core as usize).unwrap();
            for index in indices_associated_to_core {
                // fetch validator keys
                let validator_keys_to_find = all_validators.get(index.0 as usize).unwrap();
                // Iterate keys until we find an eligible one, or run out of candidates.
                for type_public_pair in &available_keys {
                    if let Ok(validator) =
                        polkadot_primitives::ValidatorId::from_slice(&type_public_pair)
                    {
                        // if we find the validator in the keystore, we try to create a backed candidate
                        if validator_keys_to_find == &validator {
                            // we work with the previously included data
                            let mut persisted_validation_data = runtime_api
                                .persisted_validation_data(
                                    parent,
                                    para[0],
                                    OccupiedCoreAssumption::Included,
                                )
                                .unwrap()
                                .unwrap();

                            // if we don't do this we only get a backed candidate every 2 blocks,
                            // and we want one every block
                            persisted_validation_data.relay_parent_storage_root =
                                parent_header.state_root;

                            let persisted_validation_data_hash = persisted_validation_data.hash();
                            // retrieve the validation code hash
                            let validation_code_hash = runtime_api
                                .validation_code_hash(
                                    parent,
                                    para[0],
                                    OccupiedCoreAssumption::Included,
                                )
                                .unwrap()
                                .unwrap();
                            let pov_hash = Default::default();
                            // generate a fake collator signature
                            let payload = polkadot_primitives::collator_signature_payload(
                                &parent,
                                &para[0],
                                &persisted_validation_data_hash,
                                &pov_hash,
                                &validation_code_hash,
                            );
                            let collator_signature = collator_pair.sign(&payload);

                            let upward_messages = UpwardMessages::try_from(
                                upward_messages_receiver.drain().collect::<Vec<_>>(),
                            )
                            .expect("create upward messages from raw messages");

                            // generate a candidate with most of the values mocked
                            let candidate = CommittedCandidateReceipt::<H256> {
                                descriptor: CandidateDescriptor::<H256> {
                                    para_id: para[0],
                                    relay_parent: parent,
                                    collator: collator_pair.public(),
                                    persisted_validation_data_hash,
                                    pov_hash,
                                    erasure_root: Default::default(),
                                    signature: collator_signature,
                                    para_head: parachain_mocked_header.clone().hash(),
                                    validation_code_hash,
                                },
                                commitments: CandidateCommitments::<u32> {
                                    upward_messages,
                                    horizontal_messages: Default::default(),
                                    new_validation_code: None,
                                    head_data: parachain_mocked_header.clone().encode().into(),
                                    processed_downward_messages: 0,
                                    hrmp_watermark: parent_header.number,
                                },
                            };
                            let candidate_hash = candidate.hash();
                            let payload = CompactStatement::Valid(candidate_hash);

                            let signature_ctx = SigningContext {
                                parent_hash: parent,
                                session_index: session_idx,
                            };

                            // sign the candidate with the validator key
                            let signature = keystore_sign(
                                &keystore,
                                payload,
                                &signature_ctx,
                                *index,
                                &validator,
                            )
                            .unwrap()
                            .unwrap()
                            .benchmark_signature();

                            // construct a validity vote
                            let validity_votes = vec![ValidityAttestation::Explicit(signature)];

                            // push the candidate
                            backed_cand.push(BackedCandidate::<H256>::new(
                                candidate,
                                validity_votes.clone(),
                                bitvec::bitvec![u8, bitvec::order::Lsb0; 1; indices_associated_to_core.len()],
                                core,
                            ));
                        }
                    }
                }
            }
        }

        Ok(ParachainsInherentData {
            bitfields,
            backed_candidates: backed_cand,
            disputes: Vec::new(),
            parent_header,
        })
    }
}

#[async_trait::async_trait]
impl<C: HeaderBackend<Block> + ProvideRuntimeApi<Block>> sp_inherents::InherentDataProvider
    for MockParachainsInherentDataProvider<C>
where
    C::Api: ParachainHost<Block>,
    C: AuxStore,
{
    async fn provide_inherent_data(
        &self,
        dst_inherent_data: &mut sp_inherents::InherentData,
    ) -> Result<(), sp_inherents::Error> {
        // fetch whether the para inherent selector has been set
        let maybe_para_selector = self
            .client
            .get_aux(PARA_INHERENT_SELECTOR_AUX_KEY)
            .expect("Should be able to query aux storage; qed");

        let inherent_data = {
            if let Some(aux) = maybe_para_selector {
                // if it is true, the candidates need to be mocked
                // else, we output the empty parachain inherent data provider
                if aux == true.encode() {
                    MockParachainsInherentDataProvider::create(
                        self.client.clone(),
                        self.parent,
                        self.keystore.clone(),
                        self.upward_messages_receiver.clone(),
                        self.container_chain_exclusion_receiver.clone(),
                    )
                    .await
                    .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
                } else {
                    EmptyParachainsInherentDataProvider::create(self.client.clone(), self.parent)
                        .await
                        .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
                }
            } else {
                EmptyParachainsInherentDataProvider::create(self.client.clone(), self.parent)
                    .await
                    .map_err(|e| sp_inherents::Error::Application(Box::new(e)))?
            }
        };

        dst_inherent_data.put_data(
            polkadot_primitives::PARACHAINS_INHERENT_IDENTIFIER,
            &inherent_data,
        )
    }

    async fn try_handle_error(
        &self,
        _identifier: &sp_inherents::InherentIdentifier,
        _error: &[u8],
    ) -> Option<Result<(), sp_inherents::Error>> {
        // Inherent isn't checked and cannot return any error
        None
    }
}

/// We store the last timestamp we created in aux storage, which enables us to return a timestamp
/// increased by the slot duration from the previous one, or the current timestamp if more time has actually passed.
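///
/// For example (assuming a 6000 ms slot duration): if the stored timestamp is `T` and less than
/// 6000 ms of wall-clock time has passed since it was written, the next inherent gets `T + 6000`;
/// otherwise it gets the current wall-clock timestamp.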
fn get_next_timestamp(
    client: Arc<FullClient>,
    slot_duration: SlotDuration,
) -> sp_timestamp::InherentDataProvider {
    const TIMESTAMP_AUX_KEY: &[u8] = b"__DEV_TIMESTAMP";

    let maybe_last_timestamp = client
        .get_aux(TIMESTAMP_AUX_KEY)
        .expect("Should be able to query aux storage; qed");
    if let Some(last_timestamp) = maybe_last_timestamp {
        let last_inherent_data = sp_timestamp::InherentType::decode(&mut last_timestamp.as_slice())
            .expect("Timestamp data must be decoded; qed");
        let new_inherent_data: sp_timestamp::InherentType = max(
            last_inherent_data.add(slot_duration.as_millis()),
            sp_timestamp::InherentType::current(),
        );
        client
            .insert_aux(
                &[(TIMESTAMP_AUX_KEY, new_inherent_data.encode().as_slice())],
                &[],
            )
            .expect("Should be able to write to aux storage; qed");
        sp_timestamp::InherentDataProvider::new(new_inherent_data)
    } else {
        let current_timestamp = sp_timestamp::InherentType::current();
        client
            .insert_aux(
                &[(TIMESTAMP_AUX_KEY, current_timestamp.encode().as_slice())],
                &[],
            )
            .expect("Should be able to write to aux storage; qed");
        sp_timestamp::InherentDataProvider::new(current_timestamp)
    }
}

fn new_full<
    OverseerGenerator: OverseerGen,
    Network: sc_network::NetworkBackend<Block, <Block as BlockT>::Hash>,
>(
    sealing: Sealing,
    mut config: Configuration,
    NewFullParams {
        telemetry_worker_handle,
        ..
    }: NewFullParams<OverseerGenerator>,
) -> Result<NewFull, Error> {
    let role = config.role;

    let basics = new_partial_basics(&mut config, telemetry_worker_handle)?;

    let prometheus_registry = config.prometheus_registry().cloned();

    let keystore = basics.keystore_container.local_keystore();

    let select_chain = SelectRelayChain::new_longest_chain(basics.backend.clone());

    let service::PartialComponents::<_, _, SelectRelayChain<_>, _, _, _> {
        client,
        backend,
        mut task_manager,
        keystore_container,
        select_chain,
        import_queue,
        transaction_pool,
        other: (block_import, babe_link, slot_duration, mut telemetry),
    } = new_partial::<SelectRelayChain<_>>(&mut config, basics, select_chain)?;

    let metrics = Network::register_notification_metrics(
        config.prometheus_config.as_ref().map(|cfg| &cfg.registry),
    );

    let net_config = sc_network::config::FullNetworkConfiguration::<_, _, Network>::new(
        &config.network,
        prometheus_registry.clone(),
    );

    // Create channels for mocked parachain candidates.
    let (downward_mock_para_inherent_sender, downward_mock_para_inherent_receiver) =
        flume::bounded::<Vec<u8>>(100);

    let (upward_mock_sender, upward_mock_receiver) = flume::bounded::<Vec<u8>>(100);

    let (network, system_rpc_tx, tx_handler_controller, sync_service) =
        service::build_network(service::BuildNetworkParams {
            config: &config,
            net_config,
            client: client.clone(),
            transaction_pool: transaction_pool.clone(),
            spawn_handle: task_manager.spawn_handle(),
            import_queue,
            block_announce_validator_builder: None,
            warp_sync_config: None,
            block_relay: None,
            metrics,
        })?;

    if config.offchain_worker.enabled {
        use futures::FutureExt;

        task_manager.spawn_handle().spawn(
            "offchain-workers-runner",
            "offchain-work",
            sc_offchain::OffchainWorkers::new(sc_offchain::OffchainWorkerOptions {
                runtime_api_provider: client.clone(),
                keystore: Some(keystore_container.keystore()),
                offchain_db: backend.offchain_storage(),
                transaction_pool: Some(OffchainTransactionPoolFactory::new(
                    transaction_pool.clone(),
                )),
                network_provider: Arc::new(network.clone()),
                is_validator: role.is_authority(),
                enable_http_requests: false,
                custom_extensions: move |_| vec![],
            })?
            .run(client.clone(), task_manager.spawn_handle())
            .boxed(),
        );
    }

    let mut command_sink = None;
    let mut container_chain_exclusion_sender = None;

    if role.is_authority() {
        let proposer = sc_basic_authorship::ProposerFactory::with_proof_recording(
            task_manager.spawn_handle(),
            client.clone(),
            transaction_pool.clone(),
            prometheus_registry.as_ref(),
            telemetry.as_ref().map(|x| x.handle()),
        );

        let commands_stream: Box<
            dyn Stream<Item = EngineCommand<<Block as BlockT>::Hash>> + Send + Sync + Unpin,
        > = match sealing {
            Sealing::Instant => {
                Box::new(
                    // This bit cribbed from the implementation of instant seal.
                    transaction_pool.import_notification_stream().map(|_| {
                        EngineCommand::SealNewBlock {
                            create_empty: false,
                            finalize: false,
                            parent_hash: None,
                            sender: None,
                        }
                    }),
                )
            }
            Sealing::Manual => {
                let (sink, stream) = futures::channel::mpsc::channel(1000);
                // Keep a reference to the other end of the channel. It goes to the RPC.
                command_sink = Some(sink);
                Box::new(stream)
            }
            Sealing::Interval(millis) => Box::new(StreamExt::map(
                Timer::interval(Duration::from_millis(millis)),
                |_| EngineCommand::SealNewBlock {
                    create_empty: true,
                    finalize: true,
                    parent_hash: None,
                    sender: None,
                },
            )),
        };
        let keystore_clone = keystore.clone();

        let babe_config = babe_link.config();
        let babe_consensus_provider = BabeConsensusDataProvider::new(
            client.clone(),
            keystore,
            babe_link.epoch_changes().clone(),
            babe_config.authorities.clone(),
        )
        .map_err(|babe_error| {
            Error::Consensus(consensus_common::Error::Other(babe_error.into()))
        })?;

        let (mock_container_chains_exclusion_sender, mock_container_chains_exclusion_receiver) =
            flume::bounded::<Vec<ParaId>>(100);
        container_chain_exclusion_sender = Some(mock_container_chains_exclusion_sender);

        // Need to clone it and store here to avoid moving the `client`
        // variable into the closure below.
        let client_clone = client.clone();

        task_manager.spawn_essential_handle().spawn_blocking(
            "authorship_task",
            Some("block-authoring"),
            run_manual_seal(ManualSealParams {
                block_import,
                env: proposer,
                client: client.clone(),
                pool: transaction_pool.clone(),
                commands_stream,
                select_chain,
                create_inherent_data_providers: move |parent, ()| {
                    let client_clone = client_clone.clone();
                    let keystore = keystore_clone.clone();
                    let downward_mock_para_inherent_receiver = downward_mock_para_inherent_receiver.clone();
                    let upward_mock_receiver = upward_mock_receiver.clone();
                    let mock_container_chains_exclusion_receiver = mock_container_chains_exclusion_receiver.clone();
                    async move {

                        let downward_mock_para_inherent_receiver = downward_mock_para_inherent_receiver.clone();
                        // here we only take the last one
                        let para_inherent_decider_messages: Vec<Vec<u8>> = downward_mock_para_inherent_receiver.drain().collect();

                        let upward_messages_receiver = upward_mock_receiver.clone();

                        // If there is a value to be updated, we update it
                        if let Some(value) = para_inherent_decider_messages.last() {
                            client_clone
                            .insert_aux(
                                &[(PARA_INHERENT_SELECTOR_AUX_KEY, value.as_slice())],
                                &[],
                            )
                            .expect("Should be able to write to aux storage; qed");
                        }

                        let parachain = MockParachainsInherentDataProvider::new(
                            client_clone.clone(),
                            parent,
                            keystore,
                            upward_messages_receiver,
                            mock_container_chains_exclusion_receiver
                        );

                        let timestamp = get_next_timestamp(client_clone, slot_duration);

                        let slot =
                            sp_consensus_babe::inherents::InherentDataProvider::from_timestamp_and_slot_duration(
                                *timestamp,
                                slot_duration,
                            );

                        Ok((slot, timestamp, parachain))
                    }
                },
                consensus_data_provider: Some(Box::new(babe_consensus_provider)),
            }),
        );
    }

    let dev_rpc = if role.clone().is_authority() {
        Some(DevRpc {
            mock_para_inherent_channel: downward_mock_para_inherent_sender,
            upward_message_channel: upward_mock_sender,
        })
    } else {
        None
    };

    let rpc_extensions_builder = {
        let client = client.clone();
        let transaction_pool = transaction_pool.clone();

        move |_subscription_executor: polkadot_rpc::SubscriptionTaskExecutor|
            -> Result<RpcExtension, service::Error> {
            let deps = DevDeps {
                client: client.clone(),
                pool: transaction_pool.clone(),
                command_sink: command_sink.clone(),
                dev_rpc: dev_rpc.clone(),
                container_chain_exclusion_sender: container_chain_exclusion_sender.clone(),
            };

            create_dev_rpc_extension(deps).map_err(Into::into)
        }
    };

    let rpc_handlers = service::spawn_tasks(service::SpawnTasksParams {
        config,
        backend: backend.clone(),
        client: client.clone(),
        keystore: keystore_container.keystore(),
        network: network.clone(),
        sync_service: sync_service.clone(),
        rpc_builder: Box::new(rpc_extensions_builder),
        transaction_pool: transaction_pool.clone(),
        task_manager: &mut task_manager,
        system_rpc_tx,
        tx_handler_controller,
        telemetry: telemetry.as_mut(),
    })?;

    Ok(NewFull {
        task_manager,
        client,
        overseer_handle: None,
        network,
        sync_service,
        rpc_handlers,
        backend,
    })
}

fn new_partial<ChainSelection>(
    config: &mut Configuration,
    Basics {
        task_manager,
        backend,
        client,
        keystore_container,
        telemetry,
    }: Basics,
    select_chain: ChainSelection,
) -> Result<
    service::PartialComponents<
        FullClient,
        FullBackend,
        ChainSelection,
        sc_consensus::DefaultImportQueue<Block>,
        sc_transaction_pool::TransactionPoolHandle<Block, FullClient>,
        (
            BabeBlockImport<Block, FullClient, Arc<FullClient>>,
            BabeLink<Block>,
            SlotDuration,
            Option<Telemetry>,
        ),
    >,
    Error,
>
where
    ChainSelection: 'static + SelectChain<Block>,
{
    let transaction_pool = sc_transaction_pool::Builder::new(
        task_manager.spawn_essential_handle(),
        client.clone(),
        config.role.is_authority().into(),
    )
    .with_options(config.transaction_pool.clone())
    .with_prometheus(config.prometheus_registry())
    .build();

    // Create the babe block import; this is required to have correct epoch data
    // available for manual seal to produce blocks
    let babe_config = babe::configuration(&*client)?;
    let (babe_block_import, babe_link) =
        babe::block_import(babe_config.clone(), client.clone(), client.clone())?;
    let slot_duration = babe_link.config().slot_duration();

    // Create the manual seal import queue on top of the babe block import
    let import_queue = sc_consensus_manual_seal::import_queue(
        Box::new(babe_block_import.clone()),
        &task_manager.spawn_essential_handle(),
        config.prometheus_registry(),
    );

    Ok(service::PartialComponents {
        client,
        backend,
        task_manager,
        keystore_container,
        select_chain,
        import_queue,
        transaction_pool: transaction_pool.into(),
        other: (babe_block_import, babe_link, slot_duration, telemetry),
    })
}

fn new_partial_basics(
    config: &mut Configuration,
    telemetry_worker_handle: Option<TelemetryWorkerHandle>,
) -> Result<Basics, Error> {
    let telemetry = config
        .telemetry_endpoints
        .clone()
        .filter(|x| !x.is_empty())
        .map(move |endpoints| -> Result<_, telemetry::Error> {
            let (worker, mut worker_handle) = if let Some(worker_handle) = telemetry_worker_handle {
                (None, worker_handle)
            } else {
                let worker = TelemetryWorker::new(16)?;
                let worker_handle = worker.handle();
                (Some(worker), worker_handle)
            };
            let telemetry = worker_handle.new_telemetry(endpoints);
            Ok((worker, telemetry))
        })
        .transpose()?;

    let heap_pages = config
        .executor
        .default_heap_pages
        .map_or(DEFAULT_HEAP_ALLOC_STRATEGY, |h| HeapAllocStrategy::Static {
            extra_pages: h as u32,
        });

    let mut wasm_builder = WasmExecutor::builder()
        .with_execution_method(config.executor.wasm_method)
        .with_onchain_heap_alloc_strategy(heap_pages)
        .with_offchain_heap_alloc_strategy(heap_pages)
        .with_max_runtime_instances(config.executor.max_runtime_instances)
        .with_runtime_cache_size(config.executor.runtime_cache_size);
    if let Some(ref wasmtime_precompiled_path) = config.executor.wasmtime_precompiled {
        wasm_builder = wasm_builder.with_wasmtime_precompiled_path(wasmtime_precompiled_path);
    }
    let executor = wasm_builder.build();

    let (client, backend, keystore_container, task_manager) =
        service::new_full_parts::<Block, RuntimeApi, _>(
            config,
            telemetry.as_ref().map(|(_, telemetry)| telemetry.handle()),
            executor,
        )?;
    let client = Arc::new(client);

    let telemetry = telemetry.map(|(worker, telemetry)| {
        if let Some(worker) = worker {
            task_manager.spawn_handle().spawn(
                "telemetry",
                Some("telemetry"),
                Box::pin(worker.run()),
            );
        }
        telemetry
    });

    Ok(Basics {
        task_manager,
        client,
        backend,
        keystore_container,
        telemetry,
    })
}

use {
    polkadot_primitives::{AvailabilityBitfield, UncheckedSigned, ValidatorId, ValidatorIndex},
    sp_keystore::Error as KeystoreError,
};

/// Sign `payload` in `context` with the given validator `key` from the keystore, returning an
/// `UncheckedSigned` payload, or `None` if the key is not available in the keystore.
fn keystore_sign<H: Encode, Payload: Encode>(
    keystore: &KeystorePtr,
    payload: Payload,
    context: &SigningContext<H>,
    validator_index: ValidatorIndex,
    key: &ValidatorId,
) -> Result<Option<UncheckedSigned<Payload>>, KeystoreError> {
    let data = payload_data(&payload, context);
    let signature = keystore
        .sr25519_sign(ValidatorId::ID, key.as_ref(), &data)?
        .map(|sig| UncheckedSigned::new(payload, validator_index, sig.into()));
    Ok(signature)
}

fn payload_data<H: Encode, Payload: Encode>(
    payload: &Payload,
    context: &SigningContext<H>,
) -> Vec<u8> {
    // equivalent to (`real_payload`, context).encode()
    let mut out = payload.encode_as();
    out.extend(context.encode());
    out
}

/// Create an `AvailabilityBitfield` of size `total_cores`, with the first `used_cores` bits set
/// to true (occupied) and the remaining bits set to false (available).
fn availability_bitvec(used_cores: usize, total_cores: usize) -> AvailabilityBitfield {
    let mut bitfields = bitvec::bitvec![u8, bitvec::order::Lsb0; 0; 0];
    for i in 0..total_cores {
        if i < used_cores {
            bitfields.push(true);
        } else {
            bitfields.push(false)
        }
    }
    bitfields.into()
}
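
// Minimal sanity-check sketch for the availability bitfield helper above: with one used core out
// of four, only the first bit should be marked as occupied.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn availability_bitvec_marks_only_used_cores() {
        let bitfield: AvailabilityBitfield = availability_bitvec(1, 4);
        assert_eq!(bitfield.0.len(), 4);
        assert!(bitfield.0[0]);
        assert!(!bitfield.0[1]);
        assert!(!bitfield.0[3]);
    }
}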