1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
mod cli;
18
pub mod watch_assignment;
19

            
20
use {
21
    cumulus_client_cli::CollatorOptions,
22
    cumulus_primitives_core::ParaId,
23
    cumulus_relay_chain_interface::RelayChainInterface,
24
    dc_orchestrator_chain_interface::OrchestratorChainInterface,
25
    sc_cli::SubstrateCli,
26
    sc_service::{Configuration, KeystoreContainer, TaskManager},
27
    sc_telemetry::TelemetryWorker,
28
    std::{marker::PhantomData, sync::Arc},
29
    tc_service_container_chain_spawner::{
30
        cli::{ContainerChainCli, ContainerChainRunCmd},
31
        rpc::generate_rpc_builder::GenerateRpcBuilder,
32
        service::MinimalContainerRuntimeApi,
33
        spawner::{ContainerChainSpawnParams, ContainerChainSpawner},
34
    },
35
    tc_service_orchestrator_chain::solochain::{
36
        build_relay_chain_interface_solochain, EnableContainerChainSpawner,
37
    },
38
    url::Url,
39
};
40

            
41
/// Watches for an assignment and provide data preservers services for assigned chain.
42
#[derive(Clone, Debug, clap::Parser)]
43
pub struct DataPreserverCmd {
44
    /// Arguments to run a container chain node.
45
    #[command(flatten)]
46
    pub container_run: ContainerChainRunCmd,
47

            
48
    /// Profile id associated with the node, whose assignements will be followed to provide RPC services.
49
    #[arg(long)]
50
    pub profile_id: u64,
51

            
52
    /// Endpoints to connect to orchestrator nodes, avoiding to start a local orchestrator node.
53
    /// If this list is empty, a local embeded orchestrator node is started.
54
    #[arg(long)]
55
    pub orchestrator_endpoints: Vec<Url>,
56

            
57
    /// If running an embeded node, will run it as a solochain orchestrator.
58
    /// If not present, will run it as a parachain orchestrator.
59
    #[arg(long)]
60
    pub solochain: bool,
61

            
62
    /// Either:
63
    /// - `relay chain args`
64
    /// - `orchestrator chain args -- relay chain args`
65
    #[arg(raw = true)]
66
    pub extra_args: Vec<String>,
67
}
68

            
69
impl DataPreserverCmd {
70
    fn split_extra_args_at_first_dashdash(&self) -> (&[String], &[String]) {
71
        let index_of_dashdash = self.extra_args.iter().position(|x| *x == "--");
72

            
73
        if let Some(i) = index_of_dashdash {
74
            let (orchestrator_chain_args, extra_extra) = self.extra_args.split_at(i);
75
            (&extra_extra[1..], orchestrator_chain_args)
76
        } else {
77
            // Only relay chain args
78
            (&self.extra_args, &[])
79
        }
80
    }
81

            
82
    pub fn relaychain_args(&self) -> &[String] {
83
        let (relay_chain_args, _) = self.split_extra_args_at_first_dashdash();
84

            
85
        relay_chain_args
86
    }
87

            
88
    pub fn orchestrator_chain_args(&self) -> &[String] {
89
        let (_, orchestrator_chain_args) = self.split_extra_args_at_first_dashdash();
90

            
91
        orchestrator_chain_args
92
    }
93
}
94

            
95
pub struct DataPreserverMode<PolkaCli, GRB, RuntimeApi, DVC> {
96
    /// General configuration made from container chain arguments
97
    pub config: Configuration,
98
    pub provider_profile_id: u64,
99

            
100
    /// Run arguments specific for the orchestrator.
101
    /// If `None` then the orchestrator is a solochain, it is also the relay chain.
102
    pub orchestrator_cli: Option<cumulus_client_cli::RunCmd>,
103

            
104
    /// List of URLs to connect to remote orchestrator nodes. If empty, starts an
105
    /// embeded orchestrator node.
106
    pub orchestrator_endpoints: Vec<Url>,
107

            
108
    pub collator_options: CollatorOptions,
109

            
110
    /// Run arguments for the relaychain. Will also be the orchestrator chain if
111
    /// `orchestrator_cli` is `None`.
112
    pub polkadot_cli: PolkaCli,
113

            
114
    /// Run arguments for container chains.
115
    pub container_chain_cli: ContainerChainCli,
116

            
117
    /// Generator for the RPC builder, customizable to implement custom RPCs.
118
    pub generate_rpc_builder: GRB,
119

            
120
    pub phantom: PhantomData<(RuntimeApi, DVC)>,
121
}
122

            
123
impl<PolkaCli, GRB, RuntimeApi, DVC> DataPreserverMode<PolkaCli, GRB, RuntimeApi, DVC>
124
where
125
    DVC: sc_cli::DefaultConfigurationValues,
