// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>

mod cli;
18
pub mod watch_assignment;
19

            
20
use {
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_client_service::build_relay_chain_interface,
23
    cumulus_primitives_core::ParaId,
24
    cumulus_relay_chain_interface::RelayChainInterface,
25
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
26
    sc_cli::SubstrateCli,
27
    sc_service::{Configuration, KeystoreContainer, TaskManager},
28
    sc_telemetry::TelemetryWorker,
29
    std::{marker::PhantomData, sync::Arc},
30
    tc_service_container_chain_spawner::{
31
        cli::{ContainerChainCli, ContainerChainRunCmd},
32
        rpc::generate_rpc_builder::GenerateRpcBuilder,
33
        service::MinimalContainerRuntimeApi,
34
        spawner::{ContainerChainSpawnParams, ContainerChainSpawner},
35
    },
36
    tc_service_orchestrator_chain::solochain::EnableContainerChainSpawner,
37
    url::Url,
38
};
39

            
40
/// Watches for an assignment and provide data preservers services for assigned chain.
41
#[derive(Clone, Debug, clap::Parser)]
42
pub struct DataPreserverCmd {
43
    /// Arguments to run a container chain node.
44
    #[command(flatten)]
45
    pub container_run: ContainerChainRunCmd,
46

            
47
    /// Profile id associated with the node, whose assignements will be followed to provide RPC services.
48
    #[arg(long)]
49
    pub profile_id: u64,
50

            
51
    /// Endpoints to connect to orchestrator nodes, avoiding to start a local orchestrator node.
52
    /// If this list is empty, a local embeded orchestrator node is started.
53
    #[arg(long)]
54
    pub orchestrator_endpoints: Vec<Url>,
55

            
56
    /// If running an embeded node, will run it as a solochain orchestrator.
57
    /// If not present, will run it as a parachain orchestrator.
58
    #[arg(long)]
59
    pub solochain: bool,
60

            
61
    /// Either:
62
    /// - `relay chain args`
63
    /// - `orchestrator chain args -- relay chain args`
64
    #[arg(raw = true)]
65
    pub extra_args: Vec<String>,
66
}
67

            
68
impl DataPreserverCmd {
69
    fn split_extra_args_at_first_dashdash(&self) -> (&[String], &[String]) {
70
        let index_of_dashdash = self.extra_args.iter().position(|x| *x == "--");
71

            
72
        if let Some(i) = index_of_dashdash {
73
            let (orchestrator_chain_args, extra_extra) = self.extra_args.split_at(i);
74
            (&extra_extra[1..], orchestrator_chain_args)
75
        } else {
76
            // Only relay chain args
77
            (&self.extra_args, &[])
78
        }
79
    }
80

            
81
    pub fn relaychain_args(&self) -> &[String] {
82
        let (relay_chain_args, _) = self.split_extra_args_at_first_dashdash();
83

            
84
        relay_chain_args
85
    }
86

            
87
    pub fn orchestrator_chain_args(&self) -> &[String] {
88
        let (_, orchestrator_chain_args) = self.split_extra_args_at_first_dashdash();
89

            
90
        orchestrator_chain_args
91
    }
92
}
93

            
94
pub struct DataPreserverMode<PolkaCli, GRB, RuntimeApi, DVC> {
95
    /// General configuration made from container chain arguments
96
    pub config: Configuration,
97
    pub provider_profile_id: u64,
98

            
99
    /// Run arguments specific for the orchestrator.
100
    /// If `None` then the orchestrator is a solochain, it is also the relay chain.
101
    pub orchestrator_cli: Option<cumulus_client_cli::RunCmd>,
102

            
103
    /// List of URLs to connect to remote orchestrator nodes. If empty, starts an
104
    /// embeded orchestrator node.
105
    pub orchestrator_endpoints: Vec<Url>,
106

            
107
    pub collator_options: CollatorOptions,
108

            
109
    /// Run arguments for the relaychain. Will also be the orchestrator chain if
110
    /// `orchestrator_cli` is `None`.
111
    pub polkadot_cli: PolkaCli,
112

            
113
    /// Run arguments for container chains.
114
    pub container_chain_cli: ContainerChainCli,
115

            
116
    /// Generator for the RPC builder, customizable to implement custom RPCs.
117
    pub generate_rpc_builder: GRB,
118

            
119
    pub phantom: PhantomData<(RuntimeApi, DVC)>,
120
}
121

            
122
impl<PolkaCli, GRB, RuntimeApi, DVC> DataPreserverMode<PolkaCli, GRB, RuntimeApi, DVC>
123
where
124
    DVC: sc_cli::DefaultConfigurationValues,
