1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
use {
18
    crate::spawner::{wait_for_paritydb_lock, Spawner},
19
    dc_orchestrator_chain_interface::{
20
        DataPreserverAssignment, OrchestratorChainError, OrchestratorChainInterface,
21
        OrchestratorChainResult,
22
    },
23
    frame_support::__private::sp_tracing::tracing::Instrument,
24
    futures::stream::StreamExt,
25
    std::{future::Future, time::Duration},
26
    tc_consensus::ParaId,
27
};
28

            
29
pub type ProfileId = <dancebox_runtime::Runtime as pallet_data_preservers::Config>::ProfileId;
30

            
31
1
async fn try_fut<T, E>(fut: impl Future<Output = Result<T, E>>) -> Result<T, E> {
32
1
    fut.await
33
}
34

            
35
/// Watch assignements by indefinitly listening to finalized block notifications and switching to
36
/// the chain the profile is assigned to.
37
1
pub async fn task_watch_assignment(spawner: impl Spawner, profile_id: ProfileId) {
38
    use dc_orchestrator_chain_interface::DataPreserverAssignment as Assignment;
39

            
40
1
    if let OrchestratorChainResult::Err(e) = try_fut(async move {
41
1
        log::info!("Starting Data Preserver Assignment Watcher for profile #{profile_id}");
42

            
43
1
        let orchestrator_chain_interface = spawner.orchestrator_chain_interface();
44
1

            
45
1
        let mut current_assignment = DataPreserverAssignment::<ParaId>::NotAssigned;
46

            
47
1
        let mut stream = orchestrator_chain_interface
48
1
            .finality_notification_stream()
49
1
            .await?;
50

            
51
8
        while let Some(header) = stream.next().await {
52
7
            let hash = header.hash();
53

            
54
7
            let new_assignment = orchestrator_chain_interface
55
7
                .data_preserver_active_assignment(hash, profile_id)
56
7
                .await?;
57

            
58
7
            if current_assignment == new_assignment {
59
                continue;
60
7
            }
61
7

            
62
7
            log::info!(
63
                "Assignement changed at block {hash}: {current_assignment:?} => {new_assignment:?}"
64
            );
65

            
66
7
            match (current_assignment, new_assignment) {
67
                // switch from not assigned/inactive to active, start embeded node
68
                (
69
                    Assignment::NotAssigned | Assignment::Inactive(_),
70
2
                    Assignment::Active(para_id),
71
2
                ) => {
72
2
                    spawner.spawn(para_id, false).await;
73
                }
74
                // Assignement switches from active to inactive for same para_id, we stop the
75
                // embeded node but keep db
76
1
                (Assignment::Active(para_id), Assignment::Inactive(x)) if para_id == x => {
77
1
                    let db_path = spawner.stop(para_id, true); // keep db
78
1
                    if let Some(db_path) = db_path {
79
                        wait_for_paritydb_lock(&db_path, Duration::from_secs(10))
80
                            .await
81
                            .map_err(OrchestratorChainError::GenericError)?;
82
1
                    }
83
                }
84
                // No longer assigned or assigned inactive to other para id, remove previous node
85
                (
86
1
                    Assignment::Active(para_id),
87
1
                    Assignment::Inactive(_) | Assignment::NotAssigned,
88
1
                ) => {
89
1
                    spawner.stop(para_id, false); // don't keep db
90
1
                }
91
                // Changed para id, remove previous node and start new one
92
1
                (Assignment::Active(previous_para_id), Assignment::Active(para_id)) => {
93
1
                    let db_path = spawner.stop(previous_para_id, false); // don't keep db
94
1
                    if let Some(db_path) = db_path {
95
                        wait_for_paritydb_lock(&db_path, Duration::from_secs(10))
96
                            .await
97
                            .map_err(OrchestratorChainError::GenericError)?;
98
1
                    }
99

            
100
1
                    spawner.spawn(para_id, false).await;
101
                }
102
                // don't do anything yet
103
                (
104
                    Assignment::NotAssigned | Assignment::Inactive(_),
105
                    Assignment::NotAssigned | Assignment::Inactive(_),
106
2
                ) => (),
107
            }
108

            
109
7
            current_assignment = new_assignment;
110
        }
111

            
112
        Ok(())
113
1
    })
114
1
    .instrument(sc_tracing::tracing::info_span!(
115
1
        sc_tracing::logging::PREFIX_LOG_SPAN,
116
1
        name = "Data Preserver Assignment Watcher",
117
1
    ))
118
1
    .await
119
    {
120
        log::error!("Error in data preservers assignement watching task: {e:?}");
121
    }
122
}
123

            
124
#[cfg(test)]
125
mod tests {
126
    use {
127
        super::*,
128
        dc_orchestrator_chain_interface::{
129
            BlockNumber, DataPreserverProfileId, OrchestratorChainError, PHash, PHeader,
130
        },
131
        dp_container_chain_genesis_data::ContainerChainGenesisData,
132
        futures::Stream,
133
        nimbus_primitives::NimbusId,
134
        polkadot_overseer::Handle,
135
        sc_client_api::StorageProof,
136
        sp_core::H256,
137
        std::{
138
            collections::BTreeMap,
139
            ops::DerefMut,
140
            path::PathBuf,
141
            pin::Pin,
142
            sync::{Arc, Mutex},
143
            time::Duration,
144
        },
145
        tokio::sync::broadcast,
146
    };
147

            
148
    struct MockChainInterface {
149
        state: Mutex<MockChainInterfaceState>,
150
        notification_sender: broadcast::Sender<PHeader>,
151
    }
152

            
153
    struct MockChainInterfaceState {
154
        next_block_number: BlockNumber,
155
        blocks: BTreeMap<H256, BlockAssignment>,
156
    }
157

            
158
    struct BlockAssignment {
159
        assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>,
160
    }
161

            
162
    impl MockChainInterface {
163
1
        fn new() -> Self {
164
1
            Self {
165
1
                state: Mutex::new(MockChainInterfaceState {
166
1
                    next_block_number: 0,
167
1
                    blocks: BTreeMap::new(),
168
1
                }),
169
1

            
170
1
                notification_sender: broadcast::Sender::new(100),
171
1
            }
172
1
        }
173

            
174
7
        fn mock_block(&self, assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>) {
175
7
            let mut state = self.state.lock().unwrap();
176
7
            state.next_block_number += 1;
177
7

            
178
7
            let header = PHeader {
179
7
                parent_hash: H256::zero(),
180
7
                number: state.next_block_number,
181
7
                state_root: H256::zero(),
182
7
                extrinsics_root: H256::zero(),
183
7
                digest: Default::default(),
184
7
            };
185
7
            let hash = header.hash();
186
7

            
187
7
            state.blocks.insert(hash, BlockAssignment { assignments });
188
7

            
189
7
            self.notification_sender
190
7
                .send(header)
191
7
                .expect("to properly send block header");
192
7
        }
193
    }
194

            
195
    #[async_trait::async_trait]
196
    impl OrchestratorChainInterface for MockChainInterface {
197
        fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
198
            unimplemented!("not used in test")
199
        }
200

            
201
        async fn get_storage_by_key(
202
            &self,
203
            _orchestrator_parent: PHash,
204
            _key: &[u8],
205
        ) -> OrchestratorChainResult<Option<Vec<u8>>> {
206
            unimplemented!("not used in test")
207
        }
208

            
209
        async fn prove_read(
210
            &self,
211
            _orchestrator_parent: PHash,
212
            _relevant_keys: &Vec<Vec<u8>>,
213
        ) -> OrchestratorChainResult<StorageProof> {
214
            unimplemented!("not used in test")
215
        }
216

            
217
        async fn import_notification_stream(
218
            &self,
219
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
220
            unimplemented!("not used in test")
221
        }
222

            
223
        async fn new_best_notification_stream(
224
            &self,
225
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
226
            unimplemented!("not used in test")
227
        }
228

            
229
        async fn finality_notification_stream(
230
            &self,
231
1
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
232
1
            let receiver = self.notification_sender.subscribe();
233
1
            let stream = tokio_stream::wrappers::BroadcastStream::new(receiver)
234
7
                .filter_map(|x| async { x.ok() });
235
1
            let stream = Box::pin(stream);
236
1
            Ok(stream)
237
2
        }
238

            
239
        async fn genesis_data(
240
            &self,
241
            _orchestrator_parent: PHash,
242
            _para_id: ParaId,
243
        ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
244
            unimplemented!("not used in test")
245
        }
246

            
247
        async fn boot_nodes(
248
            &self,
249
            _orchestrator_parent: PHash,
250
            _para_id: ParaId,
251
        ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
252
            unimplemented!("not used in test")
253
        }
254

            
255
        async fn latest_block_number(
256
            &self,
257
            _orchestrator_parent: PHash,
258
            _para_id: ParaId,
259
        ) -> OrchestratorChainResult<Option<BlockNumber>> {
260
            unimplemented!("not used in test")
261
        }
262

            
263
        async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
264
            unimplemented!("not used in test")
265
        }
