1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    crate::{
19
        collators as collator_util, collators::ClaimMode,
20
        consensus_orchestrator::RetrieveAuthoritiesFromOrchestrator, OrchestratorAuraWorkerAuxData,
21
    },
22
    cumulus_client_collator::{
23
        relay_chain_driven::CollationRequest, service::ServiceInterface as CollatorServiceInterface,
24
    },
25
    cumulus_client_consensus_proposer::ProposerInterface,
26
    cumulus_primitives_core::{
27
        relay_chain::{BlockId as RBlockId, Hash as PHash, OccupiedCoreAssumption},
28
        PersistedValidationData,
29
    },
30
    cumulus_relay_chain_interface::RelayChainInterface,
31
    futures::{channel::mpsc::Receiver, prelude::*},
32
    parity_scale_codec::{Codec, Decode},
33
    polkadot_node_primitives::CollationResult,
34
    polkadot_overseer::Handle as OverseerHandle,
35
    polkadot_primitives::{CollatorPair, Id as ParaId},
36
    sc_client_api::{backend::AuxStore, BlockBackend, BlockOf},
37
    sc_consensus::BlockImport,
38
    sc_consensus_slots::InherentDataProviderExt,
39
    sp_api::ProvideRuntimeApi,
40
    sp_application_crypto::AppPublic,
41
    sp_blockchain::HeaderBackend,
42
    sp_consensus::SyncOracle,
43
    sp_consensus_aura::SlotDuration,
44
    sp_core::crypto::Pair,
45
    sp_inherents::CreateInherentDataProviders,
46
    sp_keystore::KeystorePtr,
47
    sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member},
48
    std::{convert::TryFrom, sync::Arc, time::Duration},
49
};
50

            
51
/// Parameters for [`run`].
52
pub struct Params<BI, CIDP, Client, RClient, SO, Proposer, CS, GOH> {
53
    pub create_inherent_data_providers: CIDP,
54
    pub get_orchestrator_aux_data: GOH,
55
    pub block_import: BI,
56
    pub para_client: Arc<Client>,
57
    pub relay_client: RClient,
58
    pub sync_oracle: SO,
59
    pub keystore: KeystorePtr,
60
    pub collator_key: CollatorPair,
61
    pub para_id: ParaId,
62
    pub overseer_handle: OverseerHandle,
63
    pub slot_duration: SlotDuration,
64
    pub relay_chain_slot_duration: Duration,
65
    pub proposer: Proposer,
66
    pub collator_service: CS,
67
    pub authoring_duration: Duration,
68
    pub force_authoring: bool,
69
    pub collation_request_receiver: Option<Receiver<CollationRequest>>,
70
}
71

            
72
/// Run tanssi Aura consensus as a relay-chain-driven collator.
73
pub async fn run<Block, P, BI, CIDP, Client, RClient, SO, Proposer, CS, GOH>(
74
    params: Params<BI, CIDP, Client, RClient, SO, Proposer, CS, GOH>,
75
) where
76
    Block: BlockT + Send,
77
    Client: ProvideRuntimeApi<Block>
78
        + BlockOf
79
        + AuxStore
80
        + HeaderBackend<Block>
81
        + BlockBackend<Block>
82
        + Send
83
        + Sync
84
        + 'static,
85
    RClient: RelayChainInterface + Send + Clone + 'static,
86
    CIDP: CreateInherentDataProviders<Block, (PHash, PersistedValidationData)>
87
        + Send
88
        + 'static
89
        + Clone,
90
    CIDP::InherentDataProviders: Send + InherentDataProviderExt,
91
    BI: BlockImport<Block> + Send + Sync + 'static,
92
    SO: SyncOracle + Send + Sync + Clone + 'static,
93
    Proposer: ProposerInterface<Block> + Send + Sync + 'static,
94
    CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
95
    P: Pair + Sync + Send + 'static,
96
    P::Public: AppPublic + Member + Codec,
97
    P::Signature: TryFrom<Vec<u8>> + Member + Codec,
98
    GOH: RetrieveAuthoritiesFromOrchestrator<
99
            Block,
100
            (PHash, PersistedValidationData),
101
            OrchestratorAuraWorkerAuxData<P>,
102
        >
103
        + 'static
104
        + Sync
105
        + Send,
