1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! A collator for Tanssi Aura that looks ahead of the most recently included parachain block
18
//! when determining what to build upon.
19
//!
20
//! This collator also builds additional blocks when the maximum backlog is not saturated.
21
//! The size of the backlog is determined by invoking a runtime API. If that runtime API
22
//! is not supported, this assumes a maximum backlog size of 1.
23
//!
24
//! This takes more advantage of asynchronous backing, though not complete advantage.
25
//! When the backlog is not saturated, this approach lets the backlog temporarily 'catch up'
26
//! with periods of higher throughput. When the backlog is saturated, we typically
27
//! fall back to the limited cadence of a single parachain block per relay-chain block.
28
//!
29
//! Despite this, the fact that there is a backlog at all allows us to spend more time
30
//! building the block, as there is some buffer before it can get posted to the relay-chain.
31
//! The main limitation is block propagation time - i.e. the new blocks created by an author
32
//! must be propagated to the next author before their turn.
33

            
34
use {
35
    crate::{
36
        collators::{self as collator_util, tanssi_claim_slot, ClaimMode, SlotClaim},
37
        consensus_orchestrator::RetrieveAuthoritiesFromOrchestrator,
38
        OrchestratorAuraWorkerAuxData,
39
    },
40
    async_backing_primitives::UnincludedSegmentApi,
41
    cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface,
42
    cumulus_client_consensus_common::{
43
        self as consensus_common, load_abridged_host_configuration, ParentSearchParams,
44
    },
45
    cumulus_client_consensus_proposer::ProposerInterface,
46
    cumulus_primitives_core::{
47
        relay_chain::{AsyncBackingParams, CoreIndex, Hash as PHash},
48
        PersistedValidationData,
49
    },
50
    cumulus_relay_chain_interface::{CoreState, RelayChainInterface},
51
    futures::{channel::oneshot, prelude::*},
52
    nimbus_primitives::NimbusId,
53
    pallet_xcm_core_buyer_runtime_api::{BuyingError, XCMCoreBuyerApi},
54
    parity_scale_codec::{Codec, Encode},
55
    polkadot_node_primitives::SubmitCollationParams,
56
    polkadot_node_subsystem::messages::{
57
        CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest,
58
    },
59
    polkadot_overseer::Handle as OverseerHandle,
60
    polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption},
61
    sc_client_api::{backend::AuxStore, BlockBackend, BlockOf},
62
    sc_consensus::BlockImport,
63
    sc_consensus_slots::InherentDataProviderExt,
64
    sc_transaction_pool_api::TransactionPool,
65
    sp_api::{ApiError, ProvideRuntimeApi},
66
    sp_blockchain::{HeaderBackend, HeaderMetadata},
67
    sp_consensus::SyncOracle,
68
    sp_consensus_aura::{Slot, SlotDuration},
69
    sp_core::crypto::Pair,
70
    sp_inherents::CreateInherentDataProviders,
71
    sp_keystore::KeystorePtr,
72
    sp_runtime::{
73
        traits::{Block as BlockT, BlockIdTo, Header as HeaderT, Member},
74
        transaction_validity::TransactionSource,
75
    },
76
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
77
    std::{convert::TryFrom, error::Error, sync::Arc, time::Duration},
78
    tokio::select,
79
    tokio_util::sync::CancellationToken,
80
    tp_xcm_core_buyer::{BuyCollatorProofCreationError, BuyCoreCollatorProof},
81
};
82

            
83
#[derive(Debug)]
84
pub enum BuyCoreError<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> {
85
    NotAParathread,
86
    UnableToClaimSlot,
87
    UnableToFindKeyForSigning,
88
    SlotDriftConversionOverflow,
89
    ApiError(ApiError),
90
    BuyingValidationError(BuyingError<BlockNumber>),
91
    UnableToCreateProof(BuyCollatorProofCreationError),
92
    TxSubmissionError(PoolError),
93
}
94

            
95
impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug>
96
    BuyCoreError<BlockNumber, PoolError>
