1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
use {
18
    crate::spawner::{wait_for_paritydb_lock, Spawner},
19
    dc_orchestrator_chain_interface::{
20
        DataPreserverAssignment, OrchestratorChainError, OrchestratorChainInterface,
21
        OrchestratorChainResult,
22
    },
23
    futures::stream::StreamExt,
24
    std::{future::Future, time::Duration},
25
    tc_consensus::ParaId,
26
};
27

            
28
pub type ProfileId = <dancebox_runtime::Runtime as pallet_data_preservers::Config>::ProfileId;
29

            
30
1
async fn try_fut<T, E>(fut: impl Future<Output = Result<T, E>>) -> Result<T, E> {
31
7
    fut.await
32
}
33

            
34
/// Watch assignements by indefinitly listening to finalized block notifications and switching to
35
/// the chain the profile is assigned to.
36
1
pub async fn task_watch_assignment(spawner: impl Spawner, profile_id: ProfileId) {
37
    use dc_orchestrator_chain_interface::DataPreserverAssignment as Assignment;
38

            
39
1
    if let OrchestratorChainResult::Err(e) = try_fut(async move {
40
1
        let orchestrator_chain_interface = spawner.orchestrator_chain_interface();
41
1

            
42
1
        let mut current_assignment = DataPreserverAssignment::<ParaId>::NotAssigned;
43

            
44
1
        let mut stream = orchestrator_chain_interface
45
1
            .finality_notification_stream()
46
            .await?;
47

            
48
8
        while let Some(header) = stream.next().await {
49
7
            let hash = header.hash();
50

            
51
7
            let new_assignment = orchestrator_chain_interface
52
7
                .data_preserver_active_assignment(hash, profile_id)
53
                .await?;
54

            
55
7
            log::info!("Assignement for block {hash}: {new_assignment:?}");
56

            
57
7
            match (current_assignment, new_assignment) {
58
                // no change
59
7
                (x, y) if x == y => continue,
60
                // switch from not assigned/inactive to active, start embeded node
61
                (
62
                    Assignment::NotAssigned | Assignment::Inactive(_),
63
2
                    Assignment::Active(para_id),
64
2
                ) => {
65
2
                    spawner.spawn(para_id, false).await;
66
                }
67
                // Assignement switches from active to inactive for same para_id, we stop the
68
                // embeded node but keep db
69
1
                (Assignment::Active(para_id), Assignment::Inactive(x)) if para_id == x => {
70
1
                    let db_path = spawner.stop(para_id, true); // keep db
71
1
                    if let Some(db_path) = db_path {
72
                        wait_for_paritydb_lock(&db_path, Duration::from_secs(10))
73
                            .await
74
                            .map_err(OrchestratorChainError::GenericError)?;
75
1
                    }
76
                }
77
                // No longer assigned or assigned inactive to other para id, remove previous node
78
                (
79
1
                    Assignment::Active(para_id),
80
1
                    Assignment::Inactive(_) | Assignment::NotAssigned,
81
1
                ) => {
82
1
                    spawner.stop(para_id, false); // don't keep db
83
1
                }
84
                // Changed para id, remove previous node and start new one
85
1
                (Assignment::Active(previous_para_id), Assignment::Active(para_id)) => {
86
1
                    let db_path = spawner.stop(previous_para_id, false); // don't keep db
87
1
                    if let Some(db_path) = db_path {
88
                        wait_for_paritydb_lock(&db_path, Duration::from_secs(10))
89
                            .await
90
                            .map_err(OrchestratorChainError::GenericError)?;
91
1
                    }
92

            
93
1
                    spawner.spawn(para_id, false).await;
94
                }
95
                // don't do anything yet
96
                (
97
                    Assignment::NotAssigned | Assignment::Inactive(_),
98
                    Assignment::NotAssigned | Assignment::Inactive(_),
99
2
                ) => (),
100
            }
101

            
102
7
            current_assignment = new_assignment;
103
        }
104

            
105
        Ok(())
106
1
    })
107
7
    .await
108
    {
109
        log::error!("Error in data preservers assignement watching task: {e:?}");
110
    }
111
}
112

            
113
#[cfg(test)]
114
mod tests {
115
    use {
116
        super::*,
117
        dc_orchestrator_chain_interface::{
118
            BlockNumber, DataPreserverProfileId, OrchestratorChainError, PHash, PHeader,
119
        },
120
        dp_container_chain_genesis_data::ContainerChainGenesisData,
121
        futures::Stream,
122
        nimbus_primitives::NimbusId,
123
        polkadot_overseer::Handle,
124
        sc_client_api::StorageProof,
125
        sp_core::H256,
126
        std::{
127
            collections::BTreeMap,
128
            ops::DerefMut,
129
            path::PathBuf,
130
            pin::Pin,
131
            sync::{Arc, Mutex},
132
            time::Duration,
133
        },
134
        tokio::sync::broadcast,
135
    };
136

            
137
    struct MockChainInterface {
138
        state: Mutex<MockChainInterfaceState>,
139
        notification_sender: broadcast::Sender<PHeader>,
140
    }
141

            
142
    struct MockChainInterfaceState {
143
        next_block_number: BlockNumber,
144
        blocks: BTreeMap<H256, BlockAssignment>,
145
    }
146

            
147
    struct BlockAssignment {
148
        assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>,
149
    }
150

            
151
    impl MockChainInterface {
152
1
        fn new() -> Self {
153
1
            Self {
154
1
                state: Mutex::new(MockChainInterfaceState {
155
1
                    next_block_number: 0,
156
1
                    blocks: BTreeMap::new(),
157
1
                }),
158
1

            
159
1
                notification_sender: broadcast::Sender::new(100),
160
1
            }
161
1
        }
162

            
163
7
        fn mock_block(&self, assignments: BTreeMap<ProfileId, DataPreserverAssignment<ParaId>>) {
164
7
            let mut state = self.state.lock().unwrap();
165
7
            state.next_block_number += 1;
166
7

            
167
7
            let header = PHeader {
168
7
                parent_hash: H256::zero(),
169
7
                number: state.next_block_number,
170
7
                state_root: H256::zero(),
171
7
                extrinsics_root: H256::zero(),
172
7
                digest: Default::default(),
173
7
            };
174
7
            let hash = header.hash();
175
7

            
176
7
            state.blocks.insert(hash, BlockAssignment { assignments });
177
7

            
178
7
            self.notification_sender
179
7
                .send(header)
180
7
                .expect("to properly send block header");
181
7
        }
182
    }
183

            
184
    #[async_trait::async_trait]
185
    impl OrchestratorChainInterface for MockChainInterface {
186
        fn overseer_handle(&self) -> OrchestratorChainResult<Handle> {
187
            unimplemented!("not used in test")
188
        }
189

            
190
        async fn get_storage_by_key(
191
            &self,
192
            _orchestrator_parent: PHash,
193
            _key: &[u8],
194
        ) -> OrchestratorChainResult<Option<Vec<u8>>> {
195
            unimplemented!("not used in test")
196
        }
197

            
198
        async fn prove_read(
199
            &self,
200
            _orchestrator_parent: PHash,
201
            _relevant_keys: &Vec<Vec<u8>>,
202
        ) -> OrchestratorChainResult<StorageProof> {
203
            unimplemented!("not used in test")
204
        }
205

            
206
        async fn import_notification_stream(
207
            &self,
208
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
209
            unimplemented!("not used in test")
210
        }
211

            
212
        async fn new_best_notification_stream(
213
            &self,
214
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
215
            unimplemented!("not used in test")
216
        }
217

            
218
        async fn finality_notification_stream(
219
            &self,
220
1
        ) -> OrchestratorChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
221
1
            let receiver = self.notification_sender.subscribe();
222
1
            let stream = tokio_stream::wrappers::BroadcastStream::new(receiver)
223
7
                .filter_map(|x| async { x.ok() });
224
1
            let stream = Box::pin(stream);
225
1
            Ok(stream)
226
2
        }
227

            
228
        async fn genesis_data(
229
            &self,
230
            _orchestrator_parent: PHash,
231
            _para_id: ParaId,
232
        ) -> OrchestratorChainResult<Option<ContainerChainGenesisData>> {
233
            unimplemented!("not used in test")
234
        }
235

            
236
        async fn boot_nodes(
237
            &self,
238
            _orchestrator_parent: PHash,
239
            _para_id: ParaId,
240
        ) -> OrchestratorChainResult<Vec<Vec<u8>>> {
241
            unimplemented!("not used in test")
242
        }
243

            
244
        async fn latest_block_number(
245
            &self,
246
            _orchestrator_parent: PHash,
247
            _para_id: ParaId,
248
        ) -> OrchestratorChainResult<Option<BlockNumber>> {
249
            unimplemented!("not used in test")
250
        }
251

            
252
        async fn best_block_hash(&self) -> OrchestratorChainResult<PHash> {
253
            unimplemented!("not used in test")
254
        }
255

            
256
        async fn finalized_block_hash(&self) -> OrchestratorChainResult<PHash> {
257
            unimplemented!("not used in test")
258
        }
259

            
260
        async fn data_preserver_active_assignment(
261
            &self,
262
            orchestrator_parent: PHash,
263
            profile_id: DataPreserverProfileId,
264
7
        ) -> OrchestratorChainResult<DataPreserverAssignment<ParaId>> {
265
7
            let mut state = self.state.lock().unwrap();
266
7
            let block = state.blocks.get_mut(&orchestrator_parent).ok_or_else(|| {
267
                OrchestratorChainError::GenericError("this block is not mocked".into())
268
7
            })?;
269

            
270
7
            Ok(block
271
7
                .assignments
272
7
                .get(&profile_id)
273
7
                .cloned()
274
7
                .unwrap_or(DataPreserverAssignment::NotAssigned))
275
14
        }
276

            
277
        async fn check_para_id_assignment(
278
            &self,
279
            _orchestrator_parent: PHash,
280
            _authority: NimbusId,
281
        ) -> OrchestratorChainResult<Option<ParaId>> {
282
            unimplemented!("not used in test")
283
        }
284

            
285
        async fn check_para_id_assignment_next_session(
286
            &self,
287
            _orchestrator_parent: PHash,
288
            _authority: NimbusId,
289
        ) -> OrchestratorChainResult<Option<ParaId>> {
290
            unimplemented!("not used in test")
291
        }
292
    }