266

            
267
        async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
268
            unimplemented!("not used in test")
269
        }
270

            
271
        async fn data_preserver_active_assignment(
272
            &self,
273
            orchestrator_parent: PHash,
274
            profile_id: DataPreserverProfileId,
275
7
        ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
276
7
            let mut state = self.state.lock().unwrap();
277
7
            let block = state.blocks.get_mut(&orchestrator_parent).ok_or_else(|| {
278
                OrchestratorChainError::GenericError("this block is not mocked".into())
279
7
            })?;
280

            
281
7
            Ok(block
282
7
                .assignments
283
7
                .get(&profile_id)
284
7
                .cloned()
285
7
                .unwrap_or(DataPreserverAssignment::NotAssigned))
286
14
        }
287

            
288
        async fn check_para_id_assignment(
289
            &self,
290
            _orchestrator_parent: PHash,
291
            _authority: NimbusId,
292
        ) -> OrchestratorChainResult<Option<ParaId>> {
293
            unimplemented!("not used in test")
294
        }
295

            
296
        async fn check_para_id_assignment_next_session(
297
            &self,
298
            _orchestrator_parent: PHash,
299
            _authority: NimbusId,
300
        ) -> OrchestratorChainResult<Option<ParaId>> {
301
            unimplemented!("not used in test")
302
        }
