1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
mod ws_client;
18

            
19
use {
20
    async_trait::async_trait,
21
    core::pin::Pin,
22
    dc_orchestrator_chain_interface::{
23
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
24
        NimbusId, OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult,
25
        PHash, PHeader,
26
    },
27
    dp_core::ParaId,
28
    futures::{Stream, StreamExt},
29
    jsonrpsee::{core::params::ArrayParams, rpc_params},
30
    sc_client_api::{StorageData, StorageProof},
31
    sc_rpc_api::state::ReadProof,
32
    sc_service::TaskManager,
33
    serde::de::DeserializeOwned,
34
    sp_core::{Decode, Encode},
35
    sp_state_machine::StorageValue,
36
    sp_storage::StorageKey,
37
    tokio::sync::{mpsc, oneshot},
38
    url::Url,
39
    ws_client::{JsonRpcRequest, WsClientRequest},
40
};
41

            
42
const LOG_TARGET: &str = "orchestrator-rpc-client";
43
const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
44

            
45
/// Format url and force addition of a port
46
fn url_to_string_with_port(url: Url) -> Option<String> {
47
    // This is already validated on CLI side, just defensive here
48
    if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() {
49
        tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host.");
50
        return None;
51
    }
52

            
53
    // Either we have a user-supplied port or use the default for 'ws' or 'wss' here
54
    Some(format!(
55
        "{}://{}:{}{}{}",
56
        url.scheme(),
57
        url.host_str()?,
58
        url.port_or_known_default()?,
59
        url.path(),
60
        url.query()
61
            .map(|query| format!("?{}", query))
62
            .unwrap_or_default()
63
    ))
64
}
65

            
66
pub async fn create_client_and_start_worker(
67
    urls: Vec<Url>,
68
    task_manager: &mut TaskManager,
69
    overseer_handle: Option<polkadot_overseer::Handle>,
70
) -> OrchestratorChainResult<OrchestratorChainRpcClient> {
71
    let urls: Vec<_> = urls
72
        .into_iter()
73
        .filter_map(url_to_string_with_port)
74
        .collect();
75
    let (worker, request_sender) = ws_client::ReconnectingWsClientWorker::new(urls)
76
        .await
77
        .map_err(|_| {
78
            OrchestratorChainError::GenericError(
79
                "Failed to connect to all provided Orchestrator chain RPC endpoints".to_string(),
80
            )
81
        })?;
82

            
83
    task_manager
84
        .spawn_essential_handle()
85
        .spawn("orchestrator-rpc-worker", None, worker.run());
86

            
87
    let client = OrchestratorChainRpcClient {
88
        request_sender,
89
        overseer_handle,
90
    };
91

            
92
    Ok(client)
93
}
94

            
95
#[derive(Clone)]
96
pub struct OrchestratorChainRpcClient {
97
    request_sender: mpsc::Sender<WsClientRequest>,
98
    overseer_handle: Option<polkadot_overseer::Handle>,
99
}
100

            
101
impl OrchestratorChainRpcClient {
102
    /// Call a call to `state_call` rpc method.
103
    pub async fn call_remote_runtime_function<R: Decode>(
104
        &self,
105
        method_name: &str,
106
        hash: PHash,
107
        payload: Option<impl Encode>,
108
    ) -> OrchestratorChainResult<R> {
109
        let payload_bytes =
110
            payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode()));
111
        let params = rpc_params! {
112
            method_name,
113
            payload_bytes,
114
            hash
115
        };
116
        let res = self
117
            .request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
118
                tracing::debug!(
119
                    target: LOG_TARGET,
120
                    %method_name,
121
                    %hash,
122
                    error = %err,
123
                    "Error during call to 'state_call'.",
124
                );
125
            })
126
            .await?;
127
        Decode::decode(&mut &*res.0).map_err(Into::into)
128
    }
129

            
130
    async fn request<'a, R>(
131
        &self,
132
        method: &'a str,
133
        params: ArrayParams,
134
    ) -> OrchestratorChainResult<R>
135
    where
136
        R: DeserializeOwned + std::fmt::Debug,