97
{
98
    fn log_error<Blockhash: std::fmt::Debug>(
99
        &self,
100
        slot: Slot,
101
        para_id: ParaId,
102
        relay_parent: Blockhash,
103
    ) {
104
        match self {
105
            BuyCoreError::NotAParathread => {
106
                tracing::trace!(
107
                    target: crate::LOG_TARGET,
108
                    ?relay_parent,
109
                    ?para_id,
110
                    ?slot,
111
                    "Para is not a parathread, skipping an attempt to buy core",
112
                );
113
            }
114
            BuyCoreError::UnableToClaimSlot => {
115
                tracing::trace!(
116
                    target: crate::LOG_TARGET,
117
                    ?relay_parent,
118
                    ?para_id,
119
                    ?slot,
120
                    "Unable to claim slot for parathread, skipping attempt to buy the core.",
121
                );
122
            }
123
            BuyCoreError::UnableToFindKeyForSigning => {
124
                tracing::error!(
125
                    target: crate::LOG_TARGET,
126
                    ?relay_parent,
127
                    ?para_id,
128
                    ?slot,
129
                    "Unable to generate buy core proof as unable to find corresponding key",
130
                );
131
            }
132
            BuyCoreError::SlotDriftConversionOverflow => {
133
                tracing::error!(
134
                    target: crate::LOG_TARGET,
135
                    ?relay_parent,
136
                    ?para_id,
137
                    ?slot,
138
                    "Unable to calculate container chain slot drift from orchestrator chain's slot drift",
139
                );
140
            }
141
            BuyCoreError::ApiError(api_error) => {
142
                tracing::error!(
143
                    target: crate::LOG_TARGET,
144
                    ?relay_parent,
145
                    ?para_id,
146
                    ?slot,
147
                    ?api_error,
148
                    "Unable to call orchestrator runtime api",
149
                );
150
            }
151
            BuyCoreError::BuyingValidationError(buying_error) => {
152
                tracing::error!(
153
                    target: crate::LOG_TARGET,
154
                    ?relay_parent,
155
                    ?para_id,
156
                    ?buying_error,
157
                    ?slot,
158
                    "Buying core is not allowed right now",
159
                );
160
            }
161
            BuyCoreError::UnableToCreateProof(proof_error) => {
162
                tracing::error!(
163
                    target: crate::LOG_TARGET,
164
                    ?relay_parent,
165
                    ?para_id,
166
                    ?slot,
167
                    ?proof_error,
168
                    "Unable to generate buy core proof due to an error",
169
                );
170
            }
171
            BuyCoreError::TxSubmissionError(pool_error) => {
172
                tracing::error!(
173
                    target: crate::LOG_TARGET,
174
                    ?relay_parent,
175
                    ?para_id,
176
                    ?slot,
177
                    ?pool_error,
178
                    "Unable to send buy core unsigned extrinsic through orchestrator tx pool",
179
                );
180
            }
181
        }
182
    }
183
}
184

            
185
impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<BuyingError<BlockNumber>>
186
    for BuyCoreError<BlockNumber, PoolError>
187
{
188
    fn from(buying_error: BuyingError<BlockNumber>) -> Self {
189
        BuyCoreError::BuyingValidationError(buying_error)
190
    }
191
}
192

            
193
impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<ApiError>
194
    for BuyCoreError<BlockNumber, PoolError>
195
{
196
    fn from(api_error: ApiError) -> Self {
197
        BuyCoreError::ApiError(api_error)
198
    }
199
}
200

            
201
impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<BuyCollatorProofCreationError>
202
    for BuyCoreError<BlockNumber, PoolError>
203
{
204
    fn from(proof_creation_error: BuyCollatorProofCreationError) -> Self {
205
        BuyCoreError::UnableToCreateProof(proof_creation_error)
206
    }
207
}
208

            
209
pub async fn try_to_buy_core<Block, OBlock, OBlockNumber, P, CIDP, TxPool, OClient>(
210
    para_id: ParaId,
211
    aux_data: OrchestratorAuraWorkerAuxData<P>,
212
    inherent_providers: CIDP::InherentDataProviders,
213
    keystore: &KeystorePtr,
214
    orchestrator_client: Arc<OClient>,
215
    orchestrator_tx_pool: Arc<TxPool>,
216
    parent_header: <Block as BlockT>::Header,
217
    orchestrator_slot_duration: SlotDuration,
218
    container_slot_duration: SlotDuration,
219
) -> Result<
220
    <TxPool as TransactionPool>::Hash,
221
    BuyCoreError<
222
        <<OBlock as BlockT>::Header as HeaderT>::Number,
223
        <TxPool as TransactionPool>::Error,
224
    >,
225
>
226
where
227
    Block: BlockT,
228
    OBlock: BlockT,
229
    P: Pair<Public = NimbusId> + Sync + Send + 'static,
230
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
231
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
232
        + Send
233
        + 'static
234
        + Clone,
235
    CIDP::InherentDataProviders: Send + InherentDataProviderExt,
236
    OClient: ProvideRuntimeApi<OBlock>
237
        + HeaderMetadata<OBlock, Error = sp_blockchain::Error>
238
        + HeaderBackend<OBlock>
239
        + BlockBackend<OBlock>
240
        + BlockIdTo<OBlock>
241
        + 'static,
242
    OClient::Api: TaggedTransactionQueue<OBlock>