303
    }
304

            
305
    #[derive(Debug, PartialEq, Eq, Hash)]
306
    enum SpawnerEvent {
307
        Started(ParaId, bool),
308
        Stopped(ParaId, bool),
309
    }
310

            
311
    #[derive(Clone)]
312
    struct MockSpawner {
313
        state: Arc<Mutex<Vec<SpawnerEvent>>>,
314
        chain_interface: Arc<MockChainInterface>,
315
    }
316

            
317
    impl MockSpawner {
318
1
        fn new() -> Self {
319
1
            Self {
320
1
                state: Arc::new(Mutex::new(Vec::new())),
321
1
                chain_interface: Arc::new(MockChainInterface::new()),
322
1
            }
323
1
        }
324

            
325
7
        fn collect_events(&self) -> Vec<SpawnerEvent> {
326
7
            let mut events = vec![];
327
7
            let mut state = self.state.lock().unwrap();
328
7
            std::mem::swap(state.deref_mut(), &mut events);
329
7
            events
330
7
        }
331
    }
332

            
333
    impl Spawner for MockSpawner {
334
1
        fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
335
1
            self.chain_interface.clone()
336
1
        }
337

            
338
        /// Try to start a new container chain. In case of an error, this does not stop the node, and
339
        /// the container chain will be attempted to spawn again when the collator is reassigned to it.
340
        ///
341
        /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
342
        /// because the chain has not stopped yet, because `stop` does not wait for the chain to stop,
343
        /// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in
344
        /// `handle_update_assignment`.
345
3
        fn spawn(
346
3
            &self,
347
3
            container_chain_para_id: ParaId,
348
3
            start_collation: bool,
349
3
        ) -> impl std::future::Future<Output = ()> + Send {
350
3
            let mut set = self.state.lock().unwrap();
351
3
            set.push(SpawnerEvent::Started(
352
3
                container_chain_para_id,
353
3
                start_collation,
354
3
            ));
355

            
356
3
            async {}
357
3
        }
