// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
use {
18
    crate::{
19
        service::ContainerChainBackend,
20
        spawner::{CcSpawnMsg, ContainerChainSpawnerState},
21
    },
22
    cumulus_primitives_core::ParaId,
23
    frame_support::DefaultNoBound,
24
    std::{
25
        any::Any,
26
        cell::Cell,
27
        collections::VecDeque,
28
        sync::{Arc, Mutex},
29
        time::Instant,
30
    },
31
    tokio::{
32
        sync::mpsc::UnboundedSender,
33
        time::{sleep, Duration},
34
    },
35
};
36

            
37
#[derive(DefaultNoBound)]
38
pub struct SpawnedContainersMonitor {
39
    /// List of the N most recently started container chains, with some statistics related to
40
    /// stopping time and reference count.
41
    list: VecDeque<SpawnedContainer>,
42
    /// Count the number of times a container chain has been started
43
    count: usize,
44
}
45

            
46
pub struct SpawnedContainer {
47
    /// Unique identifier for a spawned container (not ParaId)
48
    pub id: usize,
49
    /// Container chain para id
50
    pub para_id: ParaId,
51
    /// When did the container chain start
52
    pub start_time: Instant,
53
    /// When the container chain was asked to stop (`StopContainerChain` was dropped)
54
    pub stop_signal_time: Option<Instant>,
55
    /// When the container chain task manager was dropped, this should finish all the background
56
    /// tasks except the ones started in separate threads.
57
    pub stop_task_manager_time: Option<Instant>,
58
    /// When the `monitor_task` first observed that the reference counts are all 0.
59
    /// This won't be precise because it is checked using polling with a high period.
60
    pub stop_refcount_time: Cell<Option<Instant>>,
61
    /// Used to check the reference count, if it's 0 it means the database has been closed
62
    pub backend: std::sync::Weak<ContainerChainBackend>,
63
    /// Used to check the reference count, if it's 0 it means that the client has been closed.
64
    pub client: std::sync::Weak<dyn Any + Send + Sync>,
65
}
66

            
67
impl SpawnedContainer {
68
20
    pub fn is_stopped(&self) -> bool {
69
20
        self.stop_refcount_time.get().is_some() || {
70
            // Check reference count, and set stop_refcount_time if zero
71
15
            let refcount_zero = self.backend.strong_count() == 0 && self.client.strong_count() == 0;
72
15
            if refcount_zero {
73
15
                self.stop_refcount_time.set(Some(Instant::now()));
74
15

            
75
15
                true
76
            } else {
77
                false
78
            }
79
        }
80
20
    }
81

            
82
    pub fn summary(&self) -> String {
83
        #[derive(Debug)]
84
        #[allow(unused)]
85
        struct SpawnedContainerSummary {
86
            id: usize,
87
            para_id: ParaId,
88
            time_start_to_now: Duration,
89
            time_start_to_stop_signal: Option<Duration>,
90
            time_stop_signal_to_stop_task_manager: Option<Duration>,
91
            time_stop_task_manager_to_stop_refcount: Option<Duration>,
92
            time_stop_refcount_to_now: Option<Duration>,
93
            backend_refcount: usize,
94
            client_refcount: usize,
95
        }
96

            
97
        let summary = SpawnedContainerSummary {
98
            id: self.id,
99
            para_id: self.para_id,
100
            time_start_to_now: Instant::now().duration_since(self.start_time),
101
            time_start_to_stop_signal: self
102
                .stop_signal_time
103
                .map(|x| x.duration_since(self.start_time)),
104
            time_stop_signal_to_stop_task_manager: self
105
                .stop_task_manager_time
106
                .and_then(|x| Some(x.duration_since(self.stop_signal_time?))),
107
            time_stop_task_manager_to_stop_refcount: self
108
                .stop_refcount_time
109
                .get()
110
                .and_then(|x| Some(x.duration_since(self.stop_task_manager_time?))),
111
            time_stop_refcount_to_now: self
112
                .stop_refcount_time
113
                .get()
114
                .map(|x| Instant::now().duration_since(x)),
115
            backend_refcount: self.backend.strong_count(),
116
            client_refcount: self.client.strong_count(),
117
        };
118

            
119
        format!("{:?}", summary)
120
    }
121
}
122

            
123
impl SpawnedContainersMonitor {
124
    /// Returns a unique id which is not the ParaId
125
20
    pub fn push(&mut self, mut x: SpawnedContainer) -> usize {
126
20
        assert_eq!(x.id, 0, "SpawnedContainer.id must be set to 0, the actual id will be returned from push function");
127
20
        let id = self.count;
128
20
        x.id = id;
129
20
        self.list.push_back(x);
130
20
        self.count += 1;
131
20

            
132
20
        id
133
20
    }
134

            
135
    pub fn set_stop_signal_time(&mut self, id: usize, when: Instant) {
136
        let i = self.list.iter().position(|x| x.id == id);
137

            
138
        if let Some(i) = i {
139
            self.list[i].stop_signal_time = Some(when);
140
        }
141
    }
142

            
143
    pub fn set_stop_task_manager_time(&mut self, id: usize, when: Instant) {
144
        let i = self.list.iter().position(|x| x.id == id);
145

            
146
        if let Some(i) = i {
147
            self.list[i].stop_task_manager_time = Some(when);
148
        }
149
    }
150

            
151
    #[allow(unused)]
152
    pub fn set_stop_refcount_time(&mut self, id: usize, when: Instant) {
153
        let i = self.list.iter().position(|x| x.id == id);
154

            
155
        if let Some(i) = i {
156
            self.list[i].stop_refcount_time.set(Some(when));
157
        }
158
    }
159

            
160
    pub fn running_chains(&self) -> Vec<&SpawnedContainer> {
161
        self.list
162
            .iter()
163
            .filter(|container| !container.is_stopped())
164
            .collect()
165
    }
166

            
167
    #[allow(unused)]
168
2
    pub fn truncate_old(&mut self, new_len: usize) {
169
2
        if self.list.len() <= new_len {
170
1
            return;
171
1
        }
172
1

            
173
1
        let idx_new_first_element = self.list.len() - new_len;
174
1
        self.list.drain(0..idx_new_first_element);
175
2
    }
176

            
177
    #[allow(clippy::result_unit_err)]
178
2
    pub fn truncate_old_stopped_chains(&mut self, new_len: usize) -> Result<(), ()> {
179
2
        if self.list.len() <= new_len {
180
1
            return Ok(());
181
1
        }
182
1

            
183
1
        let mut to_retain = self.list.len() - new_len;
184
15
        self.list.retain(|container| {
185
15
            if to_retain == 0 {
186
10
                return true;
187
5
            }
188
5

            
189
5
            if container.is_stopped() {
190
5
                to_retain -= 1;
191
5
                false
192
            } else {
193
                true
194
            }
195
15
        });
196
1

            
197
1
        if self.list.len() <= new_len {
198
1
            Ok(())
199
        } else {
200
            Err(())
201
        }
202
2
    }
203
}
204

            
205
/// Background task that monitors the number of running container chains.
206
pub async fn monitor_task(state: Arc<Mutex<ContainerChainSpawnerState>>) {
207
    // Main loop frequency, doesn't need to be fast
208
    let monitor_period = Duration::from_secs(300 * 0 + 10);
209
    // Max number of allowed container chains before printing warnings.
210
    // There should be at most 2 container chains running at the same time (1 syncing + 1 collating),
211
    // but add a margin of error because a container chain may take a few seconds to stop.
212
    let max_running_container_chains = 4;
213

            
214
    loop {
215
        sleep(monitor_period).await;
216
        log::debug!("Monitor tick");
217
        let mut state = state.lock().unwrap();
218
        let monitor_state = &mut state.spawned_containers_monitor;
219

            
220
        let running_chains = monitor_state.running_chains();
221
        let running_para_ids: Vec<ParaId> = running_chains.iter().map(|x| x.para_id).collect();
222
        if running_chains.len() > max_running_container_chains {
223
            log::warn!("Too many container chains running at the same time");
224
            log::warn!(
225
                "Running container chains: {}: {:?}",
226
                running_chains.len(),
227
                running_para_ids
228
            );
229
            log::debug!(
230
                "{:?}",
231
                running_chains
232
                    .iter()
233
                    .map(|x| x.summary())
234
                    .collect::<Vec<_>>()
235
            )
236
        } else {
237
            log::debug!(
238
                "Running container chains: {}: {:?}",
239
                running_chains.len(),
240
                running_para_ids
241
            );
242
        }
243

            
244
        // Remove stopped container chains to keep the list small
245
        let _ = monitor_state.truncate_old_stopped_chains(10);
246
    }
247
}
248

            
249
#[allow(unused)]
250
/// Start and stop the same container chain in a loop, used for testing and debugging
251
pub async fn debug_start_and_stop_same_cc(cc_spawn_tx: UnboundedSender<CcSpawnMsg>) {
252
    let sleep_delay = Duration::from_secs(10);
253

            
254
    loop {
255
        sleep(sleep_delay).await;
256
        cc_spawn_tx
257
            .send(CcSpawnMsg::UpdateAssignment {
258
                current: Some(2000u32.into()),
259
                next: None,
260
            })
261
            .unwrap();
262
        sleep(sleep_delay).await;
263
        cc_spawn_tx
264
            .send(CcSpawnMsg::UpdateAssignment {
265
                current: None,
266
                next: None,
267
            })
268
            .unwrap();
269
        sleep(sleep_delay).await;
270
        cc_spawn_tx
271
            .send(CcSpawnMsg::UpdateAssignment {
272
                current: None,
273
                next: Some(2001u32.into()),
274
            })
275
            .unwrap();
276
        sleep(sleep_delay).await;
277
        cc_spawn_tx
278
            .send(CcSpawnMsg::UpdateAssignment {
279
                current: None,
280
                next: None,
281
            })
282
            .unwrap();
283
    }
284
}
285

            
286
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a container whose weak handles point at nothing, so both reference counts are 0.
    fn unreferenced_container() -> SpawnedContainer {
        SpawnedContainer {
            id: Default::default(),
            para_id: Default::default(),
            start_time: Instant::now(),
            stop_signal_time: Default::default(),
            stop_task_manager_time: Default::default(),
            stop_refcount_time: Default::default(),
            backend: Default::default(),
            client: std::sync::Weak::<()>::new(),
        }
    }

    /// Id of the oldest container still in the list, if any.
    fn first_id(monitor: &SpawnedContainersMonitor) -> Option<usize> {
        monitor.list.front().map(|container| container.id)
    }

    #[test]
    fn test_truncate() {
        let mut monitor = SpawnedContainersMonitor::default();

        // Truncating an empty list must not panic
        monitor.truncate_old(0);
        monitor.truncate_old_stopped_chains(0).unwrap();

        (0..20).for_each(|_| {
            monitor.push(unreferenced_container());
        });

        assert_eq!(monitor.list.len(), 20);
        assert_eq!(monitor.count, 20);

        monitor.truncate_old(15);
        assert_eq!(monitor.list.len(), 15);
        assert_eq!(monitor.count, 20);
        // `truncate_old` drops the oldest entries, so the first id is now 5
        assert_eq!(first_id(&monitor), Some(5));

        // The weak handles have a refcount of 0, so all chains are considered stopped
        assert!(monitor
            .list
            .iter()
            .all(|container| container.is_stopped()));
        monitor.truncate_old_stopped_chains(10).unwrap();
        assert_eq!(monitor.list.len(), 10);
        assert_eq!(monitor.count, 20);
        // The oldest stopped chains were removed, so the first id is now 10
        assert_eq!(first_id(&monitor), Some(10));
    }
}