126
    PolkaCli: sc_cli::CliConfiguration<DVC> + sc_cli::SubstrateCli,
127
    RuntimeApi: MinimalContainerRuntimeApi,
128
    GRB: GenerateRpcBuilder<RuntimeApi> + 'static,
129
{
130
    pub async fn run(self) -> sc_cli::Result<TaskManager> {
131
        let mut task_manager;
132
        let orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>;
133
        let relay_chain_interface: Arc<dyn RelayChainInterface>;
134
        let keystore;
135
        let start_bootnode_params;
136

            
137
        if self.orchestrator_endpoints.is_empty() {
138
            // Embeded node
139

            
140
            if let Some(orchestrator_cli) = self.orchestrator_cli {
141
                log::info!("Starting embeded orchestrator parachain node ...");
142

            
143
                let orchestrator_cli = cli::EmbededParachainOrchestratorCli(orchestrator_cli);
144

            
145
                let tokio_handle = self.config.tokio_handle.clone();
146
                let orchestrator_config = SubstrateCli::create_configuration(
147
                    &orchestrator_cli,
148
                    &orchestrator_cli,
149
                    tokio_handle.clone(),
150
                )
151
                .map_err(|err| format!("Orchestrator chain argument error: {}", err))?;
152

            
153
                let polkadot_config = SubstrateCli::create_configuration(
154
                    &self.polkadot_cli,
155
                    &self.polkadot_cli,
156
                    tokio_handle,
157
                )
158
                .map_err(|err| format!("Relay chain argument error: {}", err))?;
159

            
160
                let para_id =
161
                    node_common::chain_spec::Extensions::try_get(&*orchestrator_config.chain_spec)
162
                        .map(|e| e.para_id)
163
                        .ok_or("Could not find parachain ID in chain-spec.")?;
164

            
165
                let para_id = ParaId::from(para_id);
166

            
167
                let started = match self.config.network.network_backend {
168
                    sc_network::config::NetworkBackendType::Libp2p => {
169
                        tc_service_orchestrator_chain::parachain::start_parachain_node::<
170
                            sc_network::NetworkWorker<_, _>,
171
                        >(
172
                            orchestrator_config,
173
                            polkadot_config,
174
                            None, // container_config, we don't use it as we manage that ourselfves.
175
                            self.collator_options,
176
                            para_id,
177
                            None, // no hwbench,
178
                            None, // no max_pov_percentage
179
                        )
180
                        .await?
181
                    }
182
                    sc_network::config::NetworkBackendType::Litep2p => {
183
                        tc_service_orchestrator_chain::parachain::start_parachain_node::<
184
                            sc_network::Litep2pNetworkBackend,
185
                        >(
186
                            orchestrator_config,
187
                            polkadot_config,
188
                            None, // container_config, we don't use it as we manage that ourselfves.
189
                            self.collator_options,
190
                            para_id,
191
                            None, // no hwbench,
192
                            None, // no max_pov_percentage
193
                        )
194
                        .await?
195
                    }
196
                };
197

            
198
                task_manager = started.task_manager;
199
                relay_chain_interface = started.relay_chain_interface;
200
                orchestrator_chain_interface = started.orchestrator_chain_interface;
201
                keystore = started.keystore;
202
                start_bootnode_params = started.start_bootnode_params;
203
            } else {
204
                log::info!("Starting embeded orchestrator solochain node ...");
205

            
206
                let tokio_handle = self.config.tokio_handle.clone();
207

            
208
                let polkadot_config = SubstrateCli::create_configuration(
209
                    &self.polkadot_cli,
210
                    &self.polkadot_cli,
211
                    tokio_handle,
212
                )
213
                .map_err(|err| format!("Relay chain argument error: {}", err))?;
214

            
215
                let started = tc_service_orchestrator_chain::solochain::start_solochain_node(
216
                    polkadot_config,
217
                    self.container_chain_cli.clone(),
218
                    self.collator_options,
219
                    None,                            // no hwbench
220
                    EnableContainerChainSpawner::No, // we manage this ourselves
221
                )
222
                .await?;
223

            
224
                task_manager = started.task_manager;
225
                relay_chain_interface = started.relay_chain_interface;
226
                orchestrator_chain_interface = started.orchestrator_chain_interface;
227
                keystore = started.keystore;
228
                start_bootnode_params = started.start_bootnode_params;
229
            }
230
        } else {
231
            log::info!("Connecting to remote orchestrator node(s) ...");
232

            
233
            task_manager = TaskManager::new(self.config.tokio_handle.clone(), None)
234
                .map_err(|e| sc_cli::Error::Application(Box::new(e)))?;
235

            
236
            // Orchestrator
237
            orchestrator_chain_interface =
238
                tc_orchestrator_chain_interface_through_rpc::create_client_and_start_worker(
239
                    self.orchestrator_endpoints.clone(),
240
                    &mut task_manager,
241
                    None,
242
                )
243
                .await
244
                .map(Arc::new)
245
                .map_err(|e| sc_cli::Error::Application(Box::new(e)))?;
246

            
247
            // Relay
248
            let collator_options = self.collator_options;
249

            
250
            let tokio_handle = self.config.tokio_handle.clone();
251
            let polkadot_config = sc_cli::SubstrateCli::create_configuration(
252
                &self.polkadot_cli,
253
                &self.polkadot_cli,
254
                tokio_handle,
255
            )
256
            .map_err(|err| format!("Relay chain argument error: {}", err))?;
257

            
258
            let telemetry = self
259
                .config
260
                .telemetry_endpoints
261
                .clone()
262
                .filter(|x| !x.is_empty())
263
                .map(|endpoints| -> std::result::Result<_, sc_telemetry::Error> {
264
                    let worker = TelemetryWorker::new(16)?;
265
                    let telemetry = worker.handle().new_telemetry(endpoints);
266
                    Ok((worker, telemetry))
267
                })
268
                .transpose()
269
                .map_err(sc_service::Error::Telemetry)?;
270

            
271
            let telemetry_worker_handle = telemetry.as_ref().map(|(worker, _)| worker.handle());
272

            
273
            // This is not solochain mode but this function is just to return the start_bootnode_params
274
            let relay_chain_interface_parts = build_relay_chain_interface_solochain(
275
                &self.config,
276
                polkadot_config,
277
                collator_options,
278
                telemetry_worker_handle,
279
                &mut task_manager,
280
                None,
281
            )
282
            .await
283
            .map_err(|e| sc_service::Error::Application(Box::new(e) as Box<_>))?;
284
            relay_chain_interface = relay_chain_interface_parts.0;
285

            
286
            let keystore_container = KeystoreContainer::new(&self.config.keystore)?;
287
            keystore = keystore_container.keystore();
288
            start_bootnode_params = relay_chain_interface_parts.2;
289
        }
290

            
291
        log::info!("Starting container chain spawner and assignment watcher ...");
292

            
293
        let relay_chain = self
294
            .polkadot_cli
295
            .chain_id(false)
296
            .expect("to get relay chain id");
297

            
298
        let container_chain_spawner = ContainerChainSpawner {
299
            params: ContainerChainSpawnParams {
300
                orchestrator_chain_interface,
301
                container_chain_cli: self.container_chain_cli.clone(),
302
                tokio_handle: self.config.tokio_handle.clone(),
303
                chain_type: self.config.chain_spec.chain_type(),
304
                relay_chain,
305
                relay_chain_interface,
306
                sync_keystore: keystore,
307
                collation_params: None,
308
                spawn_handle: task_manager.spawn_handle().clone(),
309
                generate_rpc_builder: self.generate_rpc_builder,
310
                override_sync_mode: None,
311
                start_bootnode_params,
312
                data_preserver: true,
313
                phantom: PhantomData,
314
            },
315
            state: Default::default(),
316
            // db cleanup task disabled here because it uses collator assignment to decide
317
            // which folders to keep and this is not a collator, this is an rpc node
318
            db_folder_cleanup_done: true,
319
            collate_on_tanssi: Arc::new(|| {
320
                panic!("Called collate_on_tanssi outside of Tanssi node")
321
            }),
322
            collation_cancellation_constructs: None,
323
        };
324
        let state = container_chain_spawner.state.clone();
325

            
326
        task_manager.spawn_essential_handle().spawn(
327
            "container-chain-assignment-watcher",
328
            None,
329
            crate::watch_assignment::task_watch_assignment(
330
                container_chain_spawner,
331
                self.provider_profile_id,
332
            ),
333
        );
334

            
335
        task_manager.spawn_essential_handle().spawn(
336
            "container-chain-spawner-debug-state",
337
            None,
338
            tc_service_container_chain_spawner::monitor::monitor_task(state),
339
        );
340

            
341
        Ok(task_manager)
342
    }
343
}