// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

//! A collator for Tanssi Aura that looks ahead of the most recently included parachain block
//! when determining what to build upon.
//!
//! This collator also builds additional blocks when the maximum backlog is not saturated.
//! The size of the backlog is determined by invoking a runtime API. If that runtime API
//! is not supported, this assumes a maximum backlog size of 1.
//!
//! This takes more advantage of asynchronous backing, though not complete advantage.
//! When the backlog is not saturated, this approach lets the backlog temporarily 'catch up'
//! with periods of higher throughput. When the backlog is saturated, we typically
//! fall back to the limited cadence of a single parachain block per relay-chain block.
//!
//! Despite this, the fact that there is a backlog at all allows us to spend more time
//! building the block, as there is some buffer before it can get posted to the relay-chain.
//! The main limitation is block propagation time - i.e. the new blocks created by an author
//! must be propagated to the next author before their turn.

use {
    crate::{
        collators::{self as collator_util, tanssi_claim_slot, ClaimMode, SlotClaim},
        consensus_orchestrator::RetrieveAuthoritiesFromOrchestrator,
        OrchestratorAuraWorkerAuxData,
    },
    async_backing_primitives::UnincludedSegmentApi,
    cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface,
    cumulus_client_consensus_common::{
        self as consensus_common, load_abridged_host_configuration, ParentSearchParams,
    },
    cumulus_client_consensus_proposer::ProposerInterface,
    cumulus_primitives_core::{
        relay_chain::{AsyncBackingParams, CoreIndex, CoreState, Hash as PHash},
        PersistedValidationData,
    },
    cumulus_relay_chain_interface::RelayChainInterface,
    futures::{channel::oneshot, prelude::*},
    nimbus_primitives::NimbusId,
    pallet_xcm_core_buyer_runtime_api::{BuyingError, XCMCoreBuyerApi},
    parity_scale_codec::{Codec, Encode},
    polkadot_node_primitives::SubmitCollationParams,
    polkadot_node_subsystem::messages::{
        CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest,
    },
    polkadot_overseer::Handle as OverseerHandle,
    polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption},
    sc_client_api::{backend::AuxStore, BlockBackend, BlockOf},
    sc_consensus::BlockImport,
    sc_consensus_slots::InherentDataProviderExt,
    sc_transaction_pool_api::TransactionPool,
    sp_api::{ApiError, ProvideRuntimeApi},
    sp_blockchain::{HeaderBackend, HeaderMetadata},
    sp_consensus::SyncOracle,
    sp_consensus_aura::{Slot, SlotDuration},
    sp_core::crypto::Pair,
    sp_inherents::CreateInherentDataProviders,
    sp_keystore::KeystorePtr,
    sp_runtime::{
        traits::{Block as BlockT, BlockIdTo, Header as HeaderT, Member},
        transaction_validity::TransactionSource,
    },
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
    std::{convert::TryFrom, error::Error, sync::Arc, time::Duration},
    tokio::select,
    tokio_util::sync::CancellationToken,
    tp_xcm_core_buyer::{BuyCollatorProofCreationError, BuyCoreCollatorProof},
};

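/// Errors that can occur while a collator attempts to buy a relay-chain core for a parathread.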
#[derive(Debug)]
pub enum BuyCoreError<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> {
    NotAParathread,
    UnableToClaimSlot,
    UnableToFindKeyForSigning,
    SlotDriftConversionOverflow,
    ApiError(ApiError),
    BuyingValidationError(BuyingError<BlockNumber>),
    UnableToCreateProof(BuyCollatorProofCreationError),
    TxSubmissionError(PoolError),
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug>
    BuyCoreError<BlockNumber, PoolError>
{
    fn log_error<Blockhash: std::fmt::Debug>(
        &self,
        slot: Slot,
        para_id: ParaId,
        relay_parent: Blockhash,
    ) {
        match self {
            BuyCoreError::NotAParathread => {
                tracing::trace!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Para is not a parathread, skipping an attempt to buy core",
                );
            }
            BuyCoreError::UnableToClaimSlot => {
                tracing::trace!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Unable to claim slot for parathread, skipping attempt to buy the core.",
                );
            }
            BuyCoreError::UnableToFindKeyForSigning => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Unable to generate buy core proof as unable to find corresponding key",
                );
            }
            BuyCoreError::SlotDriftConversionOverflow => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Unable to calculate container chain slot drift from orchestrator chain's slot drift",
                );
            }
            BuyCoreError::ApiError(api_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    ?api_error,
                    "Unable to call orchestrator runtime api",
                );
            }
            BuyCoreError::BuyingValidationError(buying_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?buying_error,
                    ?slot,
                    "Buying core is not allowed right now",
                );
            }
            BuyCoreError::UnableToCreateProof(proof_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    ?proof_error,
                    "Unable to generate buy core proof due to an error",
                );
            }
            BuyCoreError::TxSubmissionError(pool_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    ?pool_error,
                    "Unable to send buy core unsigned extrinsic through orchestrator tx pool",
                );
            }
        }
    }
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<BuyingError<BlockNumber>>
    for BuyCoreError<BlockNumber, PoolError>
{
    fn from(buying_error: BuyingError<BlockNumber>) -> Self {
        BuyCoreError::BuyingValidationError(buying_error)
    }
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<ApiError>
    for BuyCoreError<BlockNumber, PoolError>
{
    fn from(api_error: ApiError) -> Self {
        BuyCoreError::ApiError(api_error)
    }
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<BuyCollatorProofCreationError>
    for BuyCoreError<BlockNumber, PoolError>
{
    fn from(proof_creation_error: BuyCollatorProofCreationError) -> Self {
        BuyCoreError::UnableToCreateProof(proof_creation_error)
    }
}

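/// Try to buy a relay-chain core for the given parathread.
///
/// This claims a slot (allowing the configured buy-core slot drift), checks via the orchestrator
/// runtime API that buying is currently allowed, builds a signed [`BuyCoreCollatorProof`] with a
/// key from the keystore, and submits the resulting unsigned extrinsic to the orchestrator
/// transaction pool. On success, returns the hash of the submitted transaction.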
pub async fn try_to_buy_core<Block, OBlock, OBlockNumber, P, CIDP, TxPool, OClient>(
    para_id: ParaId,
    aux_data: OrchestratorAuraWorkerAuxData<P>,
    inherent_providers: CIDP::InherentDataProviders,
    keystore: &KeystorePtr,
    orchestrator_client: Arc<OClient>,
    orchestrator_tx_pool: Arc<TxPool>,
    parent_header: <Block as BlockT>::Header,
    orchestrator_slot_duration: SlotDuration,
    container_slot_duration: SlotDuration,
) -> Result<
    <TxPool as TransactionPool>::Hash,
    BuyCoreError<
        <<OBlock as BlockT>::Header as HeaderT>::Number,
        <TxPool as TransactionPool>::Error,
    >,
>
where
    Block: BlockT,
    OBlock: BlockT,
    P: Pair<Public = NimbusId> + Sync + Send + 'static,
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
        + Send
        + 'static
        + Clone,
    CIDP::InherentDataProviders: Send + InherentDataProviderExt,
    OClient: ProvideRuntimeApi<OBlock>
        + HeaderMetadata<OBlock, Error = sp_blockchain::Error>
        + HeaderBackend<OBlock>
        + BlockBackend<OBlock>
        + BlockIdTo<OBlock>
        + 'static,
    OClient::Api: TaggedTransactionQueue<OBlock>
        + XCMCoreBuyerApi<OBlock, <<OBlock as BlockT>::Header as HeaderT>::Number, ParaId, NimbusId>,
    TxPool: TransactionPool<Block = OBlock>,
{
    // We do nothing if this is not a parathread
    if aux_data.slot_freq.is_none() {
        return Err(BuyCoreError::NotAParathread);
    }

    let orchestrator_best_hash = orchestrator_client.info().best_hash;
    let orchestrator_runtime_api = orchestrator_client.runtime_api();

    let buy_core_slot_drift = orchestrator_client
        .runtime_api()
        .get_buy_core_slot_drift(orchestrator_best_hash)?;

    // Convert the drift into container chain slots so that the client side calculation matches
    // the orchestrator runtime calculation.
    // We keep this calculation as generic as possible so that it can handle arbitrary slot
    // timings and does not assume anything about them.
    // Formula is: (Slot_drift_in_orchestrator_slot * orchestrator_slot_duration) / container_slot_duration
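    // Illustrative example (hypothetical numbers): a drift of 2 orchestrator slots, a 12_000 ms
    // orchestrator slot and a 6_000 ms container slot give
    // (2 * 12_000) / 6_000 = 4 container slots of permitted drift.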
    let buy_core_container_slot_drift = buy_core_slot_drift
        .checked_mul(orchestrator_slot_duration.as_millis())
        .and_then(|intermediate_result| {
            intermediate_result.checked_div(container_slot_duration.as_millis())
        })
        .ok_or(BuyCoreError::SlotDriftConversionOverflow)?;

    let current_container_slot = inherent_providers.slot();

    let slot_claim = tanssi_claim_slot::<P, Block>(
        aux_data,
        &parent_header,
        current_container_slot,
        ClaimMode::ParathreadCoreBuying {
            drift_permitted: buy_core_container_slot_drift.into(),
        },
        keystore,
    )
    .ok_or(BuyCoreError::UnableToClaimSlot)?;

    let pubkey = slot_claim.author_pub;

    orchestrator_runtime_api.is_core_buying_allowed(
        orchestrator_best_hash,
        para_id,
        pubkey.clone(),
    )??;

    let nonce =
        orchestrator_runtime_api.get_buy_core_signature_nonce(orchestrator_best_hash, para_id)?;

    let collator_buy_core_proof =
        BuyCoreCollatorProof::new_with_keystore(nonce, para_id, pubkey, keystore)?
            .ok_or(BuyCoreError::UnableToFindKeyForSigning)?;

    let extrinsic = orchestrator_runtime_api.create_buy_core_unsigned_extrinsic(
        orchestrator_best_hash,
        para_id,
        collator_buy_core_proof,
    )?;

    orchestrator_tx_pool
        .submit_one(orchestrator_best_hash, TransactionSource::Local, *extrinsic)
        .await
        .map_err(BuyCoreError::TxSubmissionError)
}

/// Parameters for [`run`].
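///
/// The `GSD` closure returns the container chain slot duration at a given block; it is only used
/// on the parathread core-buying path, to convert the orchestrator buy-core slot drift into
/// container chain slots.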
pub struct Params<
    GSD,
    BI,
    CIDP,
    Client,
    Backend,
    RClient,
    CHP,
    SO,
    Proposer,
    CS,
    GOH,
    TxPool,
    OClient,
> {
    pub get_current_slot_duration: GSD,
    pub create_inherent_data_providers: CIDP,
    pub get_orchestrator_aux_data: GOH,
    pub block_import: BI,
    pub para_client: Arc<Client>,
    pub para_backend: Arc<Backend>,
    pub relay_client: RClient,
    pub code_hash_provider: CHP,
    pub sync_oracle: SO,
    pub keystore: KeystorePtr,
    pub collator_key: CollatorPair,
    pub para_id: ParaId,
    pub overseer_handle: OverseerHandle,
    pub orchestrator_slot_duration: SlotDuration,
    pub relay_chain_slot_duration: Duration,
    pub proposer: Proposer,
    pub collator_service: CS,
    pub authoring_duration: Duration,
    pub force_authoring: bool,
    pub cancellation_token: CancellationToken,
    pub buy_core_params: BuyCoreParams<TxPool, OClient>,
}

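/// Where the "buy core" extrinsic is submitted: through the Tanssi orchestrator chain, or
/// (not yet implemented) directly through the relay chain when running against a solochain.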
pub enum BuyCoreParams<TxPool, OClient> {
    Orchestrator {
        orchestrator_tx_pool: Arc<TxPool>,
        orchestrator_client: Arc<OClient>,
    },
    Solochain {
        // TODO: relay_tx_pool
    },
}

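// `Clone` is implemented manually because a derived impl would require `TxPool: Clone` and
// `OClient: Clone`, while only the `Arc` handles actually need to be cloned.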
impl<TxPool, OClient> Clone for BuyCoreParams<TxPool, OClient> {
    fn clone(&self) -> Self {
        match self {
            Self::Orchestrator {
                orchestrator_tx_pool,
                orchestrator_client,
            } => Self::Orchestrator {
                orchestrator_tx_pool: orchestrator_tx_pool.clone(),
                orchestrator_client: orchestrator_client.clone(),
            },
            Self::Solochain {} => Self::Solochain {},
        }
    }
}

/// Run an async-backing-friendly collator for Tanssi Aura.
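///
/// Returns a future that drives the collation loop, together with a receiver that is notified
/// when that loop finishes (either because the relay-chain import notification stream ended or
/// because the cancellation token was triggered).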
pub fn run<
    GSD,
    Block,
    P,
    BI,
    CIDP,
    Client,
    Backend,
    RClient,
    CHP,
    SO,
    Proposer,
    CS,
    GOH,
    TxPool,
    OClient,
    OBlock,
>(
    mut params: Params<
        GSD,
        BI,
        CIDP,
        Client,
        Backend,
        RClient,
        CHP,
        SO,
        Proposer,
        CS,
        GOH,
        TxPool,
        OClient,
    >,
) -> (
    impl Future<Output = ()> + Send + 'static,
    oneshot::Receiver<()>,
)
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>
        + BlockOf
        + AuxStore
        + HeaderBackend<Block>
        + BlockBackend<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: UnincludedSegmentApi<Block>,
    Backend: sc_client_api::Backend<Block> + 'static,
    RClient: RelayChainInterface + Clone + 'static,
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
        + Send
        + 'static
        + Clone,
    CIDP::InherentDataProviders: Send + InherentDataProviderExt,
    BI: BlockImport<Block> + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + Clone + 'static,
    Proposer: ProposerInterface<Block> + Send + Sync + 'static,
    CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
    CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
    P: Pair<Public = NimbusId> + Sync + Send + 'static,
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
    GOH: RetrieveAuthoritiesFromOrchestrator<
            Block,
            (PHash, PersistedValidationData),
            OrchestratorAuraWorkerAuxData<P>,
        >
        + 'static
        + Sync
        + Send,
    OBlock: BlockT,
    OClient: ProvideRuntimeApi<OBlock>
        + HeaderMetadata<OBlock, Error = sp_blockchain::Error>
        + HeaderBackend<OBlock>
        + BlockBackend<OBlock>
        + BlockIdTo<OBlock>
        + 'static,
    OClient::Api: TaggedTransactionQueue<OBlock>
        + XCMCoreBuyerApi<OBlock, <<OBlock as BlockT>::Header as HeaderT>::Number, ParaId, NimbusId>,
    TxPool: TransactionPool<Block = OBlock> + 'static,
    GSD: Fn(<Block as BlockT>::Hash) -> SlotDuration + Send + 'static,
{
    // This is an arbitrary value which should comfortably exceed any reasonable
    // limit, as it would correspond to 10 non-included blocks.
    //
    // Since we only search for parent blocks which have already been imported,
    // we can guarantee that all imported blocks respect the unincluded segment
    // rules specified by the parachain's runtime and thus will never be too deep.
    const PARENT_SEARCH_DEPTH: usize = 10;

    let (exit_notification_sender, exit_notification_receiver) = oneshot::channel();

    let aura_fut = async move {
        cumulus_client_collator::initialize_collator_subsystems(
            &mut params.overseer_handle,
            params.collator_key,
            params.para_id,
            true,
        )
        .await;

        let mut import_notifications = match params.relay_client.import_notification_stream().await
        {
            Ok(s) => s,
            Err(err) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?err,
                    "Failed to initialize consensus: no relay chain import notification stream"
                );

                return;
            }
        };

        let mut collator = {
            let params = collator_util::Params {
                create_inherent_data_providers: params.create_inherent_data_providers.clone(),
                block_import: params.block_import,
                relay_client: params.relay_client.clone(),
                keystore: params.keystore.clone(),
                para_id: params.para_id,
                proposer: params.proposer,
                collator_service: params.collator_service,
            };

            collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
        };

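        // Main loop: each relay-chain block import notification triggers one authoring round;
        // the cancellation token stops the collator.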
        loop {
            select! {
                maybe_relay_parent_header = import_notifications.next() => {
                    if maybe_relay_parent_header.is_none() {
                        break;
                    }

                    let relay_parent_header = maybe_relay_parent_header.expect("relay_parent_header must exist as we checked for the None variant above; qed");
                    let relay_parent = relay_parent_header.hash();

                    let max_pov_size = match params
                        .relay_client
                        .persisted_validation_data(
                            relay_parent,
                            params.para_id,
                            OccupiedCoreAssumption::Included,
                        )
                        .await
                    {
                        Ok(None) => continue,
                        Ok(Some(pvd)) => pvd.max_pov_size,
                        Err(err) => {
                            tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
                            continue;
                        }
                    };

                    let parent_search_params = ParentSearchParams {
                        relay_parent,
                        para_id: params.para_id,
                        ancestry_lookback: max_ancestry_lookback(relay_parent, &params.relay_client).await,
                        max_depth: PARENT_SEARCH_DEPTH,
                        ignore_alternative_branches: true,
                    };

                    let potential_parents =
                        cumulus_client_consensus_common::find_potential_parents::<Block>(
                            parent_search_params,
                            &*params.para_backend,
                            &params.relay_client,
                        )
                        .await;

                    let mut potential_parents = match potential_parents {
                        Err(e) => {
                            tracing::error!(
                                target: crate::LOG_TARGET,
                                ?relay_parent,
                                err = ?e,
                                "Could not fetch potential parents to build upon"
                            );

                            continue;
                        }
                        Ok(x) => x,
                    };

                    let included_block = match potential_parents.iter().find(|x| x.depth == 0) {
                        None => continue, // also serves as an `is_empty` check.
                        Some(b) => b.hash,
                    };

                    let para_client = &*params.para_client;
                    let keystore = &params.keystore;
                    let can_build_upon = |slot_now, block_hash, aux_data| {
                        can_build_upon::<_, _, P>(
                            slot_now,
                            aux_data,
                            block_hash,
                            included_block,
                            params.force_authoring,
                            para_client,
                            keystore,
                        )
                    };

                    // Sort by depth, ascending, to choose the longest chain.
                    //
                    // If the longest chain has space, build upon that. Otherwise, don't
                    // build at all.
                    potential_parents.sort_by_key(|a| a.depth);
                    let initial_parent = match potential_parents.pop() {
                        None => continue,
                        Some(p) => p,
                    };

                    // Build in a loop until not allowed. Note that the authorities can change
                    // at any block, so we need to re-claim our slot every time.
                    let mut parent_hash = initial_parent.hash;
                    let mut parent_header = initial_parent.header;

                    let core_indices = cores_scheduled_for_para(
                        relay_parent,
                        params.para_id,
                        &mut params.overseer_handle,
                        &params.relay_client,
                    )
                    .await;

                    let overseer_handle = &mut params.overseer_handle;

                    // This needs to change to support elastic scaling, but for continuously
                    // scheduled chains this ensures that the backlog will grow steadily.
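                    // Try to author at most two container blocks on top of this relay parent.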
                    for n_built in 0..2 {
                        let validation_data = PersistedValidationData {
                            parent_head: parent_header.encode().into(),
                            relay_parent_number: *relay_parent_header.number(),
                            relay_parent_storage_root: *relay_parent_header.state_root(),
                            max_pov_size,
                        };

                        // Retrieve authorities that are able to produce the block
                        let aux_data = match params
                            .get_orchestrator_aux_data
                            .retrieve_authorities_from_orchestrator(
                                parent_hash,
                                (relay_parent_header.hash(), validation_data.clone()),
                            )
                            .await
                        {
                            Err(e) => {
                                tracing::error!(target: crate::LOG_TARGET, ?e);
                                break;
                            }
                            Ok(h) => h,
                        };

                        let inherent_providers = match params
                            .create_inherent_data_providers
                            .create_inherent_data_providers(
                                parent_hash,
                                (relay_parent_header.hash(), validation_data.clone()),
                            )
                            .await
                        {
                            Err(e) => {
                                tracing::error!(target: crate::LOG_TARGET, ?e);
                                break;
                            }
                            Ok(h) => h,
                        };

                        // TODO: Currently we use just the first core here, but for elastic scaling
                        // we should iterate and build on all of the cores returned.
                        // More info: https://github.com/paritytech/polkadot-sdk/issues/1829
                        let (is_parachain, core_index) = match (&aux_data.slot_freq, core_indices.first()) {
                            (None, None) => {
                                tracing::warn!(target: crate::LOG_TARGET, para_id = ?params.para_id, "We are a parachain and we do not have a core allocated, nothing to do");
                                break;
                            }, // We are a parachain without an allocated core, nothing to do
                            (None, Some(core_index)) => {
                                tracing::trace!(target: crate::LOG_TARGET, para_id = ?params.para_id, ?core_index, "We are a parachain and we have a core allocated, let's collate the block");
                                (true, core_index)
                            }, // We are a parachain and we have a core allocated, let's continue
                            (Some(_slot_frequency), None) => { // We are a parathread and no core is allocated. Let's try to buy a core
                                tracing::trace!(target: crate::LOG_TARGET, para_id = ?params.para_id, "We are a parathread and we do not have a core allocated, let's try to buy the core");
                                let slot = inherent_providers.slot();
                                let container_chain_slot_duration = (params.get_current_slot_duration)(parent_header.hash());

                                let buy_core_result = match &params.buy_core_params {
                                    BuyCoreParams::Orchestrator {
                                        orchestrator_client,
                                        orchestrator_tx_pool,
                                    } => {
                                        try_to_buy_core::<_, _, <<OBlock as BlockT>::Header as HeaderT>::Number, _, CIDP, _, _>(params.para_id, aux_data, inherent_providers, &params.keystore, orchestrator_client.clone(), orchestrator_tx_pool.clone(), parent_header, params.orchestrator_slot_duration, container_chain_slot_duration).await
                                    }
                                    BuyCoreParams::Solochain {} => {
                                        // TODO: implement parathread support for solochain
                                        log::warn!("Unimplemented: cannot buy core for parathread in solochain");
                                        break;
                                    }
                                };
                                match buy_core_result {
                                    Ok(block_hash) => {
                                        tracing::trace!(target: crate::LOG_TARGET, ?block_hash, "Sent unsigned extrinsic to buy the core");
                                    },
                                    Err(buy_core_error) => {
                                        buy_core_error.log_error(slot, params.para_id, relay_parent);
                                    }
                                };
                                break; // No point in continuing as we need to wait a few relay blocks for our core to become available.
                            },
                            (Some(_slot_frequency), Some(core_index)) => { // We are a parathread and we do have a core, let's continue
                                tracing::trace!(target: crate::LOG_TARGET, ?core_index, "We are a parathread and we do have a core allocated, let's collate the block");
                                (false, core_index)
                            }
                        };

                        let mut slot_claim = match can_build_upon(
                            inherent_providers.slot(),
                            parent_header.clone(),
                            aux_data,
                        )
                        .await
                        {
                            Ok(None) => break,
                            Err(e) => {
                                tracing::error!(target: crate::LOG_TARGET, ?e);
                                break;
                            }
                            Ok(Some(c)) => c,
                        };

                        tracing::debug!(
                            target: crate::LOG_TARGET,
                            ?relay_parent,
                            unincluded_segment_len = initial_parent.depth + n_built,
                            "Slot claimed. Building"
                        );

                        // Build and announce collations recursively until
                        // `can_build_upon` fails or building a collation fails.
                        let (parachain_inherent_data, other_inherent_data) = match collator
                            .create_inherent_data(relay_parent, &validation_data, parent_hash, None)
                            .await
                        {
                            Err(err) => {
                                tracing::error!(target: crate::LOG_TARGET, ?err);
                                break;
                            }
                            Ok(x) => x,
                        };

                        let validation_code_hash = match params.code_hash_provider.code_hash_at(parent_hash)
                        {
                            None => {
                                tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
                                break;
                            }
                            Some(v) => v,
                        };

                        match collator
                            .collate(
                                &parent_header,
                                &mut slot_claim,
                                None,
                                (parachain_inherent_data, other_inherent_data),
                                params.authoring_duration,
                                // Set the block limit to 50% of the maximum PoV size.
                                //
                                // TODO: If we got benchmarking that includes the proof size,
                                // we should be able to use the maximum pov size.
                                (validation_data.max_pov_size / 2) as usize,
                            )
                            .await
                        {
                            Ok(Some((collation, block_data, new_block_hash))) => {
                                // Here we are assuming that the import logic protects against equivocations
                                // and provides sybil-resistance, as it should.
                                collator
                                    .collator_service()
                                    .announce_block(new_block_hash, None);

                                // Send a submit-collation message to the collation generation subsystem,
                                // which then distributes this to validators.
                                //
                                // Here we are assuming that the leaf is imported, as we've gotten an
                                // import notification.
                                overseer_handle
                                    .send_msg(
                                        CollationGenerationMessage::SubmitCollation(
                                            SubmitCollationParams {
                                                relay_parent,
                                                collation,
                                                parent_head: parent_header.encode().into(),
                                                validation_code_hash,
                                                result_sender: None,
                                                core_index: *core_index
                                            },
                                        ),
                                        "SubmitCollation",
                                    )
                                    .await;

                                parent_hash = new_block_hash;
                                parent_header = block_data.into_header();
                            }
                            Ok(None) => {
                                tracing::debug!(target: crate::LOG_TARGET, "Lookahead collator: No block proposal");
                            }
                            Err(err) => {
                                tracing::error!(target: crate::LOG_TARGET, ?err);
                                break;
                            }
                        }

                        // If it is a parathread, there is no point in async backing as we would
                        // have to buy a core first
                        if !is_parachain {
                            tracing::trace!(target: crate::LOG_TARGET, "Not a parachain, so terminating at {:?}", n_built);
                            break;
                        }
                    }
                },
                _ = params.cancellation_token.cancelled() => {
                    log::info!("Stopping lookahead collator");
                    break;
                }
            }
        }

        // Notify that we have exited
        let _ = exit_notification_sender.send(());
    };

    (aura_fut, exit_notification_receiver)
}

// Checks if we own the slot at the given block and whether there
// is space in the unincluded segment.
async fn can_build_upon<Block: BlockT, Client, P>(
    slot: Slot,
    aux_data: OrchestratorAuraWorkerAuxData<P>,
    parent_header: Block::Header,
    included_block: <Block as BlockT>::Hash,
    force_authoring: bool,
    client: &Client,
    keystore: &KeystorePtr,
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
where
    Client: ProvideRuntimeApi<Block>,
    Client::Api: UnincludedSegmentApi<Block>,
    P: Pair + Send + Sync + 'static,
    P::Public: Codec + std::fmt::Debug,
    P::Signature: Codec,
{
    let runtime_api = client.runtime_api();

    let claim_mode = if force_authoring {
        ClaimMode::ForceAuthoring
    } else {
        ClaimMode::NormalAuthoring
    };

    let slot_claim =
        tanssi_claim_slot::<P, Block>(aux_data, &parent_header, slot, claim_mode, keystore);

    // Here we lean on the property that building on an empty unincluded segment must always
    // be legal. Skipping the runtime API query here allows us to seamlessly run this
    // collator against chains which have not yet upgraded their runtime.
    if parent_header.hash() != included_block
        && !runtime_api.can_build_upon(parent_header.hash(), included_block, slot)?
    {
        return Ok(None);
    }

    Ok(slot_claim)
}

/// Reads allowed ancestry length parameter from the relay chain storage at the given relay parent.
///
/// Falls back to 0 in case of an error.
async fn max_ancestry_lookback(
    relay_parent: PHash,
    relay_client: &impl RelayChainInterface,
) -> usize {
    match load_abridged_host_configuration(relay_parent, relay_client).await {
        Ok(Some(config)) => config.async_backing_params.allowed_ancestry_len as usize,
        Ok(None) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                "Active config is missing in relay chain storage",
            );
            0
        }
        Err(err) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?err,
                ?relay_parent,
                "Failed to read active config from relay chain client",
            );
            0
        }
    }
}

// Returns the indices of the cores scheduled for the para at the provided relay parent.
//
// Falls back to an empty vector in case of an error.
async fn cores_scheduled_for_para(
    relay_parent: PHash,
    para_id: ParaId,
    overseer_handle: &mut OverseerHandle,
    relay_client: &impl RelayChainInterface,
) -> Vec<CoreIndex> {
    let (tx, rx) = oneshot::channel();
    let request = RuntimeApiRequest::AvailabilityCores(tx);
    overseer_handle
        .send_msg(
            RuntimeApiMessage::Request(relay_parent, request),
            "LookaheadCollator",
        )
        .await;

    let max_candidate_depth = async_backing_params(relay_parent, relay_client)
        .await
        .map(|c| c.max_candidate_depth)
        .unwrap_or(0);

    let cores = match rx.await {
        Ok(Ok(cores)) => cores,
        Ok(Err(error)) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?error,
                ?relay_parent,
                "Failed to query availability cores runtime API",
            );
            return Vec::new();
        }
        Err(oneshot::Canceled) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?relay_parent,
                "Sender for availability cores runtime request dropped",
            );
            return Vec::new();
        }
    };

    cores
        .iter()
        .enumerate()
        .filter_map(|(index, core)| {
            let core_para_id = match core {
                CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
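                // With async backing enabled (`max_candidate_depth >= 1`), an occupied core can
                // still be collated for: use the para that is next up once the current candidate
                // becomes available.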
                CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
                    .next_up_on_available
                    .as_ref()
                    .map(|scheduled_core| scheduled_core.para_id),
                CoreState::Free | CoreState::Occupied(_) => None,
            };

            if core_para_id == Some(para_id) {
                Some(CoreIndex(index as u32))
            } else {
                None
            }
        })
        .collect()
}

/// Reads async backing parameters from the relay chain storage at the given relay parent.
async fn async_backing_params(
    relay_parent: PHash,
    relay_client: &impl RelayChainInterface,
) -> Option<AsyncBackingParams> {
    match load_abridged_host_configuration(relay_parent, relay_client).await {
        Ok(Some(config)) => Some(config.async_backing_params),
        Ok(None) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                "Active config is missing in relay chain storage",
            );
            None
        }
        Err(err) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?err,
                ?relay_parent,
                "Failed to read active config from relay chain client",
            );
            None
        }
    }
}