137
    {
138
        self.request_tracing(
139
            method,
140
            params,
141
            |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"),
142
        ).await
143
    }
144

            
145
    fn send_register_message(
146
        &self,
147
        message_builder: impl FnOnce(mpsc::Sender<dp_core::Header>) -> WsClientRequest,
148
    ) -> OrchestratorChainResult<mpsc::Receiver<dp_core::Header>> {
149
        let (tx, rx) = mpsc::channel(NOTIFICATION_CHANNEL_SIZE_LIMIT);
150
        self.request_sender
151
            .try_send(message_builder(tx))
152
            .map_err(|e| OrchestratorChainError::WorkerCommunicationError(e.to_string()))?;
153
        Ok(rx)
154
    }
155

            
156
    /// Send a request to the RPC worker and awaits for a response. The worker is responsible
157
    /// for retrying requests if connection dies.
158
    async fn request_tracing<'a, R, OR>(
159
        &self,
160
        method: &'a str,
161
        params: ArrayParams,
162
        trace_error: OR,
163
    ) -> OrchestratorChainResult<R>
164
    where
165
        R: DeserializeOwned + std::fmt::Debug,
166
        OR: Fn(&OrchestratorChainError),
167
    {
168
        let (response_sender, response_receiver) = oneshot::channel();
169

            
170
        let request = WsClientRequest::JsonRpcRequest(JsonRpcRequest {
171
            method: method.into(),
172
            params,
173
            response_sender,
174
        });
175
        self.request_sender.send(request).await.map_err(|err| {
176
            OrchestratorChainError::WorkerCommunicationError(format!(
177
                "Unable to send message to RPC worker: {}",
178
                err
179
            ))
180
        })?;
181

            
182
        let response = response_receiver.await.map_err(|err| {
183
			OrchestratorChainError::WorkerCommunicationError(format!(
184
				"RPC worker channel closed. This can hint and connectivity issues with the supplied RPC endpoints. Message: {}",
185
				err
186
			))
187
		})??;
188

            
189
        serde_json::from_value(response).map_err(|_| {
190
            trace_error(&OrchestratorChainError::GenericError(
191
                "Unable to deserialize value".to_string(),
192
            ));
193
            OrchestratorChainError::RpcCallError(
194
                method.to_string(),
195
                "failed to decode returned value".to_string(),
196
            )
197
        })
198
    }
199

            
200
    /// Retrieve storage item at `storage_key`
201
    pub async fn state_get_storage(
202
        &self,
203
        storage_key: StorageKey,
204
        at: Option<PHash>,
205
    ) -> OrchestratorChainResult<Option<StorageData>> {
206
        let params = rpc_params![storage_key, at];
207
        self.request("state_getStorage", params).await
208
    }
209

            
210
    /// Get read proof for `storage_keys`
211
    pub async fn state_get_read_proof(
212
        &self,
213
        storage_keys: Vec<StorageKey>,
214
        at: Option<PHash>,
215
    ) -> OrchestratorChainResult<ReadProof<PHash>> {
216
        let params = rpc_params![storage_keys, at];
217
        self.request("state_getReadProof", params).await
218
    }
219
}
220

            
221
#[async_trait]
222
impl OrchestratorChainInterface for OrchestratorChainRpcClient {
223
    /// Fetch a storage item by key.
224
    async fn get_storage_by_key(
225
        &self,
226
        orchestrator_parent: PHash,
227
        key: &[u8],
228
    ) -> OrchestratorChainResult<Option<StorageValue>> {
229
        let storage_key = StorageKey(key.to_vec());
230
        self.state_get_storage(storage_key, Some(orchestrator_parent))
231
            .await
232
            .map(|storage_data| storage_data.map(|sv| sv.0))
233
    }
234

            
235
    /// Get a handle to the overseer.
236
    fn overseer_handle(&self) -> OrchestratorChainResult<polkadot_overseer::Handle> {
237
        self.overseer_handle
238
            .clone()
239
            .ok_or(OrchestratorChainError::GenericError(
240
                "OrchestratorChainRpcClient doesn't contain an Overseer Handle".to_string(),
241
            ))
242
    }
243

            
244
    /// Generate a storage read proof.
245
    async fn prove_read(
246
        &self,
247
        orchestrator_parent: PHash,
248
        relevant_keys: &Vec<Vec<u8>>,
249
    ) -> OrchestratorChainResult<StorageProof> {
250
        let mut cloned = Vec::new();
251
        cloned.extend_from_slice(relevant_keys);
252
        let storage_keys: Vec<StorageKey> = cloned.into_iter().map(StorageKey).collect();
253

            
254
        self.state_get_read_proof(storage_keys, Some(orchestrator_parent))
255
            .await
256
            .map(|read_proof| {
257
                StorageProof::new(read_proof.proof.into_iter().map(|bytes| bytes.to_vec()))
258
            })
259
    }
260

            
261
    /// Get a stream of import block notifications.
262
    async fn import_notification_stream(
263
        &self,
264
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
265
        let rx = self.send_register_message(WsClientRequest::RegisterImportListener)?;
266
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
267
        Ok(stream.boxed())
268
    }
269

            
270
    /// Get a stream of new best block notifications.
271
    async fn new_best_notification_stream(
272
        &self,
273
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
274
        let rx = self.send_register_message(WsClientRequest::RegisterBestHeadListener)?;
275
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
276
        Ok(stream.boxed())
277
    }
278

            
279
    /// Get a stream of finality notifications.
280
    async fn finality_notification_stream(
281
        &self,
282
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
283
        let rx = self.send_register_message(WsClientRequest::RegisterFinalizationListener)?;
284
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
285
        Ok(stream.boxed())
286
    }
287

            
288
    async fn genesis_data(
289
        &self,
290
        orchestrator_parent: PHash,
291
        para_id: ParaId,
292
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
293
        self.call_remote_runtime_function(
294
            "RegistrarApi_genesis_data",
295
            orchestrator_parent,
296
            Some(para_id),
297
        )
298
        .await
299
    }
300

            
301
    async fn boot_nodes(
302
        &self,
303
        orchestrator_parent: PHash,
304
        para_id: ParaId,
305
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
306
        self.call_remote_runtime_function(
307
            "RegistrarApi_boot_nodes",
308
            orchestrator_parent,
309
            Some(para_id),
310
        )
311
        .await
312
    }
313

            
314
    async fn latest_block_number(
315
        &self,
316
        orchestrator_parent: PHash,
317
        para_id: ParaId,
318
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
319
        self.call_remote_runtime_function(
320
            "AuthorNotingApi_latest_block_number",
321
            orchestrator_parent,
322
            Some(para_id),
323
        )
324
        .await
325
    }
326

            
327
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
328
        self.request("chain_getHead", rpc_params![]).await
329
    }
330

            
331
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
332
        self.request("chain_getFinalizedHead", rpc_params![]).await
333
    }
334

            
335
    async fn data_preserver_active_assignment(
336
        &self,
337
        orchestrator_parent: PHash,
338
        profile_id: DataPreserverProfileId,
339
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
340
        self.call_remote_runtime_function(
341
            "DataPreserversApi_get_active_assignment",
342
            orchestrator_parent,
343
            Some(profile_id),
344
        )
345
        .await
346
    }
347

            
348
    async fn check_para_id_assignment(
349
        &self,
350
        orchestrator_parent: PHash,
351
        authority: NimbusId,
352
    ) -> OrchestratorChainResult<Option<ParaId>> {
353
        self.call_remote_runtime_function(
354
            "TanssiAuthorityAssignmentApi_check_para_id_assignment",
355
            orchestrator_parent,
356
            Some(authority),
357
        )
358
        .await
359
    }
360

            
361
    async fn check_para_id_assignment_next_session(
362
        &self,
363
        orchestrator_parent: PHash,
364
        authority: NimbusId,
365
    ) -> OrchestratorChainResult<Option<ParaId>> {
366
        self.call_remote_runtime_function(
367
            "TanssiAuthorityAssignmentApi_check_para_id_assignment_next_session",
368
            orchestrator_parent,
369
            Some(authority),
370
        )
371
        .await
372
    }
373
}