293

            
294
    #[derive(Debug, PartialEq, Eq, Hash)]
295
    enum SpawnerEvent {
296
        Started(ParaId, bool),
297
        Stopped(ParaId, bool),
298
    }
299

            
300
    #[derive(Clone)]
301
    struct MockSpawner {
302
        state: Arc<Mutex<Vec<SpawnerEvent>>>,
303
        chain_interface: Arc<MockChainInterface>,
304
    }
305

            
306
    impl MockSpawner {
307
1
        fn new() -> Self {
308
1
            Self {
309
1
                state: Arc::new(Mutex::new(Vec::new())),
310
1
                chain_interface: Arc::new(MockChainInterface::new()),
311
1
            }
312
1
        }
313

            
314
7
        fn collect_events(&self) -> Vec<SpawnerEvent> {
315
7
            let mut events = vec![];
316
7
            let mut state = self.state.lock().unwrap();
317
7
            std::mem::swap(state.deref_mut(), &mut events);
318
7
            events
319
7
        }
320
    }
321

            
322
    impl Spawner for MockSpawner {
323
1
        fn orchestrator_chain_interface(&self) -> Arc<dyn OrchestratorChainInterface> {
324
1
            self.chain_interface.clone()
325
1
        }
326

            
327
        /// Try to start a new container chain. In case of an error, this does not stop the node, and
328
        /// the container chain will be attempted to spawn again when the collator is reassigned to it.
329
        ///
330
        /// It is possible that we try to spawn-stop-spawn the same chain, and the second spawn fails
331
        /// because the chain has not stopped yet, because `stop` does not wait for the chain to stop,
332
        /// so before calling `spawn` make sure to call `wait_for_paritydb_lock` before, like we do in
333
        /// `handle_update_assignment`.
334
3
        fn spawn(
335
3
            &self,
336
3
            container_chain_para_id: ParaId,
337
3
            start_collation: bool,
338
3
        ) -> impl std::future::Future<Output = ()> + Send {
339
3
            let mut set = self.state.lock().unwrap();
340
3
            set.push(SpawnerEvent::Started(
341
3
                container_chain_para_id,
342
3
                start_collation,
343
3
            ));
344

            
345
3
            async {}
346
3
        }
347

            
348
        /// Stop a container chain. Prints a warning if the container chain was not running.
349
        /// Returns the database path for the container chain, can be used with `wait_for_paritydb_lock`
350
        /// to ensure that the container chain has fully stopped. The database path can be `None` if the
351
        /// chain was not running.
352
3
        fn stop(&self, container_chain_para_id: ParaId, keep_db: bool) -> Option<PathBuf> {
353
3
            let mut set = self.state.lock().unwrap();
354
3
            set.push(SpawnerEvent::Stopped(container_chain_para_id, keep_db));
355
3

            
356
3
            None
357
3
        }
358
    }
