1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
use {
18
    crate::spawner::{wait_for_paritydb_lock, Spawner},
19
    dc_orchestrator_chain_interface::{
20
        DataPreserverAssignment, OrchestratorChainError, OrchestratorChainInterface,
21
        OrchestratorChainResult,
22
    },
23
    futures::stream::StreamExt,
24
    std::{future::Future, time::Duration},
25
    tc_consensus::ParaId,
26
};
27

            
28
pub type ProfileId = <dancebox_runtime::Runtime as pallet_data_preservers::Config>::ProfileId;
29

            
30
1
async fn try_fut<T, E>(fut: impl Future<Output = Result<T, E>>) -> Result<T, E> {
31
7
    fut.await
32
}
33

            
34
/// Watch assignements by indefinitly listening to finalized block notifications and switching to
35
/// the chain the profile is assigned to.
36
#[sc_tracing::logging::prefix_logs_with("Data Preserver Assignment Watcher")]
37
pub async fn task_watch_assignment(spawner: impl Spawner, profile_id: ProfileId) {
38
    use dc_orchestrator_chain_interface::DataPreserverAssignment as Assignment;
39

            
40
1
    if let OrchestratorChainResult::Err(e) = try_fut(async move {
41
1
        log::info!("Starting Data Preserver Assignment Watcher for profile #{profile_id}");
42

            
43
1
        let orchestrator_chain_interface = spawner.orchestrator_chain_interface();
44
1

            
45
1
        let mut current_assignment = DataPreserverAssignment::<ParaId>::NotAssigned;
46

            
47
1
        let mut stream = orchestrator_chain_interface
48
1
            .finality_notification_stream()
49
            .await?;
50

            
51
8
        while let Some(header) = stream.next().await {
52
7
            let hash = header.hash();
53

            
54
7
            let new_assignment = orchestrator_chain_interface
55
7
                .data_preserver_active_assignment(hash, profile_id)
56
                .await?;
57

            
58
7
            if current_assignment == new_assignment {
59
                continue;
60
7
            }
61
7

            
62
7
            log::info!(
63
                "Assignement changed at block {hash}: {current_assignment:?} => {new_assignment:?}"
64
            );
65

            
66
7
            match (current_assignment, new_assignment) {
67
                // switch from not assigned/inactive to active, start embeded node
68
                (
69
                    Assignment::NotAssigned | Assignment::Inactive(_),
70
2
                    Assignment::Active(para_id),
71
2
                ) => {
72
2
                    spawner.spawn(para_id, false).await;
73
                }
74
                // Assignement switches from active to inactive for same para_id, we stop the
75
                // embeded node but keep db
76
1
                (Assignment::Active(para_id), Assignment::Inactive(x)) if para_id == x => {
77
1
                    let db_path = spawner.stop(para_id, true); // keep db
78
1
                    if let Some(db_path) = db_path {
79
                        wait_for_paritydb_lock(&db_path, Duration::from_secs(10))
80
                            .await
81
                            .map_err(OrchestratorChainError::GenericError)?;
82
1
                    }
83
                }
84
                // No longer assigned or assigned inactive to other para id, remove previous node
85
                (
86
1
                    Assignment::Active(para_id),
87
1
                    Assignment::Inactive(_) | Assignment::NotAssigned,
88
1
                ) => {
89
1
                    spawner.stop(para_id, false); // don't keep db
90
1
                }
91
                // Changed para id, remove previous node and start new one
92
1
                (Assignment::Active(previous_para_id), Assignment::Active(para_id)) => {
93
1
                    let db_path = spawner.stop(previous_para_id, false); // don't keep db
94
1
                    if let Some(db_path) = db_path {
95
                        wait_for_paritydb_lock(&db_path, Duration::from_secs(10))
96
                            .await
97
                            .map_err(OrchestratorChainError::GenericError)?;
98
1
                    }
99

            
100
1
                    spawner.spawn(para_id, false).await;
101
                }
102
                // don't do anything yet
103
                (
104
                    Assignment::NotAssigned | Assignment::Inactive(_),
105
                    Assignment::NotAssigned | Assignment::Inactive(_),
106
2
                ) => (),
107
            }
108

            
109
7
            current_assignment = new_assignment;
110
        }
111

            
112
        Ok(())
113
    })
114
    .await
115
    {
116
        log::error!("Error in data preservers assignement watching task: {e:?}");
117
    }
118
}
119

            
120
#[cfg(test)]
121
mod tests {
122
    use {
123
        super::*,
124
        dc_orchestrator_chain_interface::{
125
            BlockNumber, DataPreserverProfileId, OrchestratorChainError, PHash, PHeader,
126
        },
127
        dp_container_chain_genesis_data::ContainerChainGenesisData,
128
        futures::Stream,
129
        nimbus_primitives::NimbusId,
130
        polkadot_overseer::Handle,
131
        sc_client_api::StorageProof,
132
        sp_core::H256,
133
        std::{
134
            collections::BTreeMap,
135
            ops::DerefMut,
136
            path::PathBuf,
137
            pin::Pin,
138
            sync::{Arc, Mutex},
139
            time::Duration,
140
        },
141
        tokio::sync::broadcast,
142
    };
143

            
144
    struct MockChainInterface {
145
        state: Mutex<MockChainInterfaceState>,
146
        notification_sender: broadcast::Sender<PHeader>,
147
    }
148

            
149
    struct MockChainInterfaceState {
150
        next_block_number: BlockNumber,
151
        blocks: BTreeMap<H256, BlockAssignment>,
152
    }
153

            
154
    struct BlockAssignment {
155
        assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>,
156
    }
157

            
158
    impl MockChainInterface {
159
1
        fn new() -> Self {
160
1
            Self {
161
1
                state: Mutex::new(MockChainInterfaceState {
162
1
                    next_block_number: 0,
163
1
                    blocks: BTreeMap::new(),
164
1
                }),
165
1

            
166
1
                notification_sender: broadcast::Sender::new(100),
167
1
            }
168
1
        }
169

            
170
7
        fn mock_block(&self, assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>) {
171
7
            let mut state = self.state.lock().unwrap();
172
7
            state.next_block_number += 1;
173
7

            
174
7
            let header = PHeader {
175
7
                parent_hash: H256::zero(),
176
7
                number: state.next_block_number,
177
7
                state_root: H256::zero(),
178
7
                extrinsics_root: H256::zero(),
179
7
                digest: Default::default(),
180
7
            };
181
7
            let hash = header.hash();
182
7

            
183
7
            state.blocks.insert(hash, BlockAssignment { assignments });
184
7

            
185
7
            self.notification_sender
186
7
                .send(header)
187
7
                .expect("to properly send block header");
188
7
        }
189
    }
190

            
191
    #[async_trait::async_trait]
192
    impl OrchestratorChainInterface for MockChainInterface {
193
        fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
194
            unimplemented!("not used in test")
195
        }
196

            
197
        async fn get_storage_by_key(
198
            &self,
199
            _orchestrator_parent: PHash,
200
            _key: &[u8],
201
        ) -> OrchestratorChainResult<Option<Vec<u8>>> {
202
            unimplemented!("not used in test")
203
        }
204

            
205
        async fn prove_read(
206
            &self,
207
            _orchestrator_parent: PHash,
208
            _relevant_keys: &Vec<Vec<u8>>,
209
        ) -> OrchestratorChainResult<StorageProof> {
210
            unimplemented!("not used in test")
211
        }
212

            
213
        async fn import_notification_stream(
214
            &self,
215
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
216
            unimplemented!("not used in test")
217
        }
218

            
219
        async fn new_best_notification_stream(
220
            &self,
221
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
222
            unimplemented!("not used in test")
223
        }
224

            
225
        async fn finality_notification_stream(
226
            &self,
227
1
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
228
1
            let receiver = self.notification_sender.subscribe();
229
1
            let stream = tokio_stream::wrappers::BroadcastStream::new(receiver)
230
7
                .filter_map(|x| async { x.ok() });
231
1
            let stream = Box::pin(stream);
232
1
            Ok(stream)
233
2
        }
234

            
235
        async fn genesis_data(
236
            &self,
237
            _orchestrator_parent: PHash,
238
            _para_id: ParaId,
239
        ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
240
            unimplemented!("not used in test")
241
        }
242

            
243
        async fn boot_nodes(
244
            &self,
245
            _orchestrator_parent: PHash,
246
            _para_id: ParaId,
247
        ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
248
            unimplemented!("not used in test")
249
        }
250

            
251
        async fn latest_block_number(
252
            &self,
253
            _orchestrator_parent: PHash,
254
            _para_id: ParaId,
255
        ) -> OrchestratorChainResult<Option<BlockNumber>> {
256
            unimplemented!("not used in test")
257
        }
258

            
259
        async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
260
            unimplemented!("not used in test")
261
        }
