// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

//! Container Chain Spawner
//!
//! Controls the starting and stopping of container chains.
//!
//! For more information about when the database is deleted, check the
//! [Keep db flowchart](https://raw.githubusercontent.com/moondance-labs/tanssi/master/docs/keep_db_flowchart.png)

use node_common::service::node_builder::StartBootnodeParams;
use {
    crate::{
        cli::ContainerChainCli,
        monitor::{SpawnedContainer, SpawnedContainersMonitor},
        rpc::generate_rpc_builder::GenerateRpcBuilder,
        service::{
            start_node_impl_container, ContainerChainClient, MinimalContainerRuntimeApi,
            ParachainClient,
        },
    },
    cumulus_primitives_core::ParaId,
    cumulus_relay_chain_interface::RelayChainInterface,
    dancebox_runtime::{opaque::Block as OpaqueBlock, Block},
    dc_orchestrator_chain_interface::{OrchestratorChainInterface, PHash},
    frame_support::{CloneNoBound, DefaultNoBound},
    fs2::FileExt,
    futures::FutureExt,
    node_common::command::generate_genesis_block,
    polkadot_primitives::CollatorPair,
    sc_cli::{Database, SyncMode},
    sc_network::config::MultiaddrWithPeerId,
    sc_service::SpawnTaskHandle,
    sc_transaction_pool::TransactionPoolHandle,
    sp_core::H256,
    sp_keystore::KeystorePtr,
    sp_runtime::traits::Block as BlockT,
    std::{
        any::Any,
        collections::{HashMap, HashSet},
        marker::PhantomData,
        path::{Path, PathBuf},
        sync::{Arc, Mutex},
        time::Instant,
    },
    tokio::{
        sync::{mpsc, oneshot},
        time::{sleep, Duration},
    },
    tokio_util::sync::CancellationToken,
};

/// Timeout to wait for the database to close before starting it again, used in `wait_for_paritydb_lock`.
/// This is the max timeout; if the db closes in 1 second then that function will only wait 1 second.
const MAX_DB_RESTART_TIMEOUT: Duration = Duration::from_secs(60);

/// Block diff threshold above which we decide it will be faster to delete the database and
/// use warp sync, rather than using full sync to download a large number of blocks.
/// This is only needed because warp sync does not support syncing from a state that is not
/// genesis; it falls back to full sync in that case.
/// 30_000 blocks = 50 hours at 6s/block.
/// Assuming a syncing speed of 100 blocks per second, syncing this many blocks will take 5 minutes.
const MAX_BLOCK_DIFF_FOR_FULL_SYNC: u32 = 30_000;

/// Task that handles spawning and stopping container chains based on assignment.
/// The main loop is [rx_loop](ContainerChainSpawner::rx_loop).
pub struct ContainerChainSpawner<
    RuntimeApi: MinimalContainerRuntimeApi,
    TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi>,
> {
    /// Start container chain params
    pub params: ContainerChainSpawnParams<RuntimeApi, TGenerateRpcBuilder>,

    /// State
    pub state: Arc<Mutex<ContainerChainSpawnerState>>,

    /// Before the first assignment, there is a db cleanup process that removes folders of container
    /// chains that we are no longer assigned to.
    pub db_folder_cleanup_done: bool,

    /// Async callback that enables collation on the orchestrator chain
    pub collate_on_tanssi:
        Arc<dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync>,
    /// Stores the cancellation token used to stop the orchestrator chain collator process.
    /// When this is None, the orchestrator collator is not running.
    pub collation_cancellation_constructs:
        Option<(CancellationToken, futures::channel::oneshot::Receiver<()>)>,
}

/// Struct with all the params needed to start a container chain node given the CLI arguments,
/// and creating the ChainSpec from on-chain data from the orchestrator chain.
/// These params must be the same for all container chains; params that change, such as the
/// `container_chain_para_id`, should be passed as separate arguments to the [try_spawn] function.
///
/// This struct MUST NOT contain types (outside of `Option<CollationParams>`) obtained through
/// running an embedded orchestrator node, as this will prevent spawning a container chain in a node
/// connected to an orchestrator node through WebSocket.
#[derive(CloneNoBound)]
pub struct ContainerChainSpawnParams<
    RuntimeApi: MinimalContainerRuntimeApi,
    TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi>,
> {
    pub orchestrator_chain_interface: Arc<dyn OrchestratorChainInterface>,
    pub container_chain_cli: ContainerChainCli,
    pub tokio_handle: tokio::runtime::Handle,
    pub chain_type: sc_chain_spec::ChainType,
    pub relay_chain: String,
    pub relay_chain_interface: Arc<dyn RelayChainInterface>,
    pub sync_keystore: KeystorePtr,
    pub spawn_handle: SpawnTaskHandle,
    pub collation_params: Option<CollationParams>,
    pub data_preserver: bool,
    pub generate_rpc_builder: TGenerateRpcBuilder,
    pub override_sync_mode: Option<SyncMode>,
    pub start_bootnode_params: StartBootnodeParams,

    pub phantom: PhantomData<RuntimeApi>,
}

/// Params specific to collation. This struct can contain types obtained through running an
/// embedded orchestrator node.
#[derive(Clone)]
pub struct CollationParams {
    pub collator_key: CollatorPair,
    pub orchestrator_tx_pool: Option<Arc<TransactionPoolHandle<OpaqueBlock, ParachainClient>>>,
    pub orchestrator_client: Option<Arc<ParachainClient>>,
    pub orchestrator_para_id: ParaId,
    /// If this is `false`, then `orchestrator_tx_pool` and `orchestrator_client` must be `Some`.
    pub solochain: bool,
}
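
// A minimal sketch of the invariant documented on `solochain` above. This helper is hypothetical
// and not part of the real code; it only illustrates the relationship between the fields:
//
//     fn assert_collation_params_valid(params: &CollationParams) {
//         if !params.solochain {
//             // With an embedded orchestrator node, both handles must be present.
//             debug_assert!(params.orchestrator_client.is_some());
//             debug_assert!(params.orchestrator_tx_pool.is_some());
//         }
//     }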

/// Mutable state for container chain spawner. Keeps track of running chains.
#[derive(DefaultNoBound)]
pub struct ContainerChainSpawnerState {
    spawned_container_chains: HashMap<ParaId, ContainerChainState>,
    assigned_para_id: Option<ParaId>,
    next_assigned_para_id: Option<ParaId>,
    failed_para_ids: HashSet<ParaId>,
    // For debugging and detecting errors
    pub spawned_containers_monitor: SpawnedContainersMonitor,
}

pub struct ContainerChainState {
    /// Handle that can be used to stop the container chain
    stop_handle: StopContainerChain,
    /// Database path
    db_path: PathBuf,
}

/// Stops a container chain when the signal is sent. The bool means `keep_db`: whether to keep the
/// container chain database (true) or remove it (false).
pub struct StopContainerChain {
    signal: oneshot::Sender<bool>,
    id: usize,
}

/// Messages used to control the `ContainerChainSpawner`. This is needed because one of the fields
/// of `ContainerChainSpawner` is not `Sync`, so we cannot simply pass an
/// `Arc<ContainerChainSpawner>` to other threads.
#[derive(Debug)]
pub enum CcSpawnMsg {
    /// Update container chain assignment
    UpdateAssignment {
        current: Option<ParaId>,
        next: Option<ParaId>,
    },
}
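
// A minimal sketch of how the spawner is driven (hypothetical wiring; the real setup lives in the
// node service code): some task detects assignment changes and sends `CcSpawnMsg`s through an
// unbounded channel, and `ContainerChainSpawner::rx_loop` consumes them.
//
//     let (cc_spawn_tx, cc_spawn_rx) = mpsc::unbounded_channel::<CcSpawnMsg>();
//     // After detecting a new assignment in an imported orchestrator block:
//     let _ = cc_spawn_tx.send(CcSpawnMsg::UpdateAssignment {
//         current: Some(2000.into()),
//         next: Some(2001.into()),
//     });
//     // Elsewhere: spawner.rx_loop(cc_spawn_rx, validator, solochain).await;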
181

            
182
// Separate function to allow using `?` to return a result, and also to avoid using `self` in an
183
// async function. Mutable state should be written by locking `state`.
184
// TODO: `state` should be an async mutex
185
async fn try_spawn<
186
    RuntimeApi: MinimalContainerRuntimeApi,
187
    TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi>,
188
>(
189
    try_spawn_params: ContainerChainSpawnParams<RuntimeApi, TGenerateRpcBuilder>,
190
    state: Arc<Mutex<ContainerChainSpawnerState>>,
191
    container_chain_para_id: ParaId,
192
    start_collation: bool,
193
) -> sc_service::error::Result<()> {
194
    let ContainerChainSpawnParams {
195
        orchestrator_chain_interface,
196
        mut container_chain_cli,
197
        tokio_handle,
198
        chain_type,
199
        relay_chain,
200
        relay_chain_interface,
201
        sync_keystore,
202
        spawn_handle,
203
        mut collation_params,
204
        data_preserver,
205
        generate_rpc_builder,
206
        override_sync_mode,
207
        start_bootnode_params,
208
        phantom: _,
209
    } = try_spawn_params;
210
    // Preload genesis data from orchestrator chain storage.
211

            
212
    // TODO: the orchestrator chain node may not be fully synced yet,
213
    // in that case we will be reading an old state.
214
    let orchestrator_block_hash = orchestrator_chain_interface
215
        .finalized_block_hash()
216
        .await
217
        .map_err(|e| format!("Failed to get latest block hash: {e}"))?;
218

            
219
    log::info!(
220
        "Detected assignment for container chain {}",
221
        container_chain_para_id
222
    );
223

            
224
    let genesis_data = orchestrator_chain_interface
225
        .genesis_data(orchestrator_block_hash, container_chain_para_id)
226
        .await
227
        .map_err(|e| format!("Failed to call genesis_data runtime api: {}", e))?
228
        .ok_or_else(|| {
229
            format!(
230
                "No genesis data registered for container chain id {}",
231
                container_chain_para_id
232
            )
233
        })?;
234

            
235
    let boot_nodes_raw = orchestrator_chain_interface
236
        .boot_nodes(orchestrator_block_hash, container_chain_para_id)
237
        .await
238
        .map_err(|e| format!("Failed to call boot_nodes runtime api: {}", e))?;
239

            
240
    if boot_nodes_raw.is_empty() {
241
        log::warn!(
242
            "No boot nodes registered on-chain for container chain {}",
243
            container_chain_para_id
244
        );
245
    }
246
    let boot_nodes = parse_boot_nodes_ignore_invalid(boot_nodes_raw, container_chain_para_id);
247
    if boot_nodes.is_empty() {
248
        log::warn!(
249
            "No valid boot nodes for container chain {}",
250
            container_chain_para_id
251
        );
252
    }
253

            
254
    container_chain_cli
255
        .preload_chain_spec_from_genesis_data(
256
            container_chain_para_id.into(),
257
            genesis_data,
258
            chain_type.clone(),
259
            relay_chain.clone(),
260
            boot_nodes,
261
        )
262
        .map_err(|e| {
263
            format!(
264
                "failed to create container chain chain spec from on chain genesis data: {}",
265
                e
266
            )
267
        })?;
268

            
269
    log::info!(
270
        "Loaded chain spec for container chain {}",
271
        container_chain_para_id
272
    );
273

            
274
    if !data_preserver && !start_collation {
275
        log::info!("This is a syncing container chain, using random ports");
276

            
277
        collation_params = None;
278

            
279
        // Use random ports to avoid conflicts with the other running container chain
280
        let random_ports = [23456, 23457, 23458];
281

            
282
        container_chain_cli
283
            .base
284
            .base
285
            .prometheus_params
286
            .prometheus_port = Some(random_ports[0]);
287
        container_chain_cli.base.base.network_params.port = Some(random_ports[1]);
288
        container_chain_cli.base.base.rpc_params.rpc_port = Some(random_ports[2]);
289

            
290
        // Use a different network key for syncing the chain. This is to avoid full nodes banning collators
291
        // by mistake, with error:
292
        // Reason: Unsupported protocol. Banned, disconnecting.
293
        //
294
        // Store this new key in a new path to not conflict with the real network key.
295
        // The same key is used for all container chains, that doesn't seem to cause problems.
296

            
297
        // Collator-01/data/containers
298
        let mut syncing_network_key_path = container_chain_cli
299
            .base
300
            .base
301
            .shared_params
302
            .base_path
303
            .clone()
304
            .expect("base path always set");
305
        // Collator-01/data/containers/keystore/network_syncing/secret_ed25519
306
        syncing_network_key_path.push("keystore/network_syncing/secret_ed25519");
307

            
308
        // Clear network key_params. These will be used by the collating process, but not by the syncing process.
309
        container_chain_cli
310
            .base
311
            .base
312
            .network_params
313
            .node_key_params
314
            .node_key = None;
315
        container_chain_cli
316
            .base
317
            .base
318
            .network_params
319
            .node_key_params
320
            .node_key_file = Some(syncing_network_key_path);
321
        // Generate a new network key if it has not been generated already.
322
        // This is safe to enable if your node is not an authority. We use it only for syncing the network.
323
        container_chain_cli
324
            .base
325
            .base
326
            .network_params
327
            .node_key_params
328
            .unsafe_force_node_key_generation = true;
329
    }

    let validator = collation_params.is_some();

    // Update CLI params
    container_chain_cli.base.para_id = Some(container_chain_para_id.into());
    container_chain_cli
        .base
        .base
        .import_params
        .database_params
        .database = Some(Database::ParityDb);

    let keep_db = container_chain_cli.base.keep_db;

    // Get a closure that checks if db_path exists. We need this to know when to use full sync instead of warp sync.
    let check_db_exists = {
        // Get db_path from config
        let mut container_chain_cli_config = sc_cli::SubstrateCli::create_configuration(
            &container_chain_cli,
            &container_chain_cli,
            tokio_handle.clone(),
        )
        .map_err(|err| format!("Container chain argument error: {}", err))?;

        // Change database path to make it depend on container chain para id
        // So instead of the usual "db/full" we have "db/full-container-2000"
        let mut db_path = container_chain_cli_config
            .database
            .path()
            .ok_or_else(|| "Failed to get database path".to_string())?
            .to_owned();
        db_path.set_file_name(format!("full-container-{}", container_chain_para_id));
        container_chain_cli_config.database.set_path(&db_path);

        // Return a closure because we may need to check if the db exists multiple times
        move || db_path.exists()
    };

    // Start container chain node. After starting, check if the database is good or needs to
    // be removed. If the db needs to be removed, this function will handle the node restart, and
    // return the components of a running container chain node.
    // This should be a separate function, but it has so many arguments that I prefer to have it as a closure for now
    let start_node_impl_container_with_restart = || async move {
        // Loop will run at most 2 times: 1 time if the db is good and 2 times if the db needs to be removed
        for _ in 0..2 {
            let db_existed_before = check_db_exists();

            if let Some(sync) = override_sync_mode {
                container_chain_cli.base.base.network_params.sync = sync;
            }
            log::info!(
                "Container chain sync mode: {:?}",
                container_chain_cli.base.base.network_params.sync
            );

            let mut container_chain_cli_config = sc_cli::SubstrateCli::create_configuration(
                &container_chain_cli,
                &container_chain_cli,
                tokio_handle.clone(),
            )
            .map_err(|err| format!("Container chain argument error: {}", err))?;

            // Change database path to make it depend on container chain para id
            // So instead of the usual "db/full" we have "db/full-container-2000"
            let mut db_path = container_chain_cli_config
                .database
                .path()
                .ok_or_else(|| "Failed to get database path".to_string())?
                .to_owned();
            db_path.set_file_name(format!("full-container-{}", container_chain_para_id));
            container_chain_cli_config.database.set_path(&db_path);

            let (container_chain_task_manager, container_chain_client, container_chain_db) =
                match container_chain_cli_config.network.network_backend {
                    sc_network::config::NetworkBackendType::Libp2p => {
                        start_node_impl_container::<_, _, sc_network::NetworkWorker<_, _>>(
                            container_chain_cli_config,
                            relay_chain_interface.clone(),
                            orchestrator_chain_interface.clone(),
                            sync_keystore.clone(),
                            container_chain_para_id,
                            collation_params.clone(),
                            generate_rpc_builder.clone(),
                            &container_chain_cli,
                            data_preserver,
                            start_bootnode_params.clone(),
                        )
                        .await?
                    }
                    sc_network::config::NetworkBackendType::Litep2p => {
                        start_node_impl_container::<_, _, sc_network::Litep2pNetworkBackend>(
                            container_chain_cli_config,
                            relay_chain_interface.clone(),
                            orchestrator_chain_interface.clone(),
                            sync_keystore.clone(),
                            container_chain_para_id,
                            collation_params.clone(),
                            generate_rpc_builder.clone(),
                            &container_chain_cli,
                            data_preserver,
                            start_bootnode_params.clone(),
                        )
                        .await?
                    }
                };

            // Keep all node parts in one variable to make them easier to drop
            let node_parts = (
                container_chain_task_manager,
                container_chain_client,
                container_chain_db,
                db_path,
            );

            if db_existed_before {
                // If the database already existed before, check if it can be used or it needs to be removed.
                // To remove the database, we restart the node, wait for the db to close to avoid a
                // "shutdown error" log, and then remove it.
                if let Some(db_removal_reason) = db_needs_removal(
                    &node_parts.1,
                    &orchestrator_chain_interface,
                    orchestrator_block_hash,
                    container_chain_para_id,
                    &container_chain_cli,
                    container_chain_cli.base.keep_db,
                )
                .await?
                {
                    let db_path = node_parts.3.clone();
                    // Important: drop `node_parts` before trying to `wait_for_paritydb_lock`
                    drop(node_parts);
                    // Wait here for the database created in the previous loop iteration to close.
                    // Dropping is not enough because there is some background process that keeps the database open,
                    // so we check the paritydb lock file directly.
                    log::info!(
                        "Restarting container chain {} after db deletion. Reason: {:?}",
                        container_chain_para_id,
                        db_removal_reason,
                    );
                    wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT)
                        .await
                        .map_err(|e| {
                            log::warn!(
                                "Error waiting for chain {} to release db lock: {:?}",
                                container_chain_para_id,
                                e
                            );

                            e
                        })?;
                    delete_container_chain_db(&db_path);

                    // Recursion, will only happen once because `db_existed_before` will be false after
                    // removing the db. Apparently closures cannot be recursive, so fake recursion by
                    // using a loop + continue
                    continue;
                }
            }

            // If using full sync, print a warning if the local db is at block 0 and the chain has thousands of blocks
            if container_chain_cli.base.base.network_params.sync == SyncMode::Full {
                let last_container_block_temp = node_parts.1.chain_info().best_number;
                let cc_block_num = get_latest_container_block_number_from_orchestrator(
                    &orchestrator_chain_interface,
                    orchestrator_block_hash,
                    container_chain_para_id,
                )
                .await
                .unwrap_or(0);
                if last_container_block_temp == 0 && cc_block_num > MAX_BLOCK_DIFF_FOR_FULL_SYNC {
                    let db_folder = format!("full-container-{}", container_chain_para_id);
                    log::error!("\
                        Existing database for container chain {} is at block 0, assuming that warp sync failed.\n\
                        The node will now use full sync, which has to download {} blocks.\n\
                        If running as collator, it may not finish syncing on time and miss block rewards.\n\
                        To force using warp sync, stop tanssi-node and manually remove the db folder: {:?}\n\
                        ", container_chain_para_id, cc_block_num, db_folder)
                }
            }

            return sc_service::error::Result::Ok(node_parts);
        }

        unreachable!("Above loop can run at most 2 times, and in the second iteration it is guaranteed to return")
    };

    let (mut container_chain_task_manager, container_chain_client, container_chain_db, db_path) =
        start_node_impl_container_with_restart().await?;

    // Signal that allows us to gracefully stop a container chain
    let (signal, on_exit) = oneshot::channel::<bool>();

    let monitor_id;
    {
        let mut state = state.lock().expect("poison error");
        let container_chain_client = container_chain_client as Arc<dyn Any + Sync + Send>;

        monitor_id = state.spawned_containers_monitor.push(SpawnedContainer {
            id: 0,
            para_id: container_chain_para_id,
            start_time: Instant::now(),
            stop_signal_time: None,
            stop_task_manager_time: None,
            stop_refcount_time: Default::default(),
            backend: Arc::downgrade(&container_chain_db),
            client: Arc::downgrade(&container_chain_client),
        });

        if state
            .spawned_container_chains
            .contains_key(&container_chain_para_id)
        {
            return Err(format!("Tried to spawn a container chain when another container chain with the same para id was already running: {:?}", container_chain_para_id).into());
        }
        state.spawned_container_chains.insert(
            container_chain_para_id,
            ContainerChainState {
                stop_handle: StopContainerChain {
                    signal,
                    id: monitor_id,
                },
                db_path: db_path.clone(),
            },
        );
    }

    // Add the container chain task manager as a child task to the parent task manager.
    // We want to stop the node if this task manager stops, but we also want to allow a
    // graceful shutdown using the `on_exit` future.
    let name = "container-chain-task-manager";
    spawn_handle.spawn(name, None, async move {
        let mut container_chain_task_manager_future =
            container_chain_task_manager.future().fuse();
        let mut on_exit_future = on_exit.fuse();

        futures::select! {
            res1 = container_chain_task_manager_future => {
                // An essential task failed or the task manager was stopped unexpectedly
                // using `.terminate()`. This should stop the container chain but not the node.
                if res1.is_err() {
                    log::error!("Essential task failed in container chain {} task manager. Shutting down container chain service", container_chain_para_id);
                } else {
                    log::error!("Unexpected shutdown in container chain {} task manager. Shutting down container chain service", container_chain_para_id);
                }
                // Mark this container chain as "failed to stop" to avoid a warning in `self.stop()`
                let mut state = state.lock().expect("poison error");
                state.failed_para_ids.insert(container_chain_para_id);
                // Never delete the db in this case because it is not a graceful shutdown
            }
            stop_unassigned = on_exit_future => {
                // Graceful shutdown.
                // `stop_unassigned` will be `Ok(keep_db)` if `.stop()` has been called, which means that the
                // container chain has been unassigned, and will be `Err` if the handle has been dropped,
                // which means that the node is stopping.
                // Delete existing database if running as collator
                if validator && stop_unassigned == Ok(false) && !keep_db {
                    // If this breaks after a code change, make sure that all the variables that
                    // may keep the chain alive are dropped before the call to `wait_for_paritydb_lock`.
                    drop(container_chain_task_manager_future);
                    drop(container_chain_task_manager);
                    let db_closed = wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT)
                        .await
                        .map_err(|e| {
                            log::warn!(
                                "Error waiting for chain {} to release db lock: {:?}",
                                container_chain_para_id,
                                e
                            );
                        }).is_ok();
                    // If the db has not closed in 60 seconds we do not delete it.
                    if db_closed {
                        delete_container_chain_db(&db_path);
                    }
                }
            }
        }

        let mut state = state.lock().expect("poison error");
        state
            .spawned_containers_monitor
            .set_stop_task_manager_time(monitor_id, Instant::now());
    });

    Ok(())
}

/// Interface for spawning and stopping container chain embedded nodes.
pub trait Spawner {
    /// Access to the Orchestrator Chain Interface
    fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface>;

    /// Try to start a new container chain. In case of an error, this does not stop the node;
    /// spawning the container chain will be attempted again when the collator is reassigned to it.
    ///
    /// It is possible to spawn-stop-spawn the same chain and have the second spawn fail because
    /// the chain has not stopped yet, since `stop` does not wait for the chain to stop. So before
    /// calling `spawn`, make sure to call `wait_for_paritydb_lock` first, as we do in
    /// `handle_update_assignment`.
    fn spawn(
        &self,
        container_chain_para_id: ParaId,
        start_collation: bool,
    ) -> impl std::future::Future<Output = ()> + Send;

    /// Stop a container chain. Prints a warning if the container chain was not running.
    /// Returns the database path for the container chain, which can be used with `wait_for_paritydb_lock`
    /// to ensure that the container chain has fully stopped. The database path will be `None` if the
    /// chain was not running.
    fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf>;
}
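
// A minimal sketch of the restart pattern described in the docs above (hypothetical caller code;
// see `handle_update_assignment` for the real usage):
//
//     if let Some(db_path) = spawner.stop(para_id, /* keep_db */ true) {
//         // Wait for the old instance to release its database before starting the new one.
//         let _ = wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT).await;
//     }
//     spawner.spawn(para_id, /* start_collation */ true).await;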

impl<
        RuntimeApi: MinimalContainerRuntimeApi,
        TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi>,
    > Spawner for ContainerChainSpawner<RuntimeApi, TGenerateRpcBuilder>
{
    /// Access to the Orchestrator Chain Interface
    fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
        self.params.orchestrator_chain_interface.clone()
    }

    /// Try to start a new container chain. In case of an error, this does not stop the node;
    /// spawning the container chain will be attempted again when the collator is reassigned to it.
    ///
    /// It is possible to spawn-stop-spawn the same chain and have the second spawn fail because
    /// the chain has not stopped yet, since `stop` does not wait for the chain to stop. So before
    /// calling `spawn`, make sure to call `wait_for_paritydb_lock` first, as we do in
    /// `handle_update_assignment`.
    async fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
        let try_spawn_params = self.params.clone();
        let state = self.state.clone();
        let state2 = state.clone();

        match try_spawn(
            try_spawn_params,
            state,
            container_chain_para_id,
            start_collation,
        )
        .await
        {
            Ok(()) => {}
            Err(e) => {
                log::error!(
                    "Failed to start container chain {}: {}",
                    container_chain_para_id,
                    e
                );
                // Mark this container chain as "failed to start"
                let mut state = state2.lock().expect("poison error");
                state.failed_para_ids.insert(container_chain_para_id);
            }
        }
    }

    /// Stop a container chain. Prints a warning if the container chain was not running.
    /// Returns the database path for the container chain, which can be used with `wait_for_paritydb_lock`
    /// to ensure that the container chain has fully stopped. The database path will be `None` if the
    /// chain was not running.
    fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
        let mut state = self.state.lock().expect("poison error");
        let stop_handle = state
            .spawned_container_chains
            .remove(&container_chain_para_id);

        match stop_handle {
            Some(stop_handle) => {
                log::info!("Stopping container chain {}", container_chain_para_id);

                let id = stop_handle.stop_handle.id;
                state
                    .spawned_containers_monitor
                    .set_stop_signal_time(id, Instant::now());

                // Send signal to perform graceful shutdown, which will delete the db if needed
                let _ = stop_handle.stop_handle.signal.send(keep_db);

                Some(stop_handle.db_path)
            }
            None => {
                // Do not print the warning message if this is a container chain that has failed to
                // start, because in that case it will not be running
                if !state.failed_para_ids.remove(&container_chain_para_id) {
                    log::warn!(
                        "Tried to stop a container chain that is not running: {}",
                        container_chain_para_id
                    );
                }

                None
            }
        }
    }
}

impl<
        RuntimeApi: MinimalContainerRuntimeApi,
        TGenerateRpcBuilder: GenerateRpcBuilder<RuntimeApi>,
    > ContainerChainSpawner<RuntimeApi, TGenerateRpcBuilder>
{
    /// Receive and process `CcSpawnMsg`s indefinitely
    pub async fn rx_loop(
        mut self,
        mut rx: mpsc::UnboundedReceiver<CcSpawnMsg>,
        validator: bool,
        solochain: bool,
    ) {
        let orchestrator_para_id = self
            .params
            .collation_params
            .as_ref()
            .expect("assignment update should only occur in a collating node")
            .orchestrator_para_id;

        // The node always starts as an orchestrator chain collator.
        // This is because the assignment is detected after importing a new block, so if all
        // collators stop at the same time, when they start again nobody will produce the new block.
        // So all nodes start as orchestrator chain collators, until the first block is imported;
        // then the real assignment is used.
        // Except in solochain mode, where the initial assignment is None.
        if validator && !solochain {
            self.handle_update_assignment(Some(orchestrator_para_id), None, true)
                .await;
        }

        while let Some(msg) = rx.recv().await {
            match msg {
                CcSpawnMsg::UpdateAssignment { current, next } => {
                    self.handle_update_assignment(current, next, false).await;
                }
            }
        }

        // The while loop can end if all the senders get dropped, but since this is an
        // essential task we don't want it to stop. So await a future that never completes.
        // This should only happen when starting a full node.
        if !validator {
            let () = std::future::pending().await;
        }
    }

    /// Handle `CcSpawnMsg::UpdateAssignment`
    async fn handle_update_assignment(
        &mut self,
        current: Option<ParaId>,
        next: Option<ParaId>,
        disable_db_folder_cleanup: bool,
    ) {
        if !disable_db_folder_cleanup && !self.db_folder_cleanup_done {
            self.db_folder_cleanup_done = true;

            // Disabled when running with --keep-db
            let keep_db = self.params.container_chain_cli.base.keep_db;
            if !keep_db {
                let mut chains_to_keep = HashSet::new();
                chains_to_keep.extend(current);
                chains_to_keep.extend(next);
                self.db_folder_cleanup(&chains_to_keep);
            }
        }

        let orchestrator_para_id = self
            .params
            .collation_params
            .as_ref()
            .expect("assignment update should only occur in a collating node")
            .orchestrator_para_id;

        let HandleUpdateAssignmentResult {
            chains_to_stop,
            chains_to_start,
            need_to_restart: _,
        } = handle_update_assignment_state_change(
            &mut self.state.lock().expect("poison error"),
            orchestrator_para_id,
            current,
            next,
        );

        if current != Some(orchestrator_para_id) {
            // If not assigned to orchestrator chain anymore, we need to stop the collator process
            let maybe_exit_notification_receiver = self
                .collation_cancellation_constructs
                .take()
                .map(|(cancellation_token, exit_notification_receiver)| {
                    cancellation_token.cancel();
                    exit_notification_receiver
                });

            if let Some(exit_notification_receiver) = maybe_exit_notification_receiver {
                let _ = exit_notification_receiver.await;
            }
        } else if self.collation_cancellation_constructs.is_none() {
            // If assigned to orchestrator chain but the collator process is not running, start it
            self.collation_cancellation_constructs = Some((self.collate_on_tanssi)());
        }

        // Stop all container chains that are no longer needed
        let mut db_paths_restart = vec![];
        for para_id in chains_to_stop {
            // Keep db if we are currently assigned to this chain
            let keep_db = Some(para_id) == current;
            let maybe_db_path = self.stop(para_id, keep_db);
            // If we are restarting this chain, save its db_path to check when it actually stopped
            if let Some(db_path) = maybe_db_path {
                if chains_to_start.contains(&para_id) {
                    db_paths_restart.push((para_id, db_path));
                }
            }
        }

        if !db_paths_restart.is_empty() {
            // Ensure the chains we stopped actually stopped by checking if their database is unlocked.
            // Using `join_all` because in one edge case we may be restarting 2 chains,
            // but almost always this will be only one future.
            let futs = db_paths_restart
                .into_iter()
                .map(|(para_id, db_path)| async move {
                    wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT)
                        .await
                        .map_err(|e| {
                            log::warn!(
                                "Error waiting for chain {} to release db lock: {:?}",
                                para_id,
                                e
                            );
                        })
                });
            futures::future::join_all(futs).await;
        }

        // Start all new container chains (usually 1)
        for para_id in chains_to_start {
            // Edge case: when starting the node it may be assigned to a container chain, so we need to
            // start a container chain already collating.
            // TODO: another edge case: if current == None, and running_chains == 0,
            // and chains_to_start == 1, we can start this chain as collating, and we won't need
            // to restart it on the next session. We need to add some extra state somewhere to
            // implement this properly.
            let start_collation = Some(para_id) == current;
            self.spawn(para_id, start_collation).await;
        }
    }

    fn db_folder_cleanup(&self, chains_to_keep: &HashSet<ParaId>) {
        // "containers" folder
        let mut base_path = self
            .params
            .container_chain_cli
            .base
            .base
            .shared_params
            .base_path
            .as_ref()
            .expect("base_path is always set")
            .to_owned();

        // "containers/chains"
        base_path.push("chains");

        // Inside chains folder we have container folders such as
        // containers/chains/simple_container_2000/
        // containers/chains/frontier_container_2001/
        // But this is not the para id, it's the chain id which we have set to include the para id, but that's not mandatory.
        // To get the para id we need to look for the paritydb folder:
        // containers/chains/frontier_container_2001/paritydb/full-container-2001/
        let mut chain_folders = sort_container_folders_by_para_id(&base_path);

        // Keep chains that we are assigned to
        for para_id in chains_to_keep {
            chain_folders.remove(&Some(*para_id));
        }

        // Print nice log message when removing folders
        if !chain_folders.is_empty() {
            let chain_folders_fmt = chain_folders
                .iter()
                .flat_map(|(para_id, vec_paths)| {
                    let para_id_fmt = if let Some(para_id) = para_id {
                        para_id.to_string()
                    } else {
                        "None".to_string()
                    };
                    vec_paths
                        .iter()
                        .map(move |path| format!("\n{}: {}", para_id_fmt, path.display()))
                })
                .collect::<String>();
            log::info!(
                "db_folder_cleanup: removing container folders: (para_id, path):{}",
                chain_folders_fmt
            );
        }

        // Remove, ignoring errors
        for (_para_id, folders) in chain_folders {
            for folder in folders {
                let _ = std::fs::remove_dir_all(&folder);
            }
        }
    }
}

struct HandleUpdateAssignmentResult {
    chains_to_stop: Vec<ParaId>,
    chains_to_start: Vec<ParaId>,
    #[allow(dead_code)] // no longer used except in tests
    need_to_restart: bool,
}

// This is a separate function to allow testing
fn handle_update_assignment_state_change(
    state: &mut ContainerChainSpawnerState,
    orchestrator_para_id: ParaId,
    current: Option<ParaId>,
    next: Option<ParaId>,
) -> HandleUpdateAssignmentResult {
    if (state.assigned_para_id, state.next_assigned_para_id) == (current, next) {
        // If nothing changed, there is nothing to update
        return HandleUpdateAssignmentResult {
            chains_to_stop: Default::default(),
            chains_to_start: Default::default(),
            need_to_restart: false,
        };
    }

    // Create a set with the container chains that were running before, and the container
    // chains that should be running after the updated assignment. This is used to calculate
    // the difference, and stop and start the required container chains.
    let mut running_chains_before = HashSet::new();
    let mut running_chains_after = HashSet::new();

    running_chains_before.extend(state.assigned_para_id);
    running_chains_before.extend(state.next_assigned_para_id);
    // Ignore orchestrator_para_id because it is handled in a special way, as it does not need to
    // start one session before in order to sync.
    running_chains_before.remove(&orchestrator_para_id);

    running_chains_after.extend(current);
    running_chains_after.extend(next);
    running_chains_after.remove(&orchestrator_para_id);
    let mut need_to_restart_current = false;
    let mut need_to_restart_next = false;

    if state.assigned_para_id != current {
        if let Some(para_id) = current {
            // If the assigned container chain has changed, we may need to
            // restart it in collation mode, unless it is the orchestrator chain.
            if para_id != orchestrator_para_id {
                need_to_restart_current = true;
            }
        }

        if let Some(para_id) = state.assigned_para_id {
            if para_id != orchestrator_para_id && Some(para_id) == next {
                need_to_restart_next = true;
            }
        }
    }

    state.assigned_para_id = current;
    state.next_assigned_para_id = next;

    let mut chains_to_stop: Vec<_> = running_chains_before
        .difference(&running_chains_after)
        .copied()
        .collect();
    let mut chains_to_start: Vec<_> = running_chains_after
        .difference(&running_chains_before)
        .copied()
        .collect();
    if need_to_restart_current {
        // Force restart of the newly assigned container chain: if it was running before, it was in
        // "syncing mode", which doesn't use the correct ports, so start it in "collation mode".
        let id = current.unwrap();
        if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
            chains_to_stop.push(id);
        }
        if !chains_to_start.contains(&id) {
            chains_to_start.push(id);
        }
    }
    if need_to_restart_next {
        // Handle edge case of going from (2000, 2001) to (2001, 2000). In that case we must restart both chains,
        // because previously 2000 was collating and now 2000 will only be syncing.
        let id = next.unwrap();
        if running_chains_before.contains(&id) && !chains_to_stop.contains(&id) {
            chains_to_stop.push(id);
        }
        if !chains_to_start.contains(&id) {
            chains_to_start.push(id);
        }
    }
    HandleUpdateAssignmentResult {
        chains_to_stop,
        chains_to_start,
        need_to_restart: need_to_restart_current || need_to_restart_next,
    }
}

async fn get_latest_container_block_number_from_orchestrator(
    orchestrator_chain_interface: &Arc<dyn OrchestratorChainInterface>,
    orchestrator_block_hash: PHash,
    container_chain_para_id: ParaId,
) -> Option<u32> {
    // Get the container chain's latest block from orchestrator chain and compare with client's one
    orchestrator_chain_interface
        .latest_block_number(orchestrator_block_hash, container_chain_para_id)
        .await
        .unwrap_or_default()
}

#[derive(Debug)]
#[allow(dead_code)]
enum DbRemovalReason {
    HighBlockDiff {
        best_block_number_db: u32,
        best_block_number_onchain: u32,
    },
    GenesisHashMismatch {
        container_client_genesis_hash: H256,
        chain_spec_genesis_hash_v0: H256,
        chain_spec_genesis_hash_v1: H256,
    },
}

/// Given a container chain client, check if the database is valid. If not, returns `Some` with the
/// reason for db removal.
/// Reasons may be:
/// * High block diff: when the local db is outdated and it would take a long time to sync using full sync, we remove it to be able to use warp sync.
/// * Genesis hash mismatch: when the chain was deregistered and a different chain with the same para id was registered.
async fn db_needs_removal<RuntimeApi: MinimalContainerRuntimeApi>(
    container_chain_client: &Arc<ContainerChainClient<RuntimeApi>>,
    orchestrator_chain_interface: &Arc<dyn OrchestratorChainInterface>,
    orchestrator_block_hash: PHash,
    container_chain_para_id: ParaId,
    container_chain_cli: &ContainerChainCli,
    keep_db: bool,
) -> sc_service::error::Result<Option<DbRemovalReason>> {
    // Check block diff, only needed if keep-db is false
    if !keep_db {
        // Get latest block number from the container chain client
        let last_container_block_temp = container_chain_client.chain_info().best_number;
        if last_container_block_temp == 0 {
            // Don't remove an empty database, as it may be in the process of a warp sync
        } else {
            let best_block_number_onchain = get_latest_container_block_number_from_orchestrator(
                orchestrator_chain_interface,
                orchestrator_block_hash,
                container_chain_para_id,
            )
            .await
            .unwrap_or(0);
            if best_block_number_onchain.abs_diff(last_container_block_temp)
                > MAX_BLOCK_DIFF_FOR_FULL_SYNC
            {
                // If the diff is big, delete the db and restart using warp sync
                return Ok(Some(DbRemovalReason::HighBlockDiff {
                    best_block_number_db: last_container_block_temp,
                    best_block_number_onchain,
                }));
            }
        }
    }
    // Generate genesis hash to compare against container client's genesis hash
    let container_preloaded_genesis = container_chain_cli.preloaded_chain_spec.as_ref().unwrap();
    // Check with both state versions, but first v1 which is the latest
    let block_v1: Block =
        generate_genesis_block(&**container_preloaded_genesis, sp_runtime::StateVersion::V1)
            .map_err(|e| format!("{:?}", e))?;
    let chain_spec_genesis_hash_v1 = block_v1.header().hash();
    let container_client_genesis_hash = container_chain_client.chain_info().genesis_hash;
    if container_client_genesis_hash != chain_spec_genesis_hash_v1 {
        let block_v0: Block =
            generate_genesis_block(&**container_preloaded_genesis, sp_runtime::StateVersion::V0)
                .map_err(|e| format!("{:?}", e))?;
        let chain_spec_genesis_hash_v0 = block_v0.header().hash();
        if container_client_genesis_hash != chain_spec_genesis_hash_v0 {
            log::info!("Container genesis V0: {:?}", chain_spec_genesis_hash_v0);
            log::info!("Container genesis V1: {:?}", chain_spec_genesis_hash_v1);
            log::info!(
                "Chain spec genesis {:?} did not match with any container genesis - Restarting...",
                container_client_genesis_hash
            );
            return Ok(Some(DbRemovalReason::GenesisHashMismatch {
                container_client_genesis_hash,
                chain_spec_genesis_hash_v0,
                chain_spec_genesis_hash_v1,
            }));
        }
    }
    Ok(None)
}

/// Remove the container chain database folder. This is called with db_path:
///     `Collator2002-01/data/containers/chains/simple_container_2002/paritydb/full-container-2002`
/// but we want to delete everything under
///     `Collator2002-01/data/containers/chains/simple_container_2002`
/// So we use `delete_empty_folders_recursive` to try to remove the parent folders as well, but only
/// if they are empty. This is to avoid removing any secret keys or other important data.
fn delete_container_chain_db(db_path: &Path) {
    // Remove folder `full-container-2002`
    let _ = std::fs::remove_dir_all(db_path);
    // Remove all the empty folders inside `simple_container_2002`, including self
    if let Some(parent) = db_path.ancestors().nth(2) {
        delete_empty_folders_recursive(parent);
    }
}

/// Removes all empty folders in `path`, recursively. Then, if `path` is empty, it removes it as well.
/// Ignores any IO errors.
fn delete_empty_folders_recursive(path: &Path) {
    let entry_iter = std::fs::read_dir(path);
    let entry_iter = match entry_iter {
        Ok(x) => x,
        Err(_e) => return,
    };
    for entry in entry_iter {
        let entry = match entry {
            Ok(x) => x,
            Err(_e) => continue,
        };
        let path = entry.path();
        if path.is_dir() {
            delete_empty_folders_recursive(&path);
        }
    }
    // Try to remove dir. Returns an error if the directory is not empty, but we ignore it.
    let _ = std::fs::remove_dir(path);
}

/// Parse a list of boot nodes in `Vec<u8>` format. Invalid boot nodes are filtered out.
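///
/// # Example (illustrative values)
/// ```ignore
/// let raw: Vec<Vec<u8>> = vec![
///     // A valid multiaddr with a peer id is kept (peer id shortened here for readability).
///     b"/ip4/127.0.0.1/tcp/30333/p2p/12D3KooW...".to_vec(),
///     // Entries that are not valid UTF-8 or not valid multiaddresses are dropped with a debug log.
///     b"not a multiaddr".to_vec(),
/// ];
/// let boot_nodes = parse_boot_nodes_ignore_invalid(raw, 2000.into());
/// assert_eq!(boot_nodes.len(), 1);
/// ```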
fn parse_boot_nodes_ignore_invalid(
    boot_nodes_raw: Vec<Vec<u8>>,
    container_chain_para_id: ParaId,
) -> Vec<MultiaddrWithPeerId> {
    boot_nodes_raw
        .into_iter()
        .filter_map(|x| {
            let x = String::from_utf8(x)
                .map_err(|e| {
                    log::debug!(
                        "Invalid boot node in container chain {}: {}",
                        container_chain_para_id,
                        e
                    );
                })
                .ok()?;
            x.parse::<MultiaddrWithPeerId>()
                .map_err(|e| {
                    log::debug!(
                        "Invalid boot node in container chain {}: {}",
                        container_chain_para_id,
                        e
                    )
                })
                .ok()
        })
        .collect()
}
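
/// Wait, up to `max_timeout`, for the paritydb lock file under `db_path` to be released, polling
/// once per second. Returns `Err` if the lock is still held when the timeout expires.
///
/// # Example (illustrative)
/// ```ignore
/// // After signalling a container chain to stop, wait for it to release its database
/// // before deleting or reopening it:
/// wait_for_paritydb_lock(&db_path, MAX_DB_RESTART_TIMEOUT).await?;
/// delete_container_chain_db(&db_path);
/// ```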
pub async fn wait_for_paritydb_lock(db_path: &Path, max_timeout: Duration) -> Result<(), String> {
    let now = Instant::now();
    while now.elapsed() < max_timeout {
        let lock_held = check_paritydb_lock_held(db_path)
            .map_err(|e| format!("Failed to check if lock file is held: {}", e))?;
        if !lock_held {
            return Ok(());
        }
        sleep(Duration::from_secs(1)).await;
    }
    Err("Timeout when waiting for paritydb lock".to_string())
}

/// Given a path to a paritydb database, check if its lock file is held. This indicates that a
/// background process is still using the database, so we should wait before trying to open it.
///
/// This should be kept up to date with the way paritydb handles the lock file:
/// <https://github.com/paritytech/parity-db/blob/2b6820e310a08678d4540c044f41a93d87343ac8/src/db.rs#L215>
fn check_paritydb_lock_held(db_path: &Path) -> Result<bool, std::io::Error> {
    if !db_path.is_dir() {
        // Lock file does not exist, so it is not held
        return Ok(false);
    }
    let mut lock_path: std::path::PathBuf = db_path.to_owned();
    lock_path.push("lock");
    let lock_file = std::fs::OpenOptions::new()
        .create(true)
        .read(true)
        .write(true)
        .truncate(true)
        .open(lock_path.as_path())?;
    // Check if the lock file is busy by trying to lock it.
    // Returns err if failed to acquire the lock.
    let lock_held = lock_file.try_lock_exclusive().is_err();
    Ok(lock_held)
}
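/// Group container chain folders under `chains_folder_path` by the para id found in their
/// `paritydb/full-container-<para_id>` subfolder. Folders whose para id cannot be determined are
/// collected under the `None` key, and folders that look corrupted (more than one matching
/// subfolder) are skipped entirely.
///
/// Illustrative sketch of the resulting map, assuming a layout like the one described in
/// `delete_container_chain_db` (`some_unknown_folder` is a hypothetical leftover directory):
///
/// ```text
/// chains/simple_container_2002/paritydb/full-container-2002 -> Some(2002) => [.../simple_container_2002]
/// chains/some_unknown_folder   (no paritydb subfolder)      -> None       => [.../some_unknown_folder]
/// ```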
fn sort_container_folders_by_para_id(
    chains_folder_path: &Path,
) -> HashMap<Option<ParaId>, Vec<PathBuf>> {
    let mut h = HashMap::new();
    let entry_iter = std::fs::read_dir(chains_folder_path);
    let entry_iter = match entry_iter {
        Ok(x) => x,
        Err(_e) => return h,
    };
    for entry in entry_iter {
        let entry = match entry {
            Ok(x) => x,
            Err(_e) => continue,
        };
        let path = entry.path();
        if path.is_dir() {
            if let Ok(para_id) = process_container_folder_get_para_id(path.clone()) {
                h.entry(para_id).or_default().push(path);
            }
        }
    }
    h
}
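/// Inspect a single container chain folder and return the para id encoded in its
/// `paritydb/full-container-<para_id>` subfolder. Returns `Ok(None)` if the para id cannot be
/// determined, and an error if the folder looks corrupted (more than one matching subfolder), so
/// that the caller keeps it around for manual inspection.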
fn process_container_folder_get_para_id(path: PathBuf) -> std::io::Result<Option<ParaId>> {
    // Build the path to the paritydb directory
    let paritydb_path = path.join("paritydb");
    // Check if the paritydb directory exists and is a directory
    if !paritydb_path.is_dir() {
        // If not, return `None`: the caller will group this path under the `None` key
        return Ok(None);
    }
    // Read the entries in the paritydb directory
    let entry_iter = std::fs::read_dir(&paritydb_path)?;
    let mut para_id: Option<ParaId> = None;
    // Iterate over each entry in the paritydb directory
    for entry in entry_iter {
        let entry = entry?;
        let sub_path = entry.path();
        // Only consider directories
        if !sub_path.is_dir() {
            continue;
        }
        let sub_path_file_name = match sub_path.file_name().and_then(|s| s.to_str()) {
            Some(x) => x,
            None => {
                continue;
            }
        };
        // Only consider folders whose name follows the `full-container-<para_id>` pattern
        if !sub_path_file_name.starts_with("full-container-") {
            continue;
        }
        if let Some(id) = parse_para_id_from_folder_name(sub_path_file_name) {
            if para_id.is_some() {
                // If there is more than one folder with a para id, assume this folder is
                // corrupted and ignore it, keep it for manual deletion
                return Err(std::io::Error::new(
                    std::io::ErrorKind::Other,
                    "found more than one full-container-* folder",
                ));
            }
            para_id = Some(id);
        }
    }
    Ok(para_id)
}
/// Parse the para id from a folder name following the `full-container-<para_id>` pattern.
///
/// Input: `full-container-2000`
/// Output: `Some(2000)`
///
/// Returns `None` if the name does not end in a valid `u32`.
fn parse_para_id_from_folder_name(folder_name: &str) -> Option<ParaId> {
    // Find last '-' in string
    let idx = folder_name.rfind('-')?;
    // +1 to skip the '-'
    let id_str = &folder_name[idx + 1..];
    // Try to parse as u32, in case of error return None
    let id = id_str.parse::<u32>().ok()?;
    Some(id.into())
}
#[cfg(test)]
mod tests {
    use {super::*, std::path::PathBuf};
    // Copy of ContainerChainSpawner with extra assertions for tests, and mocked spawn function.
    struct MockContainerChainSpawner {
        state: Arc<Mutex<ContainerChainSpawnerState>>,
        orchestrator_para_id: ParaId,
        collate_on_tanssi: Arc<
            dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>) + Send + Sync,
        >,
        collation_cancellation_constructs: Option<()>,
        // Keep track of the last CollateOn message, for tests
        currently_collating_on: Arc<Mutex<Option<ParaId>>>,
    }
    impl MockContainerChainSpawner {
        fn new() -> Self {
            let orchestrator_para_id = 1000.into();
            // The node always starts as an orchestrator chain collator
            let currently_collating_on = Arc::new(Mutex::new(Some(orchestrator_para_id)));
            let currently_collating_on2 = currently_collating_on.clone();
            let collate_closure = move || {
                let mut cco = currently_collating_on2.lock().unwrap();
                assert_ne!(
                    *cco,
                    Some(orchestrator_para_id),
                    "Received CollateOn message when we were already collating on this chain: {}",
                    orchestrator_para_id
                );
                *cco = Some(orchestrator_para_id);
                let (_, receiver) = futures::channel::oneshot::channel();
                (CancellationToken::new(), receiver)
            };
            let collate_on_tanssi: Arc<
                dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>)
                    + Send
                    + Sync,
            > = Arc::new(collate_closure);
            Self {
                state: Arc::new(Mutex::new(ContainerChainSpawnerState {
                    spawned_container_chains: Default::default(),
                    assigned_para_id: Some(orchestrator_para_id),
                    next_assigned_para_id: None,
                    failed_para_ids: Default::default(),
                    spawned_containers_monitor: Default::default(),
                })),
                orchestrator_para_id,
                collate_on_tanssi,
                // Some if collator starts on orchestrator chain
                collation_cancellation_constructs: Some(()),
                currently_collating_on,
            }
        }
        fn spawn(&self, container_chain_para_id: ParaId, start_collation: bool) {
            let (signal, _on_exit) = oneshot::channel();
            let currently_collating_on2 = self.currently_collating_on.clone();
            let collate_closure = move || {
                let mut cco = currently_collating_on2.lock().unwrap();
                assert_ne!(
                    *cco,
                    Some(container_chain_para_id),
                    "Received CollateOn message when we were already collating on this chain: {}",
                    container_chain_para_id
                );
                *cco = Some(container_chain_para_id);
                let (_, receiver) = futures::channel::oneshot::channel();
                (CancellationToken::new(), receiver)
            };
            let collate_on: Arc<
                dyn Fn() -> (CancellationToken, futures::channel::oneshot::Receiver<()>)
                    + Send
                    + Sync,
            > = Arc::new(collate_closure);
            // Dummy db_path for tests, is not actually used
            let db_path = PathBuf::from(format!("/tmp/container-{}/db", container_chain_para_id));
            let old = self
                .state
                .lock()
                .expect("poison error")
                .spawned_container_chains
                .insert(
                    container_chain_para_id,
                    ContainerChainState {
                        stop_handle: StopContainerChain { signal, id: 0 },
                        db_path,
                    },
                );
            assert!(
                old.is_none(),
                "tried to spawn a container chain that was already running: {}",
                container_chain_para_id
            );
            if start_collation {
                let (_cancellation_token, _exit_receiver) = collate_on();
            }
        }
        fn stop(&self, container_chain_para_id: ParaId) {
            let stop_handle = self
                .state
                .lock()
                .expect("poison error")
                .spawned_container_chains
                .remove(&container_chain_para_id);
            match stop_handle {
                Some(_stop_handle) => {
                    log::info!("Stopping container chain {}", container_chain_para_id);
                }
                None => {
                    panic!(
                        "Tried to stop a container chain that is not running: {}",
                        container_chain_para_id
                    );
                }
            }
            // Update currently_collating_on, if we stopped the chain we are no longer collating there
            let mut lco = self.currently_collating_on.lock().unwrap();
            if *lco == Some(container_chain_para_id) {
                *lco = None;
            }
        }
        fn handle_update_assignment(&mut self, current: Option<ParaId>, next: Option<ParaId>) {
            let HandleUpdateAssignmentResult {
                chains_to_stop,
                chains_to_start,
                need_to_restart,
            } = handle_update_assignment_state_change(
                &mut self.state.lock().unwrap(),
                self.orchestrator_para_id,
                current,
                next,
            );
            if current != Some(self.orchestrator_para_id) {
                // If not assigned to orchestrator chain anymore, we need to stop the collator process
                let mut cco = self.currently_collating_on.lock().unwrap();
                if *cco == Some(self.orchestrator_para_id) {
                    *cco = None;
                }
                self.collation_cancellation_constructs = None;
            } else if self.collation_cancellation_constructs.is_none() {
                let (_cancellation_token, _exit_notification_receiver) = (self.collate_on_tanssi)();
                self.collation_cancellation_constructs = Some(());
            }
            // Assert we never start and stop the same container chain
            for para_id in &chains_to_start {
                if !need_to_restart {
                    assert!(
                        !chains_to_stop.contains(para_id),
                        "Tried to start and stop same container chain: {}",
                        para_id
                    );
                } else {
                    // Will try to start and stop container chain with id "current" or "next", so ignore that
                    if Some(*para_id) != current && Some(*para_id) != next {
                        assert!(
                            !chains_to_stop.contains(para_id),
                            "Tried to start and stop same container chain: {}",
                            para_id
                        );
                    }
                }
            }
            // Assert we never start or stop the orchestrator chain
            assert!(!chains_to_start.contains(&self.orchestrator_para_id));
            assert!(!chains_to_stop.contains(&self.orchestrator_para_id));
            // Stop all container chains that are no longer needed
            for para_id in chains_to_stop {
                self.stop(para_id);
            }
            // Start all new container chains (usually 1)
            for para_id in chains_to_start {
                // Edge case: when starting the node it may be assigned to a container chain, so we need to
                // start a container chain already collating.
                let start_collation = Some(para_id) == current;
                self.spawn(para_id, start_collation);
            }
            // Assert that if we are currently assigned to a container chain, we are collating there
            if let Some(para_id) = current {
                self.assert_collating_on(Some(para_id));
            } else {
                self.assert_collating_on(None);
            }
        }
        #[track_caller]
        fn assert_collating_on(&self, para_id: Option<ParaId>) {
            let currently_collating_on = *self.currently_collating_on.lock().unwrap();
            assert_eq!(currently_collating_on, para_id);
        }
        #[track_caller]
        fn assert_running_chains(&self, para_ids: &[ParaId]) {
            let mut actually_running: Vec<ParaId> = self
                .state
                .lock()
                .unwrap()
                .spawned_container_chains
                .keys()
                .cloned()
                .collect();
            actually_running.sort();
            let mut should_be_running = para_ids.to_vec();
            should_be_running.sort();
            assert_eq!(actually_running, should_be_running);
        }
    }
    #[test]
    fn starts_collating_on_tanssi() {
        let mut m = MockContainerChainSpawner::new();
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
    }
    #[test]
    fn assigned_to_orchestrator_chain() {
        let mut m = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
        m.handle_update_assignment(None, Some(1000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }
    #[test]
    fn assigned_to_container_chain() {
        let mut m = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }
    #[test]
    fn spawn_container_chains() {
        let mut m = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(1000.into()), Some(2000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);
        m.handle_update_assignment(Some(2001.into()), Some(2001.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2001.into()]);
        m.handle_update_assignment(Some(2001.into()), Some(1000.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2001.into()]);
        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }
    #[test]
    fn swap_current_next() {
        // Going from (2000, 2001) to (2001, 2000) shouldn't start or stop any container chains
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);
        m.handle_update_assignment(Some(2001.into()), Some(2000.into()));
        m.assert_collating_on(Some(2001.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);
    }
    #[test]
    fn stop_collating_orchestrator() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(1000.into()), Some(1000.into()));
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
        m.handle_update_assignment(Some(1000.into()), None);
        m.assert_collating_on(Some(1000.into()));
        m.assert_running_chains(&[]);
    }
    #[test]
    fn stop_collating_container() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);
        // This will send a CollateOn message to the same chain as the last CollateOn,
        // but this is needed because that chain has been stopped
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }
    #[test]
    fn stop_collating_container_start_immediately() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
        // This will start the chain already collating
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }
    #[test]
    fn stop_all_chains() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(2000.into()), Some(2001.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into(), 2001.into()]);
        m.handle_update_assignment(None, None);
        m.assert_collating_on(None);
        m.assert_running_chains(&[]);
    }
    #[test]
    fn keep_collating_on_container() {
        let mut m: MockContainerChainSpawner = MockContainerChainSpawner::new();
        m.handle_update_assignment(Some(2000.into()), None);
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(None, Some(2000.into()));
        m.assert_collating_on(None);
        m.assert_running_chains(&[2000.into()]);
        m.handle_update_assignment(Some(2000.into()), Some(2000.into()));
        m.assert_collating_on(Some(2000.into()));
        m.assert_running_chains(&[2000.into()]);
    }
    #[test]
    fn invalid_boot_nodes_are_ignored() {
        let para_id = 100.into();
        let bootnode1 =
            b"/ip4/127.0.0.1/tcp/33049/ws/p2p/12D3KooWHVMhQDHBpj9vQmssgyfspYecgV6e3hH1dQVDUkUbCYC9"
                .to_vec();
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![b"A".to_vec()], para_id),
            vec![]
        );
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![b"\xff".to_vec()], para_id),
            vec![]
        );
        // Valid boot nodes are not ignored
        assert_eq!(
            parse_boot_nodes_ignore_invalid(vec![bootnode1], para_id).len(),
            1
        );
    }
    #[test]
    fn path_ancestors() {
        // Test the implementation of `delete_container_chain_db`
        let db_path = PathBuf::from("/tmp/zombienet/Collator2002-01/data/containers/chains/simple_container_2002/paritydb/full-container-2002");
        let parent = db_path.ancestors().nth(2).unwrap();
        assert_eq!(
            parent,
            PathBuf::from(
                "/tmp/zombienet/Collator2002-01/data/containers/chains/simple_container_2002"
            )
        )
    }
    #[test]
    fn para_id_from_folder_name() {
        assert_eq!(parse_para_id_from_folder_name(""), None,);
        assert_eq!(parse_para_id_from_folder_name("full"), None,);
        assert_eq!(parse_para_id_from_folder_name("full-container"), None,);
        assert_eq!(parse_para_id_from_folder_name("full-container-"), None,);
        assert_eq!(
            parse_para_id_from_folder_name("full-container-2000"),
            Some(ParaId::from(2000)),
        );
    }
}