1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
mod ws_client;
18

            
19
use {
20
    async_trait::async_trait,
21
    core::pin::Pin,
22
    dc_orchestrator_chain_interface::{
23
        BlockNumber, ContainerChainGenesisData, DataPreserverAssignment, DataPreserverProfileId,
24
        NimbusId, OrchestratorChainError, OrchestratorChainInterface, OrchestratorChainResult,
25
        PHash, PHeader,
26
    },
27
    dp_core::ParaId,
28
    futures::{Stream, StreamExt},
29
    jsonrpsee::{core::params::ArrayParams, rpc_params},
30
    sc_client_api::{StorageData, StorageProof},
31
    sc_rpc_api::state::ReadProof,
32
    sc_service::TaskManager,
33
    serde::de::DeserializeOwned,
34
    sp_core::{Decode, Encode},
35
    sp_state_machine::StorageValue,
36
    sp_storage::StorageKey,
37
    tokio::sync::{mpsc, oneshot},
38
    url::Url,
39
    ws_client::{JsonRpcRequest, WsClientRequest},
40
};
41

            
42
const LOG_TARGET: &str = "orchestrator-rpc-client";
43
const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
44

            
45
/// Format url and force addition of a port
46
fn url_to_string_with_port(url: Url) -> Option<String> {
47
    // This is already validated on CLI side, just defensive here
48
    if (url.scheme() != "ws" && url.scheme() != "wss") || url.host_str().is_none() {
49
        tracing::warn!(target: LOG_TARGET, ?url, "Non-WebSocket URL or missing host.");
50
        return None;
51
    }
52

            
53
    // Either we have a user-supplied port or use the default for 'ws' or 'wss' here
54
    Some(format!(
55
        "{}://{}:{}{}{}",
56
        url.scheme(),
57
        url.host_str()?,
58
        url.port_or_known_default()?,
59
        url.path(),
60
        url.query()
61
            .map(|query| format!("?{}", query))
62
            .unwrap_or_default()
63
    ))
64
}
65

            
66
pub async fn create_client_and_start_worker(
67
    urls: Vec<Url>,
68
    task_manager: &mut TaskManager,
69
    overseer_handle: Option<polkadot_overseer::Handle>,
70
) -> OrchestratorChainResult<OrchestratorChainRpcClient> {
71
    let urls: Vec<_> = urls
72
        .into_iter()
73
        .filter_map(url_to_string_with_port)
74
        .collect();
75
    let (worker, request_sender) = ws_client::ReconnectingWsClientWorker::new(urls)
76
        .await
77
        .map_err(|_| {
78
            OrchestratorChainError::GenericError(
79
                "Failed to connect to all provided Orchestrator chain RPC endpoints".to_string(),
80
            )
81
        })?;
82

            
83
    task_manager
84
        .spawn_essential_handle()
85
        .spawn("orchestrator-rpc-worker", None, worker.run());
86

            
87
    let client = OrchestratorChainRpcClient {
88
        request_sender,
89
        overseer_handle,
90
    };
91

            
92
    Ok(client)
93
}
94

            
95
#[derive(Clone)]
96
pub struct OrchestratorChainRpcClient {
97
    request_sender: mpsc::Sender<WsClientRequest>,
98
    overseer_handle: Option<polkadot_overseer::Handle>,
99
}
100

            
101
impl OrchestratorChainRpcClient {
102
    /// Call a call to `state_call` rpc method.
103
    pub async fn call_remote_runtime_function<R: Decode>(
104
        &self,
105
        method_name: &str,
106
        hash: PHash,
107
        payload: Option<impl Encode>,
108
    ) -> OrchestratorChainResult<R> {
109
        let payload_bytes =
110
            payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode()));
111
        let params = rpc_params! {
112
            method_name,
113
            payload_bytes,
114
            hash
115
        };
116
        let res = self
117
            .request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
118
                tracing::debug!(
119
                    target: LOG_TARGET,
120
                    %method_name,
121
                    %hash,
122
                    error = %err,
123
                    "Error during call to 'state_call'.",
124
                );
125
            })
126
            .await?;
127
        Decode::decode(&mut &*res.0).map_err(Into::into)
128
    }
129

            
130
    async fn request<R>(&self, method: &str, params: ArrayParams) -> OrchestratorChainResult<R>
131
    where
132
        R: DeserializeOwned + std::fmt::Debug,
133
    {
134
        self.request_tracing(
135
            method,
136
            params,
137
            |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"),
138
        ).await
139
    }
140

            
141
    fn send_register_message(
142
        &self,
143
        message_builder: impl FnOnce(mpsc::Sender<dp_core::Header>) -> WsClientRequest,
144
    ) -> OrchestratorChainResult<mpsc::Receiver<dp_core::Header>> {
145
        let (tx, rx) = mpsc::channel(NOTIFICATION_CHANNEL_SIZE_LIMIT);
146
        self.request_sender
147
            .try_send(message_builder(tx))
148
            .map_err(|e| OrchestratorChainError::WorkerCommunicationError(e.to_string()))?;
149
        Ok(rx)
150
    }
151

            
152
    /// Send a request to the RPC worker and awaits for a response. The worker is responsible
153
    /// for retrying requests if connection dies.
154
    async fn request_tracing<R, OR>(
155
        &self,
156
        method: &str,
157
        params: ArrayParams,
158
        trace_error: OR,
159
    ) -> OrchestratorChainResult<R>
160
    where
161
        R: DeserializeOwned + std::fmt::Debug,
162
        OR: Fn(&OrchestratorChainError),
163
    {
164
        let (response_sender, response_receiver) = oneshot::channel();
165

            
166
        let request = WsClientRequest::JsonRpcRequest(JsonRpcRequest {
167
            method: method.into(),
168
            params,
169
            response_sender,
170
        });
171
        self.request_sender.send(request).await.map_err(|err| {
172
            OrchestratorChainError::WorkerCommunicationError(format!(
173
                "Unable to send message to RPC worker: {}",
174
                err
175
            ))
176
        })?;
177

            
178
        let response = response_receiver.await.map_err(|err| {
179
			OrchestratorChainError::WorkerCommunicationError(format!(
180
				"RPC worker channel closed. This can hint and connectivity issues with the supplied RPC endpoints. Message: {}",
181
				err
182
			))
183
		})??;
184

            
185
        serde_json::from_value(response).map_err(|_| {
186
            trace_error(&OrchestratorChainError::GenericError(
187
                "Unable to deserialize value".to_string(),
188
            ));
189
            OrchestratorChainError::RpcCallError(
190
                method.to_string(),
191
                "failed to decode returned value".to_string(),
192
            )
193
        })
194
    }
195

            
196
    /// Retrieve storage item at `storage_key`
197
    pub async fn state_get_storage(
198
        &self,
199
        storage_key: StorageKey,
200
        at: Option<PHash>,
201
    ) -> OrchestratorChainResult<Option<StorageData>> {
202
        let params = rpc_params![storage_key, at];
203
        self.request("state_getStorage", params).await
204
    }
205

            
206
    /// Get read proof for `storage_keys`
207
    pub async fn state_get_read_proof(
208
        &self,
209
        storage_keys: Vec<StorageKey>,
210
        at: Option<PHash>,
211
    ) -> OrchestratorChainResult<ReadProof<PHash>> {
212
        let params = rpc_params![storage_keys, at];
213
        self.request("state_getReadProof", params).await
214
    }
215
}
216

            
217
#[async_trait]
218
impl OrchestratorChainInterface for OrchestratorChainRpcClient {
219
    /// Fetch a storage item by key.
220
    async fn get_storage_by_key(
221
        &self,
222
        orchestrator_parent: PHash,
223
        key: &[u8],
224
    ) -> OrchestratorChainResult<Option<StorageValue>> {
225
        let storage_key = StorageKey(key.to_vec());
226
        self.state_get_storage(storage_key, Some(orchestrator_parent))
227
            .await
228
            .map(|storage_data| storage_data.map(|sv| sv.0))
229
    }
230

            
231
    /// Get a handle to the overseer.
232
    fn overseer_handle(&self) -> OrchestratorChainResult<polkadot_overseer::Handle> {
233
        self.overseer_handle
234
            .clone()
235
            .ok_or(OrchestratorChainError::GenericError(
236
                "OrchestratorChainRpcClient doesn't contain an Overseer Handle".to_string(),
237
            ))
238
    }
239

            
240
    /// Generate a storage read proof.
241
    async fn prove_read(
242
        &self,
243
        orchestrator_parent: PHash,
244
        relevant_keys: &Vec<Vec<u8>>,
245
    ) -> OrchestratorChainResult<StorageProof> {
246
        let mut cloned = Vec::new();
247
        cloned.extend_from_slice(relevant_keys);
248
        let storage_keys: Vec<StorageKey> = cloned.into_iter().map(StorageKey).collect();
249

            
250
        self.state_get_read_proof(storage_keys, Some(orchestrator_parent))
251
            .await
252
            .map(|read_proof| {
253
                StorageProof::new(read_proof.proof.into_iter().map(|bytes| bytes.to_vec()))
254
            })
255
    }
256

            
257
    /// Get a stream of import block notifications.
258
    async fn import_notification_stream(
259
        &self,
260
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
261
        let rx = self.send_register_message(WsClientRequest::RegisterImportListener)?;
262
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
263
        Ok(stream.boxed())
264
    }
265

            
266
    /// Get a stream of new best block notifications.
267
    async fn new_best_notification_stream(
268
        &self,
269
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
270
        let rx = self.send_register_message(WsClientRequest::RegisterBestHeadListener)?;
271
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
272
        Ok(stream.boxed())
273
    }
274

            
275
    /// Get a stream of finality notifications.
276
    async fn finality_notification_stream(
277
        &self,
278
    ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
279
        let rx = self.send_register_message(WsClientRequest::RegisterFinalizationListener)?;
280
        let stream = tokio_stream::wrappers::ReceiverStream::new(rx);
281
        Ok(stream.boxed())
282
    }
283

            
284
    async fn genesis_data(
285
        &self,
286
        orchestrator_parent: PHash,
287
        para_id: ParaId,
288
    ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
289
        self.call_remote_runtime_function(
290
            "RegistrarApi_genesis_data",
291
            orchestrator_parent,
292
            Some(para_id),
293
        )
294
        .await
295
    }
296

            
297
    async fn boot_nodes(
298
        &self,
299
        orchestrator_parent: PHash,
300
        para_id: ParaId,
301
    ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
302
        self.call_remote_runtime_function(
303
            "RegistrarApi_boot_nodes",
304
            orchestrator_parent,
305
            Some(para_id),
306
        )
307
        .await
308
    }
309

            
310
    async fn latest_block_number(
311
        &self,
312
        orchestrator_parent: PHash,
313
        para_id: ParaId,
314
    ) -> OrchestratorChainResult<Option<BlockNumber>> {
315
        self.call_remote_runtime_function(
316
            "AuthorNotingApi_latest_block_number",
317
            orchestrator_parent,
318
            Some(para_id),
319
        )
320
        .await
321
    }
322

            
323
    async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
324
        self.request("chain_getHead", rpc_params![]).await
325
    }
326

            
327
    async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
328
        self.request("chain_getFinalizedHead", rpc_params![]).await
329
    }
330

            
331
    async fn data_preserver_active_assignment(
332
        &self,
333
        orchestrator_parent: PHash,
334
        profile_id: DataPreserverProfileId,
335
    ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
336
        self.call_remote_runtime_function(
337
            "DataPreserversApi_get_active_assignment",
338
            orchestrator_parent,
339
            Some(profile_id),
340
        )
341
        .await
342
    }
343

            
344
    async fn check_para_id_assignment(
345
        &self,
346
        orchestrator_parent: PHash,
347
        authority: NimbusId,
348
    ) -> OrchestratorChainResult<Option<ParaId>> {
349
        self.call_remote_runtime_function(
350
            "TanssiAuthorityAssignmentApi_check_para_id_assignment",
351
            orchestrator_parent,
352
            Some(authority),
353
        )
354
        .await
355
    }
356

            
357
    async fn check_para_id_assignment_next_session(
358
        &self,
359
        orchestrator_parent: PHash,
360
        authority: NimbusId,
361
    ) -> OrchestratorChainResult<Option<ParaId>> {
362
        self.call_remote_runtime_function(
363
            "TanssiAuthorityAssignmentApi_check_para_id_assignment_next_session",
364
            orchestrator_parent,
365
            Some(authority),
366
        )
367
        .await
368
    }
369
}