125
    PolkaCli: sc_cli::CliConfiguration<DVC> + sc_cli::SubstrateCli,
126
    RuntimeApi: MinimalContainerRuntimeApi,
127
    GRB: GenerateRpcBuilder<RuntimeApi> + 'static,
128
{
129
    pub async fn run(self) -> sc_cli::Result<TaskManager> {
130
        let mut task_manager;
131
        let orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>;
132
        let relay_chain_interface: Arc<dyn RelayChainInterface>;
133
        let keystore;
134

            
135
        if self.orchestrator_endpoints.is_empty() {
136
            // Embeded node
137

            
138
            if let Some(orchestrator_cli) = self.orchestrator_cli {
139
                log::info!("Starting embeded orchestrator parachain node ...");
140

            
141
                let orchestrator_cli = cli::EmbededParachainOrchestratorCli(orchestrator_cli);
142

            
143
                let tokio_handle = self.config.tokio_handle.clone();
144
                let orchestrator_config = SubstrateCli::create_configuration(
145
                    &orchestrator_cli,
146
                    &orchestrator_cli,
147
                    tokio_handle.clone(),
148
                )
149
                .map_err(|err| format!("Orchestrator chain argument error: {}", err))?;
150

            
151
                let polkadot_config = SubstrateCli::create_configuration(
152
                    &self.polkadot_cli,
153
                    &self.polkadot_cli,
154
                    tokio_handle,
155
                )
156
                .map_err(|err| format!("Relay chain argument error: {}", err))?;
157

            
158
                let para_id =
159
                    node_common::chain_spec::Extensions::try_get(&*orchestrator_config.chain_spec)
160
                        .map(|e| e.para_id)
161
                        .ok_or("Could not find parachain ID in chain-spec.")?;
162

            
163
                let para_id = ParaId::from(para_id);
164

            
165
                let started = match self.config.network.network_backend {
166
                    sc_network::config::NetworkBackendType::Libp2p => {
167
                        tc_service_orchestrator_chain::parachain::start_parachain_node::<
168
                            sc_network::NetworkWorker<_, _>,
169
                        >(
170
                            orchestrator_config,
171
                            polkadot_config,
172
                            None, // container_config, we don't use it as we manage that ourselfves.
173
                            self.collator_options,
174
                            para_id,
175
                            None, // no hwbench,
176
                            None, // no max_pov_percentage
177
                        )
178
                        .await?
179
                    }
180
                    sc_network::config::NetworkBackendType::Litep2p => {
181
                        tc_service_orchestrator_chain::parachain::start_parachain_node::<
182
                            sc_network::Litep2pNetworkBackend,
183
                        >(
184
                            orchestrator_config,
185
                            polkadot_config,
186
                            None, // container_config, we don't use it as we manage that ourselfves.
187
                            self.collator_options,
188
                            para_id,
189
                            None, // no hwbench,
190
                            None, // no max_pov_percentage
191
                        )
192
                        .await?
193
                    }
194
                };
195

            
196
                task_manager = started.task_manager;
197
                relay_chain_interface = started.relay_chain_interface;
198
                orchestrator_chain_interface = started.orchestrator_chain_interface;
199
                keystore = started.keystore;
200
            } else {
201
                log::info!("Starting embeded orchestrator solochain node ...");
202

            
203
                let tokio_handle = self.config.tokio_handle.clone();
204

            
205
                let polkadot_config = SubstrateCli::create_configuration(
206
                    &self.polkadot_cli,
207
                    &self.polkadot_cli,
208
                    tokio_handle,
209
                )
210
                .map_err(|err| format!("Relay chain argument error: {}", err))?;
211

            
212
                let started = tc_service_orchestrator_chain::solochain::start_solochain_node(
213
                    polkadot_config,
214
                    self.container_chain_cli.clone(),
215
                    self.collator_options,
216
                    None,                            // no hwbench
217
                    EnableContainerChainSpawner::No, // we manage this ourselves
218
                )
219
                .await?;
220

            
221
                task_manager = started.task_manager;
222
                relay_chain_interface = started.relay_chain_interface;
223
                orchestrator_chain_interface = started.orchestrator_chain_interface;
224
                keystore = started.keystore;
225
            }
226
        } else {
227
            log::info!("Connecting to remote orchestrator node(s) ...");
228

            
229
            task_manager = TaskManager::new(self.config.tokio_handle.clone(), None)
230
                .map_err(|e| sc_cli::Error::Application(Box::new(e)))?;
231

            
232
            // Orchestrator
233
            orchestrator_chain_interface =
234
                tc_orchestrator_chain_interface_through_rpc::create_client_and_start_worker(
235
                    self.orchestrator_endpoints.clone(),
236
                    &mut task_manager,
237
                    None,
238
                )
239
                .await
240
                .map(Arc::new)
241
                .map_err(|e| sc_cli::Error::Application(Box::new(e)))?;
242

            
243
            // Relay
244
            let collator_options = self.collator_options;
245

            
246
            let tokio_handle = self.config.tokio_handle.clone();
247
            let polkadot_config = sc_cli::SubstrateCli::create_configuration(
248
                &self.polkadot_cli,
249
                &self.polkadot_cli,
250
                tokio_handle,
251
            )
252
            .map_err(|err| format!("Relay chain argument error: {}", err))?;
253

            
254
            let telemetry = self
255
                .config
256
                .telemetry_endpoints
257
                .clone()
258
                .filter(|x| !x.is_empty())
259
                .map(|endpoints| -> std::result::Result<_, sc_telemetry::Error> {
260
                    let worker = TelemetryWorker::new(16)?;
261
                    let telemetry = worker.handle().new_telemetry(endpoints);
262
                    Ok((worker, telemetry))
263
                })
264
                .transpose()
265
                .map_err(sc_service::Error::Telemetry)?;
266

            
267
            let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
268

            
269
            relay_chain_interface = build_relay_chain_interface(
270
                polkadot_config,
271
                &self.config,
272
                telemetry_worker_handle,
273
                &mut task_manager,
274
                collator_options,
275
                None,
276
            )
277
            .await
278
            .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?
279
            .0;
280

            
281
            let keystore_container = KeystoreContainer::new(&self.config.keystore)?;
282
            keystore = keystore_container.keystore();
283
        }
284

            
285
        log::info!("Starting container chain spawner and assignment watcher ...");
286

            
287
        let relay_chain = self
288
            .polkadot_cli
289
            .chain_id(false)
290
            .expect("to get relay chain id");
291

            
292
        let container_chain_spawner = ContainerChainSpawner {
293
            params: ContainerChainSpawnParams {
294
                orchestrator_chain_interface,
295
                container_chain_cli: self.container_chain_cli.clone(),
296
                tokio_handle: self.config.tokio_handle.clone(),
297
                chain_type: self.config.chain_spec.chain_type(),
298
                relay_chain,
299
                relay_chain_interface,
300
                sync_keystore: keystore,
301
                collation_params: None,
302
                spawn_handle: task_manager.spawn_handle().clone(),
303
                data_preserver: true,
304
                generate_rpc_builder: self.generate_rpc_builder,
305
                override_sync_mode: None,
306
                phantom: PhantomData,
307
            },
308
            state: Default::default(),
309
            // db cleanup task disabled here because it uses collator assignment to decide
310
            // which folders to keep and this is not a collator, this is an rpc node
311
            db_folder_cleanup_done: true,
312
            collate_on_tanssi: Arc::new(|| {
313
                panic!("Called collate_on_tanssi outside of Tanssi node")
314
            }),
315
            collation_cancellation_constructs: None,
316
        };
317
        let state = container_chain_spawner.state.clone();
318

            
319
        task_manager.spawn_essential_handle().spawn(
320
            "container-chain-assignment-watcher",
321
            None,
322
            crate::watch_assignment::task_watch_assignment(
323
                container_chain_spawner,
324
                self.provider_profile_id,
325
            ),
326
        );
327

            
328
        task_manager.spawn_essential_handle().spawn(
329
            "container-chain-spawner-debug-state",
330
            None,
331
            tc_service_container_chain_spawner::monitor::monitor_task(state),
332
        );
333

            
334
        Ok(task_manager)
335
    }
336
}