243
        + XCMCoreBuyerApi<OBlock, <<OBlock as BlockT>::Header as HeaderT>::Number, ParaId, NimbusId>,
244
    TxPool: TransactionPool<Block = OBlock>,
245
{
246
    // We do nothing if this is not a parathread
247
    if aux_data.slot_freq.is_none() {
248
        return Err(BuyCoreError::NotAParathread);
249
    }
250

            
251
    let orchestrator_best_hash = orchestrator_client.info().best_hash;
252
    let orchestrator_runtime_api = orchestrator_client.runtime_api();
253

            
254
    let buy_core_slot_drift = orchestrator_client
255
        .runtime_api()
256
        .get_buy_core_slot_drift(orchestrator_best_hash)?;
257

            
258
    // Convert drift in terms of container chain slots for parity between client side calculation and
259
    // orchestrator runtime calculation
260
    // We tried to made this calculation as generic as possible so that it can handle
261
    // arbitrary slot timings as well and won't assume anything.
262
    // Formula is: (Slot_drift_in_orchestrator_slot * orchestrator_slot_duration) / container_slot_duration
263
    let buy_core_container_slot_drift = buy_core_slot_drift
264
        .checked_mul(orchestrator_slot_duration.as_millis())
265
        .and_then(|intermediate_result| {
266
            intermediate_result.checked_div(container_slot_duration.as_millis())
267
        })
268
        .ok_or(BuyCoreError::SlotDriftConversionOverflow)?;
269

            
270
    let current_container_slot = inherent_providers.slot();
271

            
272
    let slot_claim = tanssi_claim_slot::<P, Block>(
273
        aux_data,
274
        &parent_header,
275
        current_container_slot,
276
        ClaimMode::ParathreadCoreBuying {
277
            drift_permitted: buy_core_container_slot_drift.into(),
278
        },
279
        keystore,
280
    )
281
    .ok_or(BuyCoreError::UnableToClaimSlot)?;
282

            
283
    let pubkey = slot_claim.author_pub;
284

            
285
    orchestrator_runtime_api.is_core_buying_allowed(
286
        orchestrator_best_hash,
287
        para_id,
288
        pubkey.clone(),
289
    )??;
290

            
291
    let nonce =
292
        orchestrator_runtime_api.get_buy_core_signature_nonce(orchestrator_best_hash, para_id)?;
293

            
294
    let collator_buy_core_proof =
295
        BuyCoreCollatorProof::new_with_keystore(nonce, para_id, pubkey, keystore)?
296
            .ok_or(BuyCoreError::UnableToFindKeyForSigning)?;
297

            
298
    let extrinsic = orchestrator_runtime_api.create_buy_core_unsigned_extrinsic(
299
        orchestrator_best_hash,
300
        para_id,
301
        collator_buy_core_proof,
302
    )?;
303

            
304
    orchestrator_tx_pool
305
        .submit_one(orchestrator_best_hash, TransactionSource::Local, *extrinsic)
306
        .await
307
        .map_err(BuyCoreError::TxSubmissionError)
308
}
309

            
310
/// Parameters for [`run`].
311
pub struct Params<
312
    GSD,
313
    BI,
314
    CIDP,
315
    Client,
316
    Backend,
317
    RClient,
318
    CHP,
319
    SO,
320
    Proposer,
321
    CS,
322
    GOH,
323
    TxPool,
324
    OClient,
325
> {
326
    pub get_current_slot_duration: GSD,
327
    pub create_inherent_data_providers: CIDP,
328
    pub get_orchestrator_aux_data: GOH,
329
    pub block_import: BI,
330
    pub para_client: Arc<Client>,
331
    pub para_backend: Arc<Backend>,
332
    pub relay_client: RClient,
333
    pub code_hash_provider: CHP,
334
    pub sync_oracle: SO,
335
    pub keystore: KeystorePtr,
336
    pub collator_key: CollatorPair,
337
    pub para_id: ParaId,
338
    pub overseer_handle: OverseerHandle,
339
    pub orchestrator_slot_duration: SlotDuration,
340
    pub relay_chain_slot_duration: Duration,
341
    pub proposer: Proposer,
342
    pub collator_service: CS,
343
    pub authoring_duration: Duration,
344
    pub force_authoring: bool,
345
    pub cancellation_token: CancellationToken,
346
    pub buy_core_params: BuyCoreParams<TxPool, OClient>,
347
    /// The maximum percentage of the maximum PoV size that the collator can use.
348
    /// It will be removed once <https://github.com/paritytech/polkadot-sdk/issues/6020> is fixed.
349
    pub max_pov_percentage: Option<u32>,
350
}
351

            
352
/// Describes where a buy-core extrinsic is sent, depending on the kind of chain that
/// coordinates the container chain.
pub enum BuyCoreParams<TxPool, OClient> {
    /// Cores are bought through an orchestrator chain.
    Orchestrator {
        /// Transaction pool the unsigned buy-core extrinsic is submitted to.
        orchestrator_tx_pool: Arc<TxPool>,
        /// Client used to query the orchestrator runtime API.
        orchestrator_client: Arc<OClient>,
    },
    /// Solochain mode; this variant carries no data yet.
    Solochain {
        // TODO: relay_tx_pool
    },
}
361

            
362
impl<TxPool, OClient> Clone for BuyCoreParams<TxPool, OClient> {
363
    fn clone(&self) -> Self {
364
        match self {
365
            Self::Orchestrator {
366
                orchestrator_tx_pool,
367
                orchestrator_client,
368
            } => Self::Orchestrator {
369
                orchestrator_tx_pool: orchestrator_tx_pool.clone(),
370
                orchestrator_client: orchestrator_client.clone(),
371
            },
372
            Self::Solochain {} => Self::Solochain {},
373
        }
374
    }
375
}
376

            
377
/// Run async-backing-friendly for Tanssi Aura.
378
pub fn run<
379
    GSD,