359

            
360
    #[tokio::test]
361
1
    async fn task_logic_works() {
362
1
        let spawner = MockSpawner::new();
363
1

            
364
1
        let profile_id = 0;
365
1
        let para_id1 = ParaId::from(1);
366
1
        let para_id2 = ParaId::from(2);
367
1

            
368
1
        tokio::spawn(task_watch_assignment(spawner.clone(), profile_id));
369
1
        // Wait for task to start and subscribe to block stream.
370
1
        tokio::time::sleep(Duration::from_millis(100)).await;
371
1

            
372
1
        spawner.chain_interface.mock_block({
373
1
            let mut map = BTreeMap::new();
374
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
375
1
            map
376
1
        });
377
1
        tokio::time::sleep(Duration::from_millis(100)).await;
378
1
        assert_eq!(
379
1
            spawner.collect_events(),
380
1
            vec![SpawnerEvent::Started(para_id1, false)]
381
1
        );
382
1

            
383
1
        spawner.chain_interface.mock_block({
384
1
            let mut map = BTreeMap::new();
385
1
            map.insert(profile_id, DataPreserverAssignment::NotAssigned);
386
1
            map
387
1
        });
388
1
        tokio::time::sleep(Duration::from_millis(100)).await;
389
1
        assert_eq!(
390
1
            spawner.collect_events(),
391
1
            vec![SpawnerEvent::Stopped(para_id1, false)]
392
1
        );
393
1

            
394
1
        spawner.chain_interface.mock_block({
395
1
            let mut map = BTreeMap::new();
396
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id2));
397
1
            map
398
1
        });