106
{
107
    let mut collation_requests = match params.collation_request_receiver {
108
        Some(receiver) => receiver,
109
        None => {
110
            cumulus_client_collator::relay_chain_driven::init(
111
                params.collator_key,
112
                params.para_id,
113
                params.overseer_handle,
114
            )
115
            .await
116
        }
117
    };
118

            
119
    let mut collator = {
120
        let params = collator_util::Params {
121
            create_inherent_data_providers: params.create_inherent_data_providers.clone(),
122
            block_import: params.block_import,
123
            relay_client: params.relay_client.clone(),
124
            keystore: params.keystore.clone(),
125
            para_id: params.para_id,
126
            proposer: params.proposer,
127
            collator_service: params.collator_service,
128
        };
129

            
130
        collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
131
    };
132

            
133
    let mut last_processed_slot = 0;
134
    let mut last_relay_chain_block = Default::default();
135

            
136
    while let Some(request) = collation_requests.next().await {
137
        macro_rules! reject_with_error {
138
				($err:expr) => {{
139
					request.complete(None);
140
					tracing::error!(target: crate::LOG_TARGET, err = ?{ $err });
141
					continue;
142
				}};
143
			}
144

            
145
        macro_rules! try_request {
146
            ($x:expr) => {{
147
                match $x {
148
                    Ok(x) => x,
149
                    Err(e) => reject_with_error!(e),
150
                }
151
            }};
152
        }
153

            
154
        let validation_data = request.persisted_validation_data();
155

            
156
        let parent_header = try_request!(Block::Header::decode(
157
            &mut &validation_data.parent_head.0[..]
158
        ));
159

            
160
        let parent_hash = parent_header.hash();
161

            
162
        // Evaluate whether we can build on top
163
        // The requirement is that the parent_hash is the last included block in the relay
164
        let can_build = can_build_upon_included::<Block, _>(
165
            parent_hash,
166
            &collator.relay_client,
167
            params.para_id,
168
            *request.relay_parent(),
169
        )
170
        .await;
171
        if !can_build {
172
            continue;
173
        }
174

            
175
        // Check whether we can build upon this block
176
        if !collator
177
            .collator_service()
178
            .check_block_status(parent_hash, &parent_header)
179
        {
180
            continue;
181
        }
182

            
183
        let relay_parent_header = match params
184
            .relay_client
185
            .header(RBlockId::hash(*request.relay_parent()))
186
            .await
187
        {
188
            Err(e) => reject_with_error!(e),
189
            Ok(None) => continue, // sanity: would be inconsistent to get `None` here
190
            Ok(Some(h)) => h,
191
        };
192

            
193
        // Retrieve authorities that are able to produce the block
194
        let authorities = match params
195
            .get_orchestrator_aux_data
196
            .retrieve_authorities_from_orchestrator(
197
                parent_hash,
198
                (relay_parent_header.hash(), validation_data.clone()),
199
            )
200
            .await
201
        {
202
            Err(e) => reject_with_error!(e),
203
            Ok(h) => h,
204
        };
205

            
206
        let inherent_providers = match params
207
            .create_inherent_data_providers
208
            .create_inherent_data_providers(
209
                parent_hash,
210
                (*request.relay_parent(), validation_data.clone()),
211
            )
212
            .await
213
        {
214
            Err(e) => reject_with_error!(e),
215
            Ok(h) => h,
216
        };
217

            
218
        let claim_mode = if params.force_authoring {
219
            ClaimMode::ForceAuthoring
220
        } else {
221
            ClaimMode::NormalAuthoring
222
        };
223

            
224
        let mut claim = match collator_util::tanssi_claim_slot::<P, Block>(
225
            authorities,
226
            &parent_header,
227
            inherent_providers.slot(),
228
            claim_mode,
229
            &params.keystore,
230
        ) {
231
            None => continue,
232
            Some(h) => h,
233
        };
234

            
235
        // With async backing this function will be called every relay chain block.
236
        //
237
        // Most parachains currently run with 12 seconds slots and thus, they would try to
238
        // produce multiple blocks per slot which very likely would fail on chain. Thus, we have
239
        // this "hack" to only produce on block per slot.
240
        //
241
        // With https://github.com/paritytech/polkadot-sdk/issues/3168 this implementation will be
242
        // obsolete and also the underlying issue will be fixed.
243
        if last_processed_slot >= *claim.slot()
244
            && last_relay_chain_block < *relay_parent_header.number()
245
        {
246
            continue;
247
        }
248

            
249
        let (parachain_inherent_data, other_inherent_data) = try_request!(
250
            collator
251
                .create_inherent_data(
252
                    *request.relay_parent(),
253
                    validation_data,
254
                    parent_hash,
255
                    None,
256
                    None
257
                )
258
                .await
259
        );
260

            
261
        // Set the block limit to 50% of the maximum PoV size.
262
        //
263
        // TODO: If we got benchmarking that includes the proof size,
264
        // we should be able to use the maximum pov size.
265
        let allowed_pov_size = (validation_data.max_pov_size / 2) as usize;
266

            
267
        let maybe_collation = try_request!(
268
            collator
269
                .collate(
270
                    &parent_header,
271
                    &mut claim,
272
                    None,
273
                    (parachain_inherent_data, other_inherent_data),
274
                    params.authoring_duration,
275
                    allowed_pov_size,
276
                )
277
                .await
278
        );
279

            
280
        if let Some((collation, block_data)) = maybe_collation {
281
            let Some(block_hash) = block_data.blocks().first().map(|b| b.hash()) else {
282
                continue;
283
            };
284
            let result_sender = Some(
285
                collator
286
                    .collator_service()
287
                    .announce_with_barrier(block_hash),
288
            );
289
            request.complete(Some(CollationResult {
290
                collation,
291
                result_sender,
292
            }));
293
        } else {
294
            request.complete(None);
295
            tracing::debug!(target: crate::LOG_TARGET, "No block proposal");
296
        }
297
        last_processed_slot = *claim.slot();
298
        last_relay_chain_block = *relay_parent_header.number();
299
    }
300
}
301

            
302
// Checks whether we can build upon the last included block
303
// Essentially checks that the latest head we are trying to build
304
// is the one included in the relay
305
async fn can_build_upon_included<Block: BlockT, RClient>(
306
    parent_hash: <Block as BlockT>::Hash,
307
    relay_client: &RClient,
308
    para_id: ParaId,
309
    relay_parent: PHash,
310
) -> bool
311
where
312
    RClient: RelayChainInterface + Send + Clone + 'static,
313
{
314
    let included_header = relay_client
315
        .persisted_validation_data(relay_parent, para_id, OccupiedCoreAssumption::TimedOut)
316
        .await;
317

            
318
    if let Ok(Some(included_header)) = included_header {
319
        let decoded = Block::Header::decode(&mut &included_header.parent_head.0[..]).ok();
320
        if let Some(decoded_header) = decoded {
321
            let included_hash = decoded_header.hash();
322
            if parent_hash == included_hash {
323
                return true;
324
            }
325
        }
326
    }
327
    false
328
}