380
    Block,
381
    P,
382
    BI,
383
    CIDP,
384
    Client,
385
    Backend,
386
    RClient,
387
    CHP,
388
    SO,
389
    Proposer,
390
    CS,
391
    GOH,
392
    TxPool,
393
    OClient,
394
    OBlock,
395
>(
396
    mut params: Params<
397
        GSD,
398
        BI,
399
        CIDP,
400
        Client,
401
        Backend,
402
        RClient,
403
        CHP,
404
        SO,
405
        Proposer,
406
        CS,
407
        GOH,
408
        TxPool,
409
        OClient,
410
    >,
411
) -> (
412
    impl Future<Output = ()> + Send + 'static,
413
    oneshot::Receiver<()>,
414
)
415
where
416
    Block: BlockT,
417
    Client: ProvideRuntimeApi<Block>
418
        + BlockOf
419
        + AuxStore
420
        + HeaderBackend<Block>
421
        + BlockBackend<Block>
422
        + Send
423
        + Sync
424
        + 'static,
425
    Client::Api: UnincludedSegmentApi<Block>,
426
    Backend: sc_client_api::Backend<Block> + 'static,
427
    RClient: RelayChainInterface + Clone + 'static,
428
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
429
        + Send
430
        + 'static
431
        + Clone,
432
    CIDP::InherentDataProviders: Send + InherentDataProviderExt,
433
    BI: BlockImport<Block> + Send + Sync + 'static,
434
    SO: SyncOracle + Send + Sync + Clone + 'static,
435
    Proposer: ProposerInterface<Block> + Send + Sync + 'static,
436
    CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
437
    CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
438
    P: Pair<Public = NimbusId> + Sync + Send + 'static,
439
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
440
    GOH: RetrieveAuthoritiesFromOrchestrator<
441
            Block,
442
            (PHash, PersistedValidationData),
443
            OrchestratorAuraWorkerAuxData<P>,
444
        >
445
        + 'static
446
        + Sync
447
        + Send,
448
    OBlock: BlockT,
449
    OClient: ProvideRuntimeApi<OBlock>
450
        + HeaderMetadata<OBlock, Error = sp_blockchain::Error>
451
        + HeaderBackend<OBlock>
452
        + BlockBackend<OBlock>
453
        + BlockIdTo<OBlock>
454
        + 'static,
455
    OClient::Api: TaggedTransactionQueue<OBlock>
456
        + XCMCoreBuyerApi<OBlock, <<OBlock as BlockT>::Header as HeaderT>::Number, ParaId, NimbusId>,
457
    TxPool: TransactionPool<Block = OBlock> + 'static,
458
    GSD: Fn(<Block as BlockT>::Hash) -> SlotDuration + Send + 'static,
