// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

//! A collator for Tanssi Aura that looks ahead of the most recently included parachain block
//! when determining what to build upon.
//!
//! This collator also builds additional blocks when the maximum backlog is not saturated.
//! The size of the backlog is determined by invoking a runtime API. If that runtime API
//! is not supported, this assumes a maximum backlog size of 1.
//!
//! This takes more advantage of asynchronous backing, though not complete advantage.
//! When the backlog is not saturated, this approach lets the backlog temporarily 'catch up'
//! with periods of higher throughput. When the backlog is saturated, we typically
//! fall back to the limited cadence of a single parachain block per relay-chain block.
//!
//! Despite this, the fact that there is a backlog at all allows us to spend more time
//! building the block, as there is some buffer before it can get posted to the relay-chain.
//! The main limitation is block propagation time - i.e. the new blocks created by an author
//! must be propagated to the next author before their turn.

use {
    crate::{
        collators::{self as collator_util, tanssi_claim_slot, ClaimMode, SlotClaim},
        consensus_orchestrator::RetrieveAuthoritiesFromOrchestrator,
        OrchestratorAuraWorkerAuxData,
    },
    async_backing_primitives::UnincludedSegmentApi,
    cumulus_client_collator::service::ServiceInterface as CollatorServiceInterface,
    cumulus_client_consensus_common::{
        self as consensus_common, load_abridged_host_configuration, ParentSearchParams,
    },
    cumulus_client_consensus_proposer::ProposerInterface,
    cumulus_primitives_core::{
        relay_chain::{AsyncBackingParams, CoreIndex, Hash as PHash},
        PersistedValidationData,
    },
    cumulus_relay_chain_interface::{CoreState, RelayChainInterface},
    futures::{channel::oneshot, prelude::*},
    nimbus_primitives::NimbusId,
    pallet_xcm_core_buyer_runtime_api::{BuyingError, XCMCoreBuyerApi},
    parity_scale_codec::{Codec, Encode},
    polkadot_node_primitives::SubmitCollationParams,
    polkadot_node_subsystem::messages::{
        CollationGenerationMessage, RuntimeApiMessage, RuntimeApiRequest,
    },
    polkadot_overseer::Handle as OverseerHandle,
    polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption},
    sc_client_api::{backend::AuxStore, BlockBackend, BlockOf},
    sc_consensus::BlockImport,
    sc_consensus_slots::InherentDataProviderExt,
    sc_transaction_pool_api::TransactionPool,
    sp_api::{ApiError, ProvideRuntimeApi},
    sp_blockchain::{HeaderBackend, HeaderMetadata},
    sp_consensus::SyncOracle,
    sp_consensus_aura::{Slot, SlotDuration},
    sp_core::crypto::Pair,
    sp_inherents::CreateInherentDataProviders,
    sp_keystore::KeystorePtr,
    sp_runtime::{
        traits::{Block as BlockT, BlockIdTo, Header as HeaderT, Member},
        transaction_validity::TransactionSource,
    },
    sp_transaction_pool::runtime_api::TaggedTransactionQueue,
    std::{convert::TryFrom, error::Error, sync::Arc, time::Duration},
    tokio::select,
    tokio_util::sync::CancellationToken,
    tp_xcm_core_buyer::{BuyCollatorProofCreationError, BuyCoreCollatorProof},
};

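/// Errors that can occur while a parathread collator attempts to buy a core
/// on the orchestrator chain.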
#[derive(Debug)]
pub enum BuyCoreError<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> {
    NotAParathread,
    UnableToClaimSlot,
    UnableToFindKeyForSigning,
    SlotDriftConversionOverflow,
    ApiError(ApiError),
    BuyingValidationError(BuyingError<BlockNumber>),
    UnableToCreateProof(BuyCollatorProofCreationError),
    TxSubmissionError(PoolError),
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug>
    BuyCoreError<BlockNumber, PoolError>
{
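    /// Log this error together with the slot, para id and relay parent it occurred at,
    /// using `trace` for expected situations and `error` for genuine failures.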
    fn log_error<Blockhash: std::fmt::Debug>(
        &self,
        slot: Slot,
        para_id: ParaId,
        relay_parent: Blockhash,
    ) {
        match self {
            BuyCoreError::NotAParathread => {
                tracing::trace!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Para is not a parathread, skipping an attempt to buy core",
                );
            }
            BuyCoreError::UnableToClaimSlot => {
                tracing::trace!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Unable to claim slot for parathread, skipping attempt to buy the core.",
                );
            }
            BuyCoreError::UnableToFindKeyForSigning => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Unable to generate buy core proof: no corresponding key found in the keystore",
                );
            }
            BuyCoreError::SlotDriftConversionOverflow => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    "Unable to calculate container chain slot drift from orchestrator chain's slot drift",
                );
            }
            BuyCoreError::ApiError(api_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    ?api_error,
                    "Unable to call orchestrator runtime api",
                );
            }
            BuyCoreError::BuyingValidationError(buying_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?buying_error,
                    ?slot,
                    "Buying core is not allowed right now",
                );
            }
            BuyCoreError::UnableToCreateProof(proof_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    ?proof_error,
                    "Unable to generate buy core proof due to an error",
                );
            }
            BuyCoreError::TxSubmissionError(pool_error) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?relay_parent,
                    ?para_id,
                    ?slot,
                    ?pool_error,
                    "Unable to send buy core unsigned extrinsic through orchestrator tx pool",
                );
            }
        }
    }
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<BuyingError<BlockNumber>>
    for BuyCoreError<BlockNumber, PoolError>
{
    fn from(buying_error: BuyingError<BlockNumber>) -> Self {
        BuyCoreError::BuyingValidationError(buying_error)
    }
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<ApiError>
    for BuyCoreError<BlockNumber, PoolError>
{
    fn from(api_error: ApiError) -> Self {
        BuyCoreError::ApiError(api_error)
    }
}

impl<BlockNumber: std::fmt::Debug, PoolError: std::fmt::Debug> From<BuyCollatorProofCreationError>
    for BuyCoreError<BlockNumber, PoolError>
{
    fn from(proof_creation_error: BuyCollatorProofCreationError) -> Self {
        BuyCoreError::UnableToCreateProof(proof_creation_error)
    }
}

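/// Attempt to buy a core for the parathread `para_id` on the orchestrator chain.
///
/// This claims the parathread slot (allowing for the configured slot drift), checks with the
/// orchestrator runtime that buying is currently allowed, builds a signed `BuyCoreCollatorProof`
/// with a key from the keystore and submits the resulting unsigned extrinsic to the orchestrator
/// transaction pool. Returns the transaction hash on success.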
pub async fn try_to_buy_core<Block, OBlock, OBlockNumber, P, CIDP, TxPool, OClient>(
    para_id: ParaId,
    aux_data: OrchestratorAuraWorkerAuxData<P>,
    inherent_providers: CIDP::InherentDataProviders,
    keystore: &KeystorePtr,
    orchestrator_client: Arc<OClient>,
    orchestrator_tx_pool: Arc<TxPool>,
    parent_header: <Block as BlockT>::Header,
    orchestrator_slot_duration: SlotDuration,
    container_slot_duration: SlotDuration,
) -> Result<
    <TxPool as TransactionPool>::Hash,
    BuyCoreError<
        <<OBlock as BlockT>::Header as HeaderT>::Number,
        <TxPool as TransactionPool>::Error,
    >,
>
where
    Block: BlockT,
    OBlock: BlockT,
    P: Pair<Public = NimbusId> + Sync + Send + 'static,
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
        + Send
        + 'static
        + Clone,
    CIDP::InherentDataProviders: Send + InherentDataProviderExt,
    OClient: ProvideRuntimeApi<OBlock>
        + HeaderMetadata<OBlock, Error = sp_blockchain::Error>
        + HeaderBackend<OBlock>
        + BlockBackend<OBlock>
        + BlockIdTo<OBlock>
        + 'static,
    OClient::Api: TaggedTransactionQueue<OBlock>
        + XCMCoreBuyerApi<OBlock, <<OBlock as BlockT>::Header as HeaderT>::Number, ParaId, NimbusId>,
    TxPool: TransactionPool<Block = OBlock>,
{
    // We do nothing if this is not a parathread
    if aux_data.slot_freq.is_none() {
        return Err(BuyCoreError::NotAParathread);
    }

    let orchestrator_best_hash = orchestrator_client.info().best_hash;
    let orchestrator_runtime_api = orchestrator_client.runtime_api();

    let buy_core_slot_drift = orchestrator_client
        .runtime_api()
        .get_buy_core_slot_drift(orchestrator_best_hash)?;

    // Convert the drift into container chain slots so that the client-side calculation matches
    // the calculation done by the orchestrator runtime.
    // We keep this calculation as generic as possible so that it can handle arbitrary slot
    // timings without assuming anything about them.
    // Formula: (slot_drift_in_orchestrator_slots * orchestrator_slot_duration) / container_slot_duration
    let buy_core_container_slot_drift = buy_core_slot_drift
        .checked_mul(orchestrator_slot_duration.as_millis())
        .and_then(|intermediate_result| {
            intermediate_result.checked_div(container_slot_duration.as_millis())
        })
        .ok_or(BuyCoreError::SlotDriftConversionOverflow)?;

    let current_container_slot = inherent_providers.slot();

    let slot_claim = tanssi_claim_slot::<P, Block>(
        aux_data,
        &parent_header,
        current_container_slot,
        ClaimMode::ParathreadCoreBuying {
            drift_permitted: buy_core_container_slot_drift.into(),
        },
        keystore,
    )
    .ok_or(BuyCoreError::UnableToClaimSlot)?;

    let pubkey = slot_claim.author_pub;

    orchestrator_runtime_api.is_core_buying_allowed(
        orchestrator_best_hash,
        para_id,
        pubkey.clone(),
    )??;

    let nonce =
        orchestrator_runtime_api.get_buy_core_signature_nonce(orchestrator_best_hash, para_id)?;

    let collator_buy_core_proof =
        BuyCoreCollatorProof::new_with_keystore(nonce, para_id, pubkey, keystore)?
            .ok_or(BuyCoreError::UnableToFindKeyForSigning)?;

    let extrinsic = orchestrator_runtime_api.create_buy_core_unsigned_extrinsic(
        orchestrator_best_hash,
        para_id,
        collator_buy_core_proof,
    )?;

    orchestrator_tx_pool
        .submit_one(orchestrator_best_hash, TransactionSource::Local, *extrinsic)
        .await
        .map_err(BuyCoreError::TxSubmissionError)
}

/// Parameters for [`run`].
pub struct Params<
    GSD,
    BI,
    CIDP,
    Client,
    Backend,
    RClient,
    CHP,
    SO,
    Proposer,
    CS,
    GOH,
    TxPool,
    OClient,
> {
    pub get_current_slot_duration: GSD,
    pub create_inherent_data_providers: CIDP,
    pub get_orchestrator_aux_data: GOH,
    pub block_import: BI,
    pub para_client: Arc<Client>,
    pub para_backend: Arc<Backend>,
    pub relay_client: RClient,
    pub code_hash_provider: CHP,
    pub sync_oracle: SO,
    pub keystore: KeystorePtr,
    pub collator_key: CollatorPair,
    pub para_id: ParaId,
    pub overseer_handle: OverseerHandle,
    pub orchestrator_slot_duration: SlotDuration,
    pub relay_chain_slot_duration: Duration,
    pub proposer: Proposer,
    pub collator_service: CS,
    pub authoring_duration: Duration,
    pub force_authoring: bool,
    pub cancellation_token: CancellationToken,
    pub buy_core_params: BuyCoreParams<TxPool, OClient>,
    /// The maximum percentage of the maximum PoV size that the collator can use.
    /// This option will be removed once <https://github.com/paritytech/polkadot-sdk/issues/6020> is fixed.
    pub max_pov_percentage: Option<u32>,
}

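/// Where a parathread collator submits its `buy_core` extrinsic: either through an orchestrator
/// chain client and transaction pool, or (not yet implemented) directly on a solochain relay.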
pub enum BuyCoreParams<TxPool, OClient> {
    Orchestrator {
        orchestrator_tx_pool: Arc<TxPool>,
        orchestrator_client: Arc<OClient>,
    },
    Solochain {
        // TODO: relay_tx_pool
    },
}

impl<TxPool, OClient> Clone for BuyCoreParams<TxPool, OClient> {
    fn clone(&self) -> Self {
        match self {
            Self::Orchestrator {
                orchestrator_tx_pool,
                orchestrator_client,
            } => Self::Orchestrator {
                orchestrator_tx_pool: orchestrator_tx_pool.clone(),
                orchestrator_client: orchestrator_client.clone(),
            },
            Self::Solochain {} => Self::Solochain {},
        }
    }
}

/// Run an async-backing-friendly collator for Tanssi Aura.
pub fn run<
    GSD,
    Block,
    P,
    BI,
    CIDP,
    Client,
    Backend,
    RClient,
    CHP,
    SO,
    Proposer,
    CS,
    GOH,
    TxPool,
    OClient,
    OBlock,
>(
    mut params: Params<
        GSD,
        BI,
        CIDP,
        Client,
        Backend,
        RClient,
        CHP,
        SO,
        Proposer,
        CS,
        GOH,
        TxPool,
        OClient,
    >,
) -> (
    impl Future<Output = ()> + Send + 'static,
    oneshot::Receiver<()>,
)
where
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>
        + BlockOf
        + AuxStore
        + HeaderBackend<Block>
        + BlockBackend<Block>
        + Send
        + Sync
        + 'static,
    Client::Api: UnincludedSegmentApi<Block>,
    Backend: sc_client_api::Backend<Block> + 'static,
    RClient: RelayChainInterface + Clone + 'static,
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
        + Send
        + 'static
        + Clone,
    CIDP::InherentDataProviders: Send + InherentDataProviderExt,
    BI: BlockImport<Block> + Send + Sync + 'static,
    SO: SyncOracle + Send + Sync + Clone + 'static,
    Proposer: ProposerInterface<Block> + Send + Sync + 'static,
    CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
    CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
    P: Pair<Public = NimbusId> + Sync + Send + 'static,
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
    GOH: RetrieveAuthoritiesFromOrchestrator<
            Block,
            (PHash, PersistedValidationData),
            OrchestratorAuraWorkerAuxData<P>,
        >
        + 'static
        + Sync
        + Send,
    OBlock: BlockT,
    OClient: ProvideRuntimeApi<OBlock>
        + HeaderMetadata<OBlock, Error = sp_blockchain::Error>
        + HeaderBackend<OBlock>
        + BlockBackend<OBlock>
        + BlockIdTo<OBlock>
        + 'static,
    OClient::Api: TaggedTransactionQueue<OBlock>
        + XCMCoreBuyerApi<OBlock, <<OBlock as BlockT>::Header as HeaderT>::Number, ParaId, NimbusId>,
    TxPool: TransactionPool<Block = OBlock> + 'static,
    GSD: Fn(<Block as BlockT>::Hash) -> SlotDuration + Send + 'static,
{
    // This is an arbitrary value which is likely guaranteed to exceed any reasonable
    // limit, as it would correspond to 10 non-included blocks.
    //
    // Since we only search for parent blocks which have already been imported,
    // we can guarantee that all imported blocks respect the unincluded segment
    // rules specified by the parachain's runtime and thus will never be too deep.
    const PARENT_SEARCH_DEPTH: usize = 10;

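    // Channel used to notify the caller once the collation task below has exited.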
    let (exit_notification_sender, exit_notification_receiver) = oneshot::channel();

    let aura_fut = async move {
        cumulus_client_collator::initialize_collator_subsystems(
            &mut params.overseer_handle,
            params.collator_key,
            params.para_id,
            true,
        )
        .await;

        let mut import_notifications = match params.relay_client.import_notification_stream().await
        {
            Ok(s) => s,
            Err(err) => {
                tracing::error!(
                    target: crate::LOG_TARGET,
                    ?err,
                    "Failed to initialize consensus: no relay chain import notification stream"
                );

                return;
            }
        };

        let mut collator = {
            let params = collator_util::Params {
                create_inherent_data_providers: params.create_inherent_data_providers.clone(),
                block_import: params.block_import,
                relay_client: params.relay_client.clone(),
                keystore: params.keystore.clone(),
                para_id: params.para_id,
                proposer: params.proposer,
                collator_service: params.collator_service,
            };

            collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
        };

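        // Main collation loop: react to every relay chain import notification until the
        // stream ends or the cancellation token fires.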
        loop {
            select! {
                maybe_relay_parent_header = import_notifications.next() => {
                    if maybe_relay_parent_header.is_none() {
                        break;
                    }

                    let relay_parent_header = maybe_relay_parent_header.expect("relay_parent_header must exist as we checked for the None variant above; qed");
                    let relay_parent = relay_parent_header.hash();

                    let max_pov_size = match params
                        .relay_client
                        .persisted_validation_data(
                            relay_parent,
                            params.para_id,
                            OccupiedCoreAssumption::Included,
                        )
                        .await
                    {
                        Ok(None) => continue,
                        Ok(Some(pvd)) => pvd.max_pov_size,
                        Err(err) => {
                            tracing::error!(target: crate::LOG_TARGET, ?err, "Failed to gather information from relay-client");
                            continue;
                        }
                    };

                    let parent_search_params = ParentSearchParams {
                        relay_parent,
                        para_id: params.para_id,
                        ancestry_lookback: max_ancestry_lookback(relay_parent, &params.relay_client).await,
                        max_depth: PARENT_SEARCH_DEPTH,
                        ignore_alternative_branches: true,
                    };

                    let potential_parents =
                        cumulus_client_consensus_common::find_potential_parents::<Block>(
                            parent_search_params,
                            &*params.para_backend,
                            &params.relay_client,
                        )
                        .await;

                    let mut potential_parents = match potential_parents {
                        Err(e) => {
                            tracing::error!(
                                target: crate::LOG_TARGET,
                                ?relay_parent,
                                err = ?e,
                                "Could not fetch potential parents to build upon"
                            );

                            continue;
                        }
                        Ok(x) => x,
                    };

                    let included_block = match potential_parents.iter().find(|x| x.depth == 0) {
                        None => continue, // also serves as an `is_empty` check.
                        Some(b) => b.hash,
                    };

                    let para_client = &*params.para_client;
                    let keystore = &params.keystore;
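                    // For each candidate parent, check whether we own the slot and whether the
                    // unincluded segment still has room to build on top of it.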
                    let can_build_upon = |slot_now, block_hash, aux_data| {
                        can_build_upon::<_, _, P>(
                            slot_now,
                            aux_data,
                            block_hash,
                            included_block,
                            params.force_authoring,
                            para_client,
                            keystore,
                        )
                    };

                    // Sort by depth, ascending, to choose the longest chain.
                    //
                    // If the longest chain has space, build upon that. Otherwise, don't
                    // build at all.
                    potential_parents.sort_by_key(|a| a.depth);
                    let initial_parent = match potential_parents.pop() {
                        None => continue,
                        Some(p) => p,
                    };

                    // Build in a loop until not allowed. Note that the authorities can change
                    // at any block, so we need to re-claim our slot every time.
                    let mut parent_hash = initial_parent.hash;
                    let mut parent_header = initial_parent.header;

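                    // Relay chain cores (if any) scheduled for this para at this relay parent.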
                    let core_indices = cores_scheduled_for_para(
                        relay_parent,
                        params.para_id,
                        &mut params.overseer_handle,
                        &params.relay_client,
                    )
                    .await;

                    let overseer_handle = &mut params.overseer_handle;

                    // This needs to change to support elastic scaling, but for continuously
                    // scheduled chains this ensures that the backlog will grow steadily.
                    for n_built in 0..2 {
                        let validation_data = PersistedValidationData {
                            parent_head: parent_header.encode().into(),
                            relay_parent_number: *relay_parent_header.number(),
                            relay_parent_storage_root: *relay_parent_header.state_root(),
                            max_pov_size,
                        };

                        // Retrieve authorities that are able to produce the block
                        let aux_data = match params
                            .get_orchestrator_aux_data
                            .retrieve_authorities_from_orchestrator(
                                parent_hash,
                                (relay_parent_header.hash(), validation_data.clone()),
                            )
                            .await
                        {
                            Err(e) => {
                                tracing::error!(target: crate::LOG_TARGET, ?e);
                                break;
                            }
                            Ok(h) => h,
                        };

                        let inherent_providers = match params
                            .create_inherent_data_providers
                            .create_inherent_data_providers(
                                parent_hash,
                                (relay_parent_header.hash(), validation_data.clone()),
                            )
                            .await
                        {
                            Err(e) => {
                                tracing::error!(target: crate::LOG_TARGET, ?e);
                                break;
                            }
                            Ok(h) => h,
                        };

                        // TODO: Currently we use just the first core here, but for elastic scaling
                        // we iterate and build on all of the cores returned.
                        // More info: https://github.com/paritytech/polkadot-sdk/issues/1829
                        let (is_parachain, core_index) = match (&aux_data.slot_freq, core_indices.first()) {
                            (None, None) => {
                                tracing::warn!(target: crate::LOG_TARGET, para_id = ?params.para_id, "We are a parachain and we do not have a core allocated, nothing to do");
                                break;
                            }, // We are a parachain without an allocated core, nothing to do
                            (None, Some(core_index)) => {
                                tracing::trace!(target: crate::LOG_TARGET, para_id = ?params.para_id, ?core_index, "We are a parachain and we have a core allocated, let's collate the block");
                                (true, core_index)
                            }, // We are a parachain with an allocated core, let's continue
                            (Some(_slot_frequency), None) => { // We are a parathread and no core is allocated. Let's try to buy one
                                tracing::trace!(target: crate::LOG_TARGET, para_id = ?params.para_id, "We are a parathread and we do not have a core allocated, let's try to buy the core");
                                let slot = inherent_providers.slot();
                                let container_chain_slot_duration = (params.get_current_slot_duration)(parent_header.hash());

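                                // Submit the buy-core extrinsic through the orchestrator chain;
                                // the solochain path is not implemented yet.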
                                let buy_core_result = match &params.buy_core_params {
                                    BuyCoreParams::Orchestrator {
                                        orchestrator_client,
                                        orchestrator_tx_pool,
                                    } => {
                                        try_to_buy_core::<_, _, <<OBlock as BlockT>::Header as HeaderT>::Number, _, CIDP, _, _>(params.para_id, aux_data, inherent_providers, &params.keystore, orchestrator_client.clone(), orchestrator_tx_pool.clone(), parent_header, params.orchestrator_slot_duration, container_chain_slot_duration).await
                                    }
                                    BuyCoreParams::Solochain {

                                    } => {
                                        // TODO: implement parathread support for solochain
                                        log::warn!("Unimplemented: cannot buy core for parathread in solochain");
                                        break;
                                    }
                                };
                                match buy_core_result {
                                    Ok(block_hash) => {
                                        tracing::trace!(target: crate::LOG_TARGET, ?block_hash, "Sent unsigned extrinsic to buy the core");
                                    },
                                    Err(buy_core_error) => {
                                        buy_core_error.log_error(slot, params.para_id, relay_parent);
                                    }
                                };
                                break; // No point in continuing, as we need to wait for a few relay blocks in order for our core to become available.
                            },
                            (Some(_slot_frequency), Some(core_index)) => { // We are a parathread and we do have a core, let's continue
                                tracing::trace!(target: crate::LOG_TARGET, ?core_index, "We are a parathread and we do have a core allocated, let's collate the block");
                                (false, core_index)
                            }
                        };

                        let mut slot_claim = match can_build_upon(
                            inherent_providers.slot(),
                            parent_header.clone(),
                            aux_data,
                        )
                        .await
                        {
                            Ok(None) => break,
                            Err(e) => {
                                tracing::error!(target: crate::LOG_TARGET, ?e);
                                break;
                            }
                            Ok(Some(c)) => c,
                        };

                        tracing::debug!(
                            target: crate::LOG_TARGET,
                            ?relay_parent,
                            unincluded_segment_len = initial_parent.depth + n_built,
                            "Slot claimed. Building"
                        );

                        // Build and announce collations recursively until
                        // `can_build_upon` fails or building a collation fails.
                        let (parachain_inherent_data, other_inherent_data) = match collator
                            .create_inherent_data(relay_parent, &validation_data, parent_hash, None)
                            .await
                        {
                            Err(err) => {
                                tracing::error!(target: crate::LOG_TARGET, ?err);
                                break;
                            }
                            Ok(x) => x,
                        };

                        let validation_code_hash = match params.code_hash_provider.code_hash_at(parent_hash)
                        {
                            None => {
                                tracing::error!(target: crate::LOG_TARGET, ?parent_hash, "Could not fetch validation code hash");
                                break;
                            }
                            Some(v) => v,
                        };

                        let allowed_pov_size = if let Some(max_pov_percentage) = params.max_pov_percentage {
                            validation_data.max_pov_size * max_pov_percentage / 100
                        } else {
                            // Set the block limit to 85% of the maximum PoV size.
                            //
                            // Once https://github.com/paritytech/polkadot-sdk/issues/6020 issue is
                            // fixed, the reservation should be removed.
                            validation_data.max_pov_size * 85 / 100
                        } as usize;

                        match collator
                            .collate(
                                &parent_header,
                                &mut slot_claim,
                                None,
                                (parachain_inherent_data, other_inherent_data),
                                params.authoring_duration,
                                allowed_pov_size,
                            )
                            .await
                        {
                            Ok(Some((collation, block_data, new_block_hash))) => {
                                // Here we are assuming that the import logic protects against equivocations
                                // and provides sybil-resistance, as it should.
                                collator
                                    .collator_service()
                                    .announce_block(new_block_hash, None);

                                // Send a submit-collation message to the collation generation subsystem,
                                // which then distributes this to validators.
                                //
                                // Here we are assuming that the leaf is imported, as we've gotten an
                                // import notification.
                                overseer_handle
                                    .send_msg(
                                        CollationGenerationMessage::SubmitCollation(
                                            SubmitCollationParams {
                                                relay_parent,
                                                collation,
                                                parent_head: parent_header.encode().into(),
                                                validation_code_hash,
                                                result_sender: None,
                                                core_index: *core_index
                                            },
                                        ),
                                        "SubmitCollation",
                                    )
                                    .await;

                                parent_hash = new_block_hash;
                                parent_header = block_data.into_header();
                            }
                            Ok(None) => {
                                tracing::debug!(target: crate::LOG_TARGET, "Lookahead collator: No block proposal");
                            }
                            Err(err) => {
                                tracing::error!(target: crate::LOG_TARGET, ?err);
                                break;
                            }
                        }

                        // If it is a parathread, there is no point in using async backing, as we
                        // would have to buy a core again before building the next block.
                        if !is_parachain {
                            tracing::trace!(target: crate::LOG_TARGET, "Not a parachain, so terminating at {:?}", n_built);
                            break;
                        }
                    }
                },
                _ = params.cancellation_token.cancelled() => {
                    log::info!("Stopping lookahead collator");
                    break;
                }
            }
        }

        // Notify the caller that we have exited
        let _ = exit_notification_sender.send(());
    };

    (aura_fut, exit_notification_receiver)
}

// Checks if we own the slot at the given block and whether there
// is space in the unincluded segment.
async fn can_build_upon<Block: BlockT, Client, P>(
    slot: Slot,
    aux_data: OrchestratorAuraWorkerAuxData<P>,
    parent_header: Block::Header,
    included_block: <Block as BlockT>::Hash,
    force_authoring: bool,
    client: &Client,
    keystore: &KeystorePtr,
) -> Result<Option<SlotClaim<P::Public>>, Box<dyn Error>>
where
    Client: ProvideRuntimeApi<Block>,
    Client::Api: UnincludedSegmentApi<Block>,
    P: Pair + Send + Sync + 'static,
    P::Public: Codec + std::fmt::Debug,
    P::Signature: Codec,
{
    let runtime_api = client.runtime_api();

    let claim_mode = if force_authoring {
        ClaimMode::ForceAuthoring
    } else {
        ClaimMode::NormalAuthoring
    };

    let slot_claim =
        tanssi_claim_slot::<P, Block>(aux_data, &parent_header, slot, claim_mode, keystore);

    // Here we lean on the property that building on an empty unincluded segment must always
    // be legal. Skipping the runtime API query here allows us to seamlessly run this
    // collator against chains which have not yet upgraded their runtime.
    if parent_header.hash() != included_block
        && !runtime_api.can_build_upon(parent_header.hash(), included_block, slot)?
    {
        return Ok(None);
    }

    Ok(slot_claim)
}

/// Reads allowed ancestry length parameter from the relay chain storage at the given relay parent.
///
/// Falls back to 0 in case of an error.
async fn max_ancestry_lookback(
    relay_parent: PHash,
    relay_client: &impl RelayChainInterface,
) -> usize {
    match load_abridged_host_configuration(relay_parent, relay_client).await {
        Ok(Some(config)) => config.async_backing_params.allowed_ancestry_len as usize,
        Ok(None) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                "Active config is missing in relay chain storage",
            );
            0
        }
        Err(err) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?err,
                ?relay_parent,
                "Failed to read active config from relay chain client",
            );
            0
        }
    }
}

// Returns the indices of the relay chain cores scheduled for the para at the provided relay parent.
//
// Falls back to an empty vector in case of an error.
async fn cores_scheduled_for_para(
    relay_parent: PHash,
    para_id: ParaId,
    overseer_handle: &mut OverseerHandle,
    relay_client: &impl RelayChainInterface,
) -> Vec<CoreIndex> {
    let (tx, rx) = oneshot::channel();
    let request = RuntimeApiRequest::AvailabilityCores(tx);
    overseer_handle
        .send_msg(
            RuntimeApiMessage::Request(relay_parent, request),
            "LookaheadCollator",
        )
        .await;

    let max_candidate_depth = async_backing_params(relay_parent, relay_client)
        .await
        .map(|c| c.max_candidate_depth)
        .unwrap_or(0);

    let cores = match rx.await {
        Ok(Ok(cores)) => cores,
        Ok(Err(error)) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?error,
                ?relay_parent,
                "Failed to query availability cores runtime API",
            );
            return Vec::new();
        }
        Err(oneshot::Canceled) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?relay_parent,
                "Sender for availability cores runtime request dropped",
            );
            return Vec::new();
        }
    };

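    // A core counts as ours if it is scheduled for this para, or if it is occupied and, with
    // async backing enabled (max_candidate_depth >= 1), this para is next up once the current
    // candidate becomes available.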
    cores
        .iter()
        .enumerate()
        .filter_map(|(index, core)| {
            let core_para_id = match core {
                CoreState::Scheduled(scheduled_core) => Some(scheduled_core.para_id),
                CoreState::Occupied(occupied_core) if max_candidate_depth >= 1 => occupied_core
                    .next_up_on_available
                    .as_ref()
                    .map(|scheduled_core| scheduled_core.para_id),
                CoreState::Free | CoreState::Occupied(_) => None,
            };

            if core_para_id == Some(para_id) {
                Some(CoreIndex(index as u32))
            } else {
                None
            }
        })
        .collect()
}

/// Reads async backing parameters from the relay chain storage at the given relay parent.
async fn async_backing_params(
    relay_parent: PHash,
    relay_client: &impl RelayChainInterface,
) -> Option<AsyncBackingParams> {
    match load_abridged_host_configuration(relay_parent, relay_client).await {
        Ok(Some(config)) => Some(config.async_backing_params),
        Ok(None) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                "Active config is missing in relay chain storage",
            );
            None
        }
        Err(err) => {
            tracing::error!(
                target: crate::LOG_TARGET,
                ?err,
                ?relay_parent,
                "Failed to read active config from relay chain client",
            );
            None
        }
    }
}