358

            
359
        /// Stop a container chain. Prints a warning if the container chain was not running.
360
        /// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
361
        /// to ensure that the container chain has fully stopped. The database path can be `None` if the
362
        /// chain was not running.
363
3
        fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
364
3
            let mut set = self.state.lock().unwrap();
365
3
            set.push(SpawnerEvent::Stopped(container_chain_para_id, keep_db));
366
3

            
367
3
            None
368
3
        }
369
    }
370

            
371
    #[tokio::test]
372
1
    async fn task_logic_works() {
373
1
        let spawner = MockSpawner::new();
374
1

            
375
1
        let profile_id = 0;
376
1
        let para_id1 = ParaId::from(1);
377
1
        let para_id2 = ParaId::from(2);
378
1

            
379
1
        tokio::spawn(task_watch_assignment(spawner.clone(), profile_id));
380
1
        // Wait for task to start and subscribe to block stream.
381
1
        tokio::time::sleep(Duration::from_millis(100)).await;
382
1

            
383
1
        spawner.chain_interface.mock_block({
384
1
            let mut map = BTreeMap::new();
385
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
386
1
            map
387
1
        });
388
1
        tokio::time::sleep(Duration::from_millis(100)).await;
389
1
        assert_eq!(
390
1
            spawner.collect_events(),
391
1
            vec![SpawnerEvent::Started(para_id1, false)]
392
1
        );
393
1

            
394
1
        spawner.chain_interface.mock_block({
395
1
            let mut map = BTreeMap::new();
396
1
            map.insert(profile_id, DataPreserverAssignment::NotAssigned);
397
1
            map
398
1
        });
399
1
        tokio::time::sleep(Duration::from_millis(100)).await;
400
1
        assert_eq!(
401
1
            spawner.collect_events(),
402
1
            vec![SpawnerEvent::Stopped(para_id1, false)]
403
1
        );
404
1

            
405
1
        spawner.chain_interface.mock_block({
406
1
            let mut map = BTreeMap::new();
407
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id2));
408
1
            map
409
1
        });
410
1
        tokio::time::sleep(Duration::from_millis(100)).await;
411
1
        assert_eq!(
412
1
            spawner.collect_events(),
413
1
            vec![SpawnerEvent::Started(para_id2, false)]
414
1
        );
415
1

            
416
1
        spawner.chain_interface.mock_block({
417
1
            let mut map = BTreeMap::new();
418
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
419
1
            map
420
1
        });
421
1
        tokio::time::sleep(Duration::from_millis(100)).await;
422
1
        assert_eq!(
423
1
            spawner.collect_events(),
424
1
            vec![
425
1
                SpawnerEvent::Stopped(para_id2, false),
426
1
                SpawnerEvent::Started(para_id1, false)
427
1
            ]
428
1
        );
429
1

            
430
1
        spawner.chain_interface.mock_block({
431
1
            let mut map = BTreeMap::new();
432
1
            map.insert(profile_id, DataPreserverAssignment::Inactive(para_id1));
433
1
            map
434
1
        });
435
1
        tokio::time::sleep(Duration::from_millis(100)).await;
436
1
        assert_eq!(
437
1
            spawner.collect_events(),
438
1
            vec![SpawnerEvent::Stopped(para_id1, true)]
439
1
        );
440
1

            
441
1
        spawner.chain_interface.mock_block({
442
1
            let mut map = BTreeMap::new();
443
1
            map.insert(profile_id, DataPreserverAssignment::Inactive(para_id2));
444
1
            map
445
1
        });
446
1
        tokio::time::sleep(Duration::from_millis(100)).await;
447
1
        assert_eq!(spawner.collect_events(), vec![]);
448
1

            
449
1
        spawner.chain_interface.mock_block({
450
1
            let mut map = BTreeMap::new();
451
1
            map.insert(profile_id, DataPreserverAssignment::NotAssigned);
452
1
            map
453
1
        });
454
1
        tokio::time::sleep(Duration::from_millis(100)).await;
455
1
        assert_eq!(spawner.collect_events(), vec![]);
456
1
    }
457
}