// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

pub mod basic;
pub mod lookahead;

use {
    crate::{find_pre_digest, AuthorityId, OrchestratorAuraWorkerAuxData},
    cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface,
    cumulus_client_consensus_aura::collators::RelayParentData,
    cumulus_client_consensus_common::ParachainCandidate,
    cumulus_client_consensus_proposer::ProposerInterface,
    cumulus_client_parachain_inherent::{ParachainInherentData, ParachainInherentDataProvider},
    cumulus_primitives_core::{
        relay_chain::Hash as PHash, DigestItem, ParachainBlockData, PersistedValidationData,
    },
    cumulus_relay_chain_interface::RelayChainInterface,
    futures::prelude::*,
    nimbus_primitives::{CompatibleDigestItem as NimbusCompatibleDigestItem, NIMBUS_KEY_ID},
    parity_scale_codec::{Codec, Encode},
    polkadot_node_primitives::{Collation, MaybeCompressedPoV},
    polkadot_primitives::Id as ParaId,
    sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction},
    sp_application_crypto::{AppCrypto, AppPublic},
    sp_consensus::BlockOrigin,
    sp_consensus_aura::{digests::CompatibleDigestItem, Slot},
    sp_core::crypto::{ByteArray, Pair},
    sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider},
    sp_keystore::{Keystore, KeystorePtr},
    sp_runtime::{
        generic::Digest,
        traits::{Block as BlockT, HashingFor, Header as HeaderT, Member, Zero},
    },
    sp_state_machine::StorageChanges,
    sp_timestamp::Timestamp,
    std::{convert::TryFrom, error::Error, time::Duration},
};

/// Parameters for instantiating a [`Collator`].
pub struct Params<BI, CIDP, RClient, Proposer, CS> {
    /// A builder for inherent data providers.
    pub create_inherent_data_providers: CIDP,
    /// The block import handle.
    pub block_import: BI,
    /// An interface to the relay-chain client.
    pub relay_client: RClient,
    /// The keystore handle used for accessing parachain key material.
    pub keystore: KeystorePtr,
    /// The identifier of the parachain within the relay-chain.
    pub para_id: ParaId,
    /// The block proposer used for building blocks.
    pub proposer: Proposer,
    /// The collator service used for bundling proposals into collations and announcing
    /// to the network.
    pub collator_service: CS,
}

/// A utility struct for writing collation logic that makes use of
/// Tanssi Aura entirely or in part.
pub struct Collator<Block, P, BI, CIDP, RClient, Proposer, CS> {
    create_inherent_data_providers: CIDP,
    block_import: BI,
    relay_client: RClient,
    keystore: KeystorePtr,
    para_id: ParaId,
    proposer: Proposer,
    collator_service: CS,
    _marker: std::marker::PhantomData<(Block, Box<dyn Fn(P) + Send + Sync + 'static>)>,
}

impl<Block, P, BI, CIDP, RClient, Proposer, CS> Collator<Block, P, BI, CIDP, RClient, Proposer, CS>
where
    Block: BlockT,
    RClient: RelayChainInterface,
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)> + 'static,
    BI: BlockImport<Block> + Send + Sync + 'static,
    Proposer: ProposerInterface<Block>,
    CS: CollatorServiceInterface<Block>,
    P: Pair + Send + Sync + 'static,
    P::Public: AppPublic + Member,
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
    /// Instantiate a new instance of the `Tanssi Aura` manager.
    pub fn new(params: Params<BI, CIDP, RClient, Proposer, CS>) -> Self {
        Collator {
            create_inherent_data_providers: params.create_inherent_data_providers,
            block_import: params.block_import,
            relay_client: params.relay_client,
            keystore: params.keystore,
            para_id: params.para_id,
            proposer: params.proposer,
            collator_service: params.collator_service,
            _marker: std::marker::PhantomData,
        }
    }

    /// Explicitly creates the inherent data for parachain block authoring.
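    ///
    /// A minimal usage sketch; `collator`, `relay_parent`, `validation_data` and
    /// `parent_hash` are illustrative bindings, not part of this API:
    ///
    /// ```ignore
    /// let (paras_inherent_data, other_inherent_data) = collator
    ///     .create_inherent_data(relay_parent, &validation_data, parent_hash, None, None)
    ///     .await?;
    /// ```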
    pub async fn create_inherent_data(
        &self,
        relay_parent: PHash,
        validation_data: &PersistedValidationData,
        parent_hash: <Block as BlockT>::Hash,
        _timestamp: impl Into<Option<Timestamp>>,
        relay_parent_descendants: Option<RelayParentData>,
    ) -> Result<(ParachainInherentData, InherentData), Box<dyn Error + Send + Sync + 'static>> {
        let paras_inherent_data = ParachainInherentDataProvider::create_at(
            relay_parent,
            &self.relay_client,
            validation_data,
            self.para_id,
            relay_parent_descendants
                .map(RelayParentData::into_inherent_descendant_list)
                .unwrap_or_default(),
        )
        .await;

        let paras_inherent_data = match paras_inherent_data {
            Some(p) => p,
            None => {
                return Err(
                    format!("Could not create paras inherent data at {:?}", relay_parent).into(),
                )
            }
        };

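        // Gather any additional inherent data from the configured providers.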
        let other_inherent_data = self
            .create_inherent_data_providers
            .create_inherent_data_providers(parent_hash, (relay_parent, validation_data.clone()))
            .map_err(|e| e as Box<dyn Error + Send + Sync + 'static>)
            .await?
            .create_inherent_data()
            .await
            .map_err(Box::new)?;

        Ok((paras_inherent_data, other_inherent_data))
    }

    /// Propose, seal, and import a block, packaging it into a collation.
    ///
    /// Provide the slot to build at as well as any other necessary pre-digest logs,
    /// the inherent data, and the proposal duration and PoV size limits.
    ///
    /// The Tanssi Aura pre-digest is set internally.
    ///
    /// This does not announce the collation to the parachain network or the relay chain.
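    ///
    /// A rough usage sketch; `parent_header`, `slot_claim` and the inherent data tuple are
    /// assumed to have been obtained beforehand, and the duration and PoV limit are placeholders:
    ///
    /// ```ignore
    /// if let Some((collation, block_data)) = collator
    ///     .collate(
    ///         &parent_header,
    ///         &mut slot_claim,
    ///         None,
    ///         (paras_inherent_data, other_inherent_data),
    ///         Duration::from_millis(500),
    ///         5 * 1024 * 1024,
    ///     )
    ///     .await?
    /// {
    ///     // Hand the collation over to the collation-distribution machinery.
    /// }
    /// ```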
    #[allow(clippy::cast_precision_loss)]
    pub async fn collate(
        &mut self,
        parent_header: &Block::Header,
        slot_claim: &mut SlotClaim<P::Public>,
        additional_pre_digest: impl Into<Option<Vec<DigestItem>>>,
        inherent_data: (ParachainInherentData, InherentData),
        proposal_duration: Duration,
        max_pov_size: usize,
    ) -> Result<Option<(Collation, ParachainBlockData<Block>)>, Box<dyn Error + Send + 'static>>
    {
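        // Caller-supplied pre-digest logs go first, followed by the Aura and Nimbus
        // pre-digests carried by the slot claim.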
        let mut digest = additional_pre_digest.into().unwrap_or_default();
        digest.append(&mut slot_claim.pre_digest);

        let maybe_proposal = self
            .proposer
            .propose(
                parent_header,
                &inherent_data.0,
                inherent_data.1,
                Digest { logs: digest },
                proposal_duration,
                Some(max_pov_size),
            )
            .await
            .map_err(|e| Box::new(e) as Box<dyn Error + Send>)?;

        let proposal = match maybe_proposal {
            None => return Ok(None),
            Some(p) => p,
        };

        let sealed_importable = seal_tanssi::<_, P>(
            proposal.block,
            proposal.storage_changes,
            &slot_claim.author_pub,
            &self.keystore,
        )
        .map_err(|e| e as Box<dyn Error + Send>)?;

        let post_hash = sealed_importable.post_hash();
        let block = Block::new(
            sealed_importable.post_header(),
            sealed_importable
                .body
                .as_ref()
                .expect("body always created with this `propose` fn; qed")
                .clone(),
        );

        self.block_import
            .import_block(sealed_importable)
            .map_err(|e| Box::new(e) as Box<dyn Error + Send>)
            .await?;

        if let Some((collation, block_data)) = self.collator_service.build_collation(
            parent_header,
            post_hash,
            ParachainCandidate {
                block,
                proof: proposal.proof,
            },
        ) {
            // Equivalent to `block_data.log_size_info()`, inlined here so the log uses our
            // own target.
            tracing::info!(
                target: crate::LOG_TARGET,
                header_kb = %block_data.blocks().iter().map(|b| b.header().encoded_size()).sum::<usize>() as f64 / 1024f64,
                extrinsics_kb = %block_data.blocks().iter().map(|b| b.extrinsics().encoded_size()).sum::<usize>() as f64 / 1024f64,
                storage_proof_kb = %block_data.proof().encoded_size() as f64 / 1024f64,
                "PoV size",
            );

            if let MaybeCompressedPoV::Compressed(ref pov) = collation.proof_of_validity {
                tracing::info!(
                    target: crate::LOG_TARGET,
                    "Compressed PoV size: {}kb",
                    pov.block_data.0.len() as f64 / 1024f64,
                );
            }

            Ok(Some((collation, block_data)))
        } else {
            Err(
                Box::<dyn Error + Send + Sync>::from("Unable to produce collation")
                    as Box<dyn Error + Send>,
            )
        }
    }

    /// Get the underlying collator service.
    pub fn collator_service(&self) -> &CS {
        &self.collator_service
    }
}

/// Build the Aura and Nimbus pre-digest items for the given slot and author.
fn pre_digest_data<P: Pair>(slot: Slot, claim: P::Public) -> Vec<sp_runtime::DigestItem>
where
    P::Public: Codec,
    P::Signature: Codec,
{
    vec![
        <DigestItem as CompatibleDigestItem<P::Signature>>::aura_pre_digest(slot),
        // We inject the nimbus digest as well; crucial to be able to verify signatures.
        <DigestItem as NimbusCompatibleDigestItem>::nimbus_pre_digest(
            // TODO remove this unwrap through trait reqs
            nimbus_primitives::NimbusId::from_slice(claim.as_ref()).unwrap(),
        ),
    ]
}

/// A claim to author a block in a given slot, carrying the author's public key and the
/// pre-digest logs to include in the block.
#[derive(Debug)]
pub struct SlotClaim<Pub> {
    author_pub: Pub,
    pre_digest: Vec<DigestItem>,
    slot: Slot,
}

impl<Pub: Clone> SlotClaim<Pub> {
    /// Build a claim for `author_pub` at `slot` without checking the keystore or authority set.
    pub fn unchecked<P>(author_pub: Pub, slot: Slot) -> Self
    where
        P: Pair<Public = Pub>,
        P::Public: Codec,
        P::Signature: Codec,
    {
        SlotClaim {
            author_pub: author_pub.clone(),
            pre_digest: pre_digest_data::<P>(slot, author_pub),
            slot,
        }
    }

    /// Get the author's public key.
    pub fn author_pub(&self) -> &Pub {
        &self.author_pub
    }

    /// Get the pre-digest.
    pub fn pre_digest(&self) -> &Vec<DigestItem> {
        &self.pre_digest
    }

    /// Get the slot assigned to this claim.
    pub fn slot(&self) -> Slot {
        self.slot
    }
}

/// How the collator decides whether it may claim a slot.
#[derive(Debug, PartialEq, Eq, Copy, Clone)]
pub enum ClaimMode {
    /// Claim the slot as long as one of our keys is in the authority set, regardless of the
    /// expected slot author.
    ForceAuthoring,
    /// Claim the slot only if we are the expected author for it.
    NormalAuthoring,
    /// Claim made when deciding whether to buy a parathread core, allowing the slot to drift
    /// by up to `drift_permitted`.
    ParathreadCoreBuying { drift_permitted: Slot },
}

/// Attempt to claim a slot locally.
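///
/// A hedged usage sketch; `aux_data`, `chain_head`, `slot` and `keystore` are illustrative
/// bindings, and `P` is whichever key pair type the caller uses:
///
/// ```ignore
/// if let Some(mut slot_claim) = tanssi_claim_slot::<P, Block>(
///     aux_data,
///     &chain_head,
///     slot,
///     ClaimMode::NormalAuthoring,
///     &keystore,
/// ) {
///     // We are entitled to author for `slot`; proceed to build a block.
/// }
/// ```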
pub fn tanssi_claim_slot<P, B>(
    aux_data: OrchestratorAuraWorkerAuxData<P>,
    chain_head: &B::Header,
    slot: Slot,
    claim_mode: ClaimMode,
    keystore: &KeystorePtr,
) -> Option<SlotClaim<P::Public>>
where
    P: Pair + Send + Sync + 'static,
    P::Public: Codec + std::fmt::Debug,
    P::Signature: Codec,
    B: BlockT,
{
    let author_pub = claim_slot_inner::<P>(slot, &aux_data.authorities, keystore, claim_mode)?;

    if is_parathread_and_should_skip_slot::<P, B>(&aux_data, chain_head, slot, claim_mode) {
        return None;
    }

    Some(SlotClaim::unchecked::<P>(author_pub, slot))
}

/// Returns true if this container chain is a parathread and the collator should skip this slot
/// and not produce a block.
pub fn is_parathread_and_should_skip_slot<P, B>(
    aux_data: &OrchestratorAuraWorkerAuxData<P>,
    chain_head: &B::Header,
    slot: Slot,
    claim_mode: ClaimMode,
) -> bool
where
    P: Pair + Send + Sync + 'static,
    P::Public: Codec + std::fmt::Debug,
    P::Signature: Codec,
    B: BlockT,
{
    if slot.is_zero() {
        // Always produce on slot 0 (for tests)
        return false;
    }
    if let Some(slot_freq) = &aux_data.slot_freq {
        if let Ok(chain_head_slot) = find_pre_digest::<B, P::Signature>(chain_head) {
            // TODO: this doesn't take into account force authoring.
            // So a node with `force_authoring = true` will not propose a block for a parathread until the
            // `min_slot_freq` has elapsed.
            match claim_mode {
                ClaimMode::NormalAuthoring | ClaimMode::ForceAuthoring => {
                    !slot_freq.should_parathread_author_block(slot, chain_head_slot)
                }
                ClaimMode::ParathreadCoreBuying { drift_permitted } => {
                    !slot_freq.should_parathread_buy_core(slot, drift_permitted, chain_head_slot)
                }
            }
        } else {
            // In case of error always propose
            false
        }
    } else {
        // Not a parathread: always propose
        false
    }
}

/// Attempt to claim a slot using a keystore.
pub fn claim_slot_inner<P>(
    slot: Slot,
    authorities: &Vec<AuthorityId<P>>,
    keystore: &KeystorePtr,
    claim_mode: ClaimMode,
) -> Option<P::Public>
where
    P: Pair,
    P::Public: Codec + std::fmt::Debug,
    P::Signature: Codec,
{
    let expected_author = crate::slot_author::<P>(slot, authorities.as_slice());
    // if running with force-authoring, as long as you are in the authority set, propose
    if claim_mode == ClaimMode::ForceAuthoring {
        authorities
            .iter()
            .find(|key| keystore.has_keys(&[(key.to_raw_vec(), NIMBUS_KEY_ID)]))
            .cloned()
    }
    // if not running with force-authoring, just do the usual slot check
    else {
        expected_author.and_then(|p| {
            if keystore.has_keys(&[(p.to_raw_vec(), NIMBUS_KEY_ID)]) {
                Some(p.clone())
            } else {
                None
            }
        })
    }
}

/// Seal a block with a signature in the header.
/// This is a copy of [`cumulus_client_consensus_aura::collator::seal`] but using Nimbus instead of
/// Aura for the signature.
pub fn seal_tanssi<B: BlockT, P>(
    pre_sealed: B,
    storage_changes: StorageChanges<HashingFor<B>>,
    author_pub: &P::Public,
    keystore: &KeystorePtr,
) -> Result<BlockImportParams<B>, Box<dyn Error + Send + Sync + 'static>>
where
    P: Pair,
    P::Signature: Codec + TryFrom<Vec<u8>>,
    P::Public: AppPublic,
{
    let (pre_header, body) = pre_sealed.deconstruct();
    let pre_hash = pre_header.hash();
    let block_number = *pre_header.number();

    // sign the pre-sealed hash of the block and then
    // add it to a digest item.
    let signature = Keystore::sign_with(
        keystore,
        <AuthorityId<P> as AppCrypto>::ID,
        <AuthorityId<P> as AppCrypto>::CRYPTO_ID,
        author_pub.as_slice(),
        pre_hash.as_ref(),
    )
    .map_err(|e| sp_consensus::Error::CannotSign(format!("{}. Key: {:?}", e, author_pub)))?
    .ok_or_else(|| {
        sp_consensus::Error::CannotSign(format!(
            "Could not find key in keystore. Key: {:?}",
            author_pub
        ))
    })?;
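    // Convert the raw signature bytes returned by the keystore into the expected signature type.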
    let signature = signature
        .as_slice()
        .try_into()
        .map_err(|_| sp_consensus::Error::InvalidSignature(signature, author_pub.to_raw_vec()))?;

    let signature_digest_item = <DigestItem as NimbusCompatibleDigestItem>::nimbus_seal(signature);

    // seal the block.
    let block_import_params = {
        let mut block_import_params = BlockImportParams::new(BlockOrigin::Own, pre_header);
        block_import_params.post_digests.push(signature_digest_item);
        block_import_params.body = Some(body);
        block_import_params.state_action =
            StateAction::ApplyChanges(sc_consensus::StorageChanges::Changes(storage_changes));
        block_import_params.fork_choice = Some(ForkChoiceStrategy::LongestChain);
        block_import_params
    };
    let post_hash = block_import_params.post_hash();

    tracing::info!(
        target: crate::LOG_TARGET,
        "🔖 Pre-sealed block for proposal at {}. Hash now {:?}, previously {:?}.",
        block_number,
        post_hash,
        pre_hash,
    );

    Ok(block_import_params)
}