459
{
460
    // This is an arbitrary value which is likely guaranteed to exceed any reasonable
461
    // limit, as it would correspond to 10 non-included blocks.
462
    //
463
    // Since we only search for parent blocks which have already been imported,
464
    // we can guarantee that all imported blocks respect the unincluded segment
465
    // rules specified by the parachain's runtime and thus will never be too deep.
466
    const PARENT_SEARCH_DEPTH: usize = 10;
467

            
468
    let (exit_notification_sender, exit_notification_receiver) = oneshot::channel();
469

            
470
    let aura_fut = async move {
471
        cumulus_client_collator::initialize_collator_subsystems(
472
            &mut params.overseer_handle,
473
            params.collator_key,
474
            params.para_id,
475
            true,
476
        )
477
        .await;
478

            
479
        let mut import_notifications = match params.relay_client.import_notification_stream().await
480
        {
481
            Ok(s) => s,
482
            Err(err) => {
483
                tracing::error!(
484
                    target: crate::LOG_TARGET,
485
                    ?err,
486
                    "Failed to initialize consensus: no relay chain import notification stream"
487
                );
488

            
489
                return;
490
            }
491
        };
492

            
493
        let mut collator = {
494
            let params = collator_util::Params {
495
                create_inherent_data_providers: params.create_inherent_data_providers.clone(),
496
                block_import: params.block_import,
497
                relay_client: params.relay_client.clone(),
498
                keystore: params.keystore.clone(),
499
                para_id: params.para_id,
500
                proposer: params.proposer,
501
                collator_service: params.collator_service,
502
            };
503

            
504
            collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
505
        };
506

            
507
        loop {
508
            select! {
509
                maybe_relay_parent_header = import_notifications.next() => {
510
                    if maybe_relay_parent_header.is_none() {
511
                        break;
512
                    }
513

            
514
                    let relay_parent_header = maybe_relay_parent_header.expect("relay_parent_header must exists as we checked for None variant above; qed");
515
                    let relay_parent = relay_parent_header.hash();
516

            
517
                    let max_pov_size = match params
518
                        .relay_client
519
                        .persisted_validation_data(
520
                            relay_parent,
521
                            params.para_id,
522
                            OccupiedCoreAssumption::Included,
523
                        )
524
                        .await
525
                    {
526
                        Ok(None) => continue,
527
                        Ok(Some(pvd)) => pvd.max_pov_size,
528
                        Err(err) => {
529
                            tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
530
                            continue;
531
                        }
532
                    };
533

            
534
                    let parent_search_params = ParentSearchParams {
535
                        relay_parent,
536
                        para_id: params.para_id,
537
                        ancestry_lookback: max_ancestry_lookback(relay_parent, &params.relay_client).await,
538
                        max_depth: PARENT_SEARCH_DEPTH,
539
                        ignore_alternative_branches: true,
540
                    };
541

            
542
                    let potential_parents =
543
                        cumulus_client_consensus_common::find_potential_parents::<Block>(
544
                            parent_search_params,
545
                            &*params.para_backend,
546
                            &params.relay_client,
547
                        )
548
                        .await;
549

            
550
                    let mut potential_parents = match potential_parents {
551
                        Err(e) => {
552
                            tracing::error!(
553
                                target: crate::LOG_TARGET,
554
                                ?relay_parent,
555
                                err = ?e,
556
                                "Could not fetch potential parents to build upon"
557
                            );
558

            
559
                            continue;
560
                        }
561
                        Ok(x) => x,
562
                    };
563

            
564
                    let included_block = match potential_parents.iter().find(|x| x.depth == 0) {
565
                        None => continue, // also serves as an `is_empty` check.
566
                        Some(b) => b.hash,
567
                    };
568

            
569
                    let para_client = &*params.para_client;
570
                    let keystore = &params.keystore;
571
                    let can_build_upon = |slot_now, block_hash, aux_data| {
572
                        can_build_upon::<_, _, P>(
573
                            slot_now,
574
                            aux_data,
575
                            block_hash,
576
                            included_block,
577
                            params.force_authoring,
578
                            para_client,
579
                            keystore,
580
                        )
581
                    };
582

            
583
                    // Sort by depth, ascending, to choose the longest chain.
584
                    //
585
                    // If the longest chain has space, build upon that. Otherwise, don't
586
                    // build at all.
587
                    potential_parents.sort_by_key(|a| a.depth);
588
                    let initial_parent = match potential_parents.pop() {
589
                        None => continue,
590
                        Some(p) => p,
591
                    };
592

            
593
                    // Build in a loop until not allowed. Note that the authorities can change
594
                    // at any block, so we need to re-claim our slot every time.
595
                    let mut parent_hash = initial_parent.hash;
596
                    let mut parent_header = initial_parent.header;
597

            
598
                    let core_indices = cores_scheduled_for_para(
599
                        relay_parent,
600
                        params.para_id,
601
                        &mut params.overseer_handle,
602
                        &params.relay_client,
603
                    )
604
                    .await;
605

            
606
                    let overseer_handle = &mut params.overseer_handle;
607

            
608
                    // This needs to change to support elastic scaling, but for continuously
609
                    // scheduled chains this ensures that the backlog will grow steadily.
610
                    for n_built in 0..2 {
611
                        let validation_data = PersistedValidationData {
612
                            parent_head: parent_header.encode().into(),
613
                            relay_parent_number: *relay_parent_header.number(),
614
                            relay_parent_storage_root: *relay_parent_header.state_root(),
615
                            max_pov_size,
616
                        };
617

            
618
                        // Retrieve authorities that are able to produce the block
619
                        let aux_data = match params
620
                            .get_orchestrator_aux_data
621
                            .retrieve_authorities_from_orchestrator(
622
                                parent_hash,
623
                                (relay_parent_header.hash(), validation_data.clone()),
624
                            )
625
                            .await
626
                        {
627
                            Err(e) => {
628
                                tracing::error!(target: crate::LOG_TARGET, ?e);
629
                                break;
630
                            }
631
                            Ok(h) => h,
632
                        };
633

            
634
                        let inherent_providers = match params
635
                            .create_inherent_data_providers
636
                            .create_inherent_data_providers(
637
                                parent_hash,
638
                                (relay_parent_header.hash(), validation_data.clone()),
639
                            )
640
                            .await
641
                        {
642
                            Err(e) => {
643
                                tracing::error!(target: crate::LOG_TARGET, ?e);
644
                                break;
645
                            }
646
                            Ok(h) => h,
647
                        };
648

            
649
                        // TODO: Currently we use just the first core here, but for elastic scaling
650
                        // we iterate and build on all of the cores returned.
651
                        // More info: https://github.com/paritytech/polkadot-sdk/issues/1829
652
                        let (is_parachain, core_index) = match (&aux_data.slot_freq, core_indices.first()) {
653
                            (None, None) => {
654
                                tracing::warn!(target: crate::LOG_TARGET, para_id = ?params.para_id, "We are parachain and we do not have core allocated, nothing to do");
655
                                break;
656
                            }, // We are parachain and we do not have core allocated, nothing to do,
657
                            (None, Some(core_index)) => {
658
                                tracing::trace!(target: crate::LOG_TARGET, para_id = ?params.para_id, ?core_index, "We are parachain and we core allocated, let's collate the block");
659
                                (true, core_index)
660
                            }, // We are parachain and we have core allocated, let's continue
661
                            (Some(_slot_frequency), None) => { // We are parathread and core is not allocated. Let's try to buy core
662
                                tracing::trace!(target: crate::LOG_TARGET, para_id = ?params.para_id, "We are parathread and we do not have core allocated, let's try to buy the core");
663
                                let slot = inherent_providers.slot();
664
                                let container_chain_slot_duration = (params.get_current_slot_duration)(parent_header.hash());
665

            
666
                                let buy_core_result = match &params.buy_core_params {
667
                                    BuyCoreParams::Orchestrator {
668
                                        orchestrator_client,
669
                                        orchestrator_tx_pool,
670
                                    } => {
671
                                        try_to_buy_core::<_, _, <<OBlock as BlockT>::Header as HeaderT>::Number, _, CIDP, _, _>(params.para_id, aux_data, inherent_providers, &params.keystore, orchestrator_client.clone(), orchestrator_tx_pool.clone(), parent_header, params.orchestrator_slot_duration, container_chain_slot_duration).await
672
                                    }
673
                                    BuyCoreParams::Solochain {
674

            
675
                                    } => {
676
                                        // TODO: implement parathread support for solochain
677
                                        log::warn!("Unimplemented: cannot buy core for parathread in solochain");
678
                                        break;
679
                                    }
680
                                };
681
                                match buy_core_result {
682
                                    Ok(block_hash) => {
683
                                        tracing::trace!(target: crate::LOG_TARGET, ?block_hash, "Sent unsigned extrinsic to buy the core");
684
                                    },
685
                                    Err(buy_core_error) => {
686
                                        buy_core_error.log_error(slot, params.para_id, relay_parent);
687
                                    }
688
                                };
689
                                break; // No point in continuing as we need to wait for few relay blocks in order for our core to be available.
690
                            },
691
                            (Some(_slot_frequency), Some(core_index)) => { // We are parathread and we do have core, let's continue
692
                                tracing::trace!(target: crate::LOG_TARGET, ?core_index, "We are parathread and we do have core allocated, let's collate the block");
693
                                (false, core_index)
694
                            }
695
                        };
696

            
697
                        let mut slot_claim = match can_build_upon(
698
                            inherent_providers.slot(),
699
                            parent_header.clone(),
700
                            aux_data,
701
                        )
702
                        .await
703
                        {
704
                            Ok(None) => break,
705
                            Err(e) => {
706
                                tracing::error!(target: crate::LOG_TARGET, ?e);
707
                                break;
708
                            }
709
                            Ok(Some(c)) => c,
710
                        };
711

            
712
                        tracing::debug!(
713
                            target: crate::LOG_TARGET,
714
                            ?relay_parent,
715
                            unincluded_segment_len = initial_parent.depth + n_built,
716
                            "Slot claimed. Building"
717
                        );
718

            
719
                        // Build and announce collations recursively until
720
                        // `can_build_upon` fails or building a collation fails.
721
                        let (parachain_inherent_data, other_inherent_data) = match collator
722
                            .create_inherent_data(relay_parent, &validation_data, parent_hash, None, None)
723
                            .await
724
                        {
725
                            Err(err) => {
726
                                tracing::error!(target: crate::LOG_TARGET, ?err);
727
                                break;
728
                            }
729
                            Ok(x) => x,
730
                        };
731

            
732
                        let validation_code_hash = match params.code_hash_provider.code_hash_at(parent_hash)
733
                        {
734
                            None => {
735
                                tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
736
                                break;
737
                            }
738
                            Some(v) => v,
739
                        };
740

            
741
                        let allowed_pov_size = if let Some(max_pov_percentage) = params.max_pov_percentage {
742
                            validation_data.max_pov_size * max_pov_percentage / 100
743
                        } else {
744
                            // Set the block limit to 85% of the maximum PoV size.
745
                            //
746
                            // Once https://github.com/paritytech/polkadot-sdk/issues/6020 issue is
747
                            // fixed, the reservation should be removed.
748
                            validation_data.max_pov_size * 85 / 100
749
                        } as usize;
750

            
751
                        match collator
752
                            .collate(
753
                                &parent_header,
754
                                &mut slot_claim,
755
                                None,
756
                                (parachain_inherent_data, other_inherent_data),
757
                                params.authoring_duration,
758
                                allowed_pov_size,
759
                            )
760
                            .await
761
                        {
762
                            Ok(Some((collation, block_data))) => {
763
                                let Some(new_block_header) =
764
                                    block_data.blocks().first().map(|b| b.header().clone())
765
                                else {
766
                                    tracing::error!(target: crate::LOG_TARGET,  "Produced PoV doesn't contain any blocks");
767
                                    break
768
                                };
769
                                let new_block_hash = new_block_header.hash();
770

            
771
                                // Here we are assuming that the import logic protects against equivocations
772
                                // and provides sybil-resistance, as it should.
773
                                collator
774
                                    .collator_service()
775
                                    .announce_block(new_block_hash, None);
776

            
777
                                // Send a submit-collation message to the collation generation subsystem,
778
                                // which then distributes this to validators.
779
                                //
780
                                // Here we are assuming that the leaf is imported, as we've gotten an
781
                                // import notification.
782
                                overseer_handle
783
                                    .send_msg(
784
                                        CollationGenerationMessage::SubmitCollation(
785
                                            SubmitCollationParams {
786
                                                relay_parent,
787
                                                collation,
788
                                                parent_head: parent_header.encode().into(),
789
                                                validation_code_hash,
790
                                                result_sender: None,
791
                                                core_index: *core_index
792
                                            },
793
                                        ),
794
                                        "SubmitCollation",
795
                                    )
796
                                    .await;
797

            
798
                                parent_hash = new_block_hash;
799
                                parent_header = new_block_header;
800
                            }
801
                            Ok(None) => {
802
                                tracing::debug!(target: crate::LOG_TARGET, "Lookahead collator: No block proposal");
803
                            }
804
                            Err(err) => {
805
                                tracing::error!(target: crate::LOG_TARGET, ?err);
806
                                break;
807
                            }
808
                        }
809

            
810
                        // If it is parathread, no point in async backing as we would have to do
811
                        // buy core first
812
                        if !is_parachain {
813
                            tracing::trace!(target: crate::LOG_TARGET, "Not a parachain so terminated at {:?}", n_built);
814
                            break;
815
                        }
816
                    }
817
                },
818
                _ = params.cancellation_token.cancelled() => {
819
                    log::info!("Stopping lookahead collator");
820
                    break;
821
                }
822
            }
823
        }
824

            
825
        // Notifying that we have exited
826
        let _ = exit_notification_sender.send(());
827
    };
828

            
829
    (aura_fut, exit_notification_receiver)
830
}
831

            
832
// Checks if we own the slot at the given block and whether there
833
// is space in the unincluded segment.
834
async fn can_build_upon<Block: BlockT, Client, P>(
835
    slot: Slot,
836
    aux_data: OrchestratorAuraWorkerAuxData<P>,
837
    parent_header: Block::Header,
838
    included_block: <Block as BlockT>::Hash,
839
    force_authoring: bool,
840
    client: &Client,
841
    keystore: &KeystorePtr,
842
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
843
where
844
    Client: ProvideRuntimeApi<Block>,
845
    Client::Api: UnincludedSegmentApi<Block>,
846
    P: Pair + Send + Sync + 'static,
847
    P::Public: Codec + std::fmt::Debug,
848
    P::Signature: Codec,
849
{
850
    let runtime_api = client.runtime_api();
851

            
852
    let claim_mode = if force_authoring {
853
        ClaimMode::ForceAuthoring
854
    } else {
855
        ClaimMode::NormalAuthoring
856
    };
857

            
858
    let slot_claim =
859
        tanssi_claim_slot::<P, Block>(aux_data, &parent_header, slot, claim_mode, keystore);
860

            
861
    // Here we lean on the property that building on an empty unincluded segment must always
862
    // be legal. Skipping the runtime API query here allows us to seamlessly run this
863
    // collator against chains which have not yet upgraded their runtime.
864
    if parent_header.hash() != included_block
865
        && !runtime_api.can_build_upon(parent_header.hash(), included_block, slot)?
866
    {
867
        return Ok(None);
868
    }
869

            
870
    Ok(slot_claim)
871
}
872

            
873
/// Reads allowed ancestry length parameter from the relay chain storage at the given relay parent.
874
///
875
/// Falls back to 0 in case of an error.
876
async fn max_ancestry_lookback(
877
    relay_parent: PHash,
878
    relay_client: &impl RelayChainInterface,
879
) -> usize {
880
    match load_abridged_host_configuration(relay_parent, relay_client).await {
881
        Ok(Some(config)) => config.async_backing_params.allowed_ancestry_len as usize,
882
        Ok(None) => {
883
            tracing::error!(
884
                target: crate::LOG_TARGET,
885
                "Active config is missing in relay chain storage",
886
            );
887
            0
888
        }
889
        Err(err) => {
890
            tracing::error!(
891
                target: crate::LOG_TARGET,
892
                ?err,
893
                ?relay_parent,
894
                "Failed to read active config from relay chain client",
895
            );
896
            0
897
        }
898
    }
899
}
900

            
901
// Checks if there exists a scheduled core for the para at the provided relay parent.
902
//
903
// Falls back to `false` in case of an error.
904
async fn cores_scheduled_for_para(
905
    relay_parent: PHash,
906
    para_id: ParaId,
907
    overseer_handle: &mut OverseerHandle,
908
    relay_client: &impl RelayChainInterface,
909
) -> Vec<CoreIndex> {
910
    let (tx, rx) = oneshot::channel();
911
    let request = RuntimeApiRequest::AvailabilityCores(tx);
912
    overseer_handle
913
        .send_msg(
914
            RuntimeApiMessage::Request(relay_parent, request),
915
            "LookaheadCollator",
916
        )
917
        .await;
918

            
919
    let max_candidate_depth = async_backing_params(relay_parent, relay_client)
920
        .await
921
        .map(|c| c.max_candidate_depth)
922
        .unwrap_or(0);
923

            
924
    let cores = match rx.await {
925
        Ok(Ok(cores)) => cores,
926
        Ok(Err(error)) => {
927
            tracing::error!(
928
                target: crate::LOG_TARGET,
929
                ?error,
930
                ?relay_parent,
931
                "Failed to query availability cores runtime API",
932
            );
933
            return Vec::new();
934
        }
935
        Err(oneshot::Canceled) => {
936
            tracing::error!(
937
                target: crate::LOG_TARGET,
938
                ?relay_parent,
939
                "Sender for availability cores runtime request dropped",
940
            );
941
            return Vec::new();
942
        }
943
    };
944

            
945
    cores
946
        .iter()
947
        .enumerate()
948
        .filter_map(|(index, core)| {
949
            let core_para_id = match core {
950
                CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
951
                CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
952
                    .next_up_on_available
953
                    .as_ref()
954
                    .map(|scheduled_core| scheduled_core.para_id),
955
                CoreState::Free | CoreState::Occupied(_) => None,
956
            };
957

            
958
            if core_para_id == Some(para_id) {
959
                Some(CoreIndex(index as u32))
960
            } else {
961
                None
962
            }
963
        })
964
        .collect()
965
}
966

            
967
/// Reads async backing parameters from the relay chain storage at the given relay parent.
968
async fn async_backing_params(
969
    relay_parent: PHash,
970
    relay_client: &impl RelayChainInterface,
971
) -> Option<AsyncBackingParams> {
972
    match load_abridged_host_configuration(relay_parent, relay_client).await {
973
        Ok(Some(config)) => Some(config.async_backing_params),
974
        Ok(None) => {
975
            tracing::error!(
976
                target: crate::LOG_TARGET,
977
                "Active config is missing in relay chain storage",
978
            );
979
            None
980
        }
981
        Err(err) => {
982
            tracing::error!(
983
                target: crate::LOG_TARGET,
984
                ?err,
985
                ?relay_parent,
986
                "Failed to read active config from relay chain client",
987
            );
988
            None
989
        }
990
    }
991
}