262

            
263
        async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
264
            unimplemented!("not used in test")
265
        }
266

            
267
        async fn data_preserver_active_assignment(
268
            &self,
269
            orchestrator_parent: PHash,
270
            profile_id: DataPreserverProfileId,
271
7
        ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
272
7
            let mut state = self.state.lock().unwrap();
273
7
            let block = state.blocks.get_mut(&orchestrator_parent).ok_or_else(|| {
274
                OrchestratorChainError::GenericError("this block is not mocked".into())
275
7
            })?;
276

            
277
7
            Ok(block
278
7
                .assignments
279
7
                .get(&profile_id)
280
7
                .cloned()
281
7
                .unwrap_or(DataPreserverAssignment::NotAssigned))
282
14
        }
283

            
284
        async fn check_para_id_assignment(
285
            &self,
286
            _orchestrator_parent: PHash,
287
            _authority: NimbusId,
288
        ) -> OrchestratorChainResult<Option<ParaId>> {
289
            unimplemented!("not used in test")
290
        }
291

            
292
        async fn check_para_id_assignment_next_session(
293
            &self,
294
            _orchestrator_parent: PHash,
295
            _authority: NimbusId,
296
        ) -> OrchestratorChainResult<Option<ParaId>> {
297
            unimplemented!("not used in test")
298
        }