399
1
        tokio::time::sleep(Duration::from_millis(100)).await;
400
1
        assert_eq!(
401
1
            spawner.collect_events(),
402
1
            vec![SpawnerEvent::Started(para_id2, false)]
403
1
        );
404
1

            
405
1
        spawner.chain_interface.mock_block({
406
1
            let mut map = BTreeMap::new();
407
1
            map.insert(profile_id, DataPreserverAssignment::Active(para_id1));
408
1
            map
409
1
        });
410
1
        tokio::time::sleep(Duration::from_millis(100)).await;
411
1
        assert_eq!(
412
1
            spawner.collect_events(),
413
1
            vec![
414
1
                SpawnerEvent::Stopped(para_id2, false),
415
1
                SpawnerEvent::Started(para_id1, false)
416
1
            ]
417
1
        );
418
1

            
419
1
        spawner.chain_interface.mock_block({
420
1
            let mut map = BTreeMap::new();
421
1
            map.insert(profile_id, DataPreserverAssignment::Inactive(para_id1));
422
1
            map
423
1
        });
424
1
        tokio::time::sleep(Duration::from_millis(100)).await;
425
1
        assert_eq!(
426
1
            spawner.collect_events(),
427
1
            vec![SpawnerEvent::Stopped(para_id1, true)]
428
1
        );
429
1

            
430
1
        spawner.chain_interface.mock_block({
431
1
            let mut map = BTreeMap::new();
432
1
            map.insert(profile_id, DataPreserverAssignment::Inactive(para_id2));
433
1
            map
434
1
        });
435
1
        tokio::time::sleep(Duration::from_millis(100)).await;
436
1
        assert_eq!(spawner.collect_events(), vec![]);
437
1

            
438
1
        spawner.chain_interface.mock_block({
439
1
            let mut map = BTreeMap::new();
440
1
            map.insert(profile_id, DataPreserverAssignment::NotAssigned);
441
1
            map
442
1
        });
443
1
        tokio::time::sleep(Duration::from_millis(100)).await;
444
1
        assert_eq!(spawner.collect_events(), vec![]);
445
1
    }
446
}