299
    }
300

            
301
    #[derive(Debug, PartialEq, Eq, Hash)]
302
    enum SpawnerEvent {
303
        Started(ParaId, bool),
304
        Stopped(ParaId, bool),
305
    }
306

            
307
    #[derive(Clone)]
308
    struct MockSpawner {
309
        state: Arc<Mutex<Vec<SpawnerEvent>>>,
310
        chain_interface: Arc<MockChainInterface>,
311
    }
312

            
313
    impl MockSpawner {
314
1
        fn new() -> Self {
315
1
            Self {
316
1
                state: Arc::new(Mutex::new(Vec::new())),
317
1
                chain_interface: Arc::new(MockChainInterface::new()),
318
1
            }
319
1
        }
320

            
321
7
        fn collect_events(&self) -> Vec<SpawnerEvent> {
322
7
            let mut events = vec![];
323
7
            let mut state = self.state.lock().unwrap();
324
7
            std::mem::swap(state.deref_mut(), &mut events);
325
7
            events
326
7
        }
327
    }
328

            
329
    impl Spawner for MockSpawner {
330
1
        fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
331
1
            self.chain_interface.clone()
332
1
        }
333

            
334
        /// Try to start a new container chain. In case of an error, this does not stop the node, and
335
        /// the container chain will be attempted to spawn again when the collator is reassigned to it.
336
        ///
337
        /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
338
        /// because the chain has not stopped yet, because `stop` does not wait for the chain to stop,
339
        /// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in
340
        /// `handle_update_assignment`.
341
3
        fn spawn(
342
3
            &self,
343
3
            container_chain_para_id: ParaId,
344
3
            start_collation: bool,
345
3
        ) -> impl std::future::Future<Output = ()> + Send {
346
3
            let mut set = self.state.lock().unwrap();
347
3
            set.push(SpawnerEvent::Started(
348
3
                container_chain_para_id,
349
3
                start_collation,
350
3
            ));
351

            
352
3
            async {}
353
3
        }
354

            
355
        /// Stop a container chain. Prints a warning if the container chain was not running.
356
        /// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
357
        /// to ensure that the container chain has fully stopped. The database path can be `None` if the
358
        /// chain was not running.
359
3
        fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
360
3
            let mut set = self.state.lock().unwrap();
361
3
            set.push(SpawnerEvent::Stopped(container_chain_para_id, keep_db));
362
3

            
363
3
            None
364
3
        }
365
    }
366

            
367
    #[tokio::test]
368
1
    async fn task_logic_works() {
369
1
        let spawner = MockSpawner::new();
370
1

            
371
1
        let profile_id = 0;
372
1
        let para_id1 = ParaId::from(1);
373
1
        let para_id2 = ParaId::from(2);
374
1

            
375
1
        tokio::spawn(task_watch_assignment(spawner.clone(), profile_id));
376
1
        // Wait for task to start and subscribe to block stream.
377
1
        tokio::time::sleep(Duration::from_millis(100)).await;
378
1

            
379
1
        spawner.chain_interface.mock_block({
380
1
            let mut map = BTreeMap::new();
381
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
382
1
            map
383
1
        });
384
1
        tokio::time::sleep(Duration::from_millis(100)).await;
385
1
        assert_eq!(
386
1
            spawner.collect_events(),
387
1
            vec![SpawnerEvent::Started(para_id1, false)]
388
1
        );
389
1

            
390
1
        spawner.chain_interface.mock_block({
391
1
            let mut map = BTreeMap::new();
392
1
            map.insert(profile_id, DataPreserverAssignment::NotAssigned);
393
1
            map
394
1
        });
395
1
        tokio::time::sleep(Duration::from_millis(100)).await;
396
1
        assert_eq!(
397
1
            spawner.collect_events(),
398
1
            vec![SpawnerEvent::Stopped(para_id1, false)]
399
1
        );
400
1

            
401
1
        spawner.chain_interface.mock_block({
402
1
            let mut map = BTreeMap::new();
403
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id2));
404
1
            map
405
1
        });
406
1
        tokio::time::sleep(Duration::from_millis(100)).await;
407
1
        assert_eq!(
408
1
            spawner.collect_events(),
409
1
            vec![SpawnerEvent::Started(para_id2, false)]
410
1
        );
411
1

            
412
1
        spawner.chain_interface.mock_block({
413
1
            let mut map = BTreeMap::new();
414
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
415
1
            map
416
1
        });
417
1
        tokio::time::sleep(Duration::from_millis(100)).await;
418
1
        assert_eq!(
419
1
            spawner.collect_events(),
420
1
            vec![
421
1
                SpawnerEvent::Stopped(para_id2, false),
422
1
                SpawnerEvent::Started(para_id1, false)
423
1
            ]
424
1
        );
425
1

            
426
1
        spawner.chain_interface.mock_block({
427
1
            let mut map = BTreeMap::new();
428
1
            map.insert(profile_id, DataPreserverAssignment::Inactive(para_id1));
429
1
            map
430
1
        });
431
1
        tokio::time::sleep(Duration::from_millis(100)).await;
432
1
        assert_eq!(
433
1
            spawner.collect_events(),
434
1
            vec![SpawnerEvent::Stopped(para_id1, true)]
435
1
        );
436
1

            
437
1
        spawner.chain_interface.mock_block({
438
1
            let mut map = BTreeMap::new();
439
1
            map.insert(profile_id, DataPreserverAssignment::Inactive(para_id2));
440
1
            map
441
1
        });
442
1
        tokio::time::sleep(Duration::from_millis(100)).await;
443
1
        assert_eq!(spawner.collect_events(), vec![]);
444
1

            
445
1
        spawner.chain_interface.mock_block({
446
1
            let mut map = BTreeMap::new();
447
1
            map.insert(profile_id, DataPreserverAssignment::NotAssigned);
448
1
            map
449
1
        });
450
1
        tokio::time::sleep(Duration::from_millis(100)).await;
451
1
        assert_eq!(spawner.collect_events(), vec![]);
452
1
    }
453
}