// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.

use {
18
    crate::{
19
        service::ContainerChainBackend,
20
        spawner::{CcSpawnMsg, ContainerChainSpawnerState},
21
    },
22
    cumulus_primitives_core::ParaId,
23
    frame_support::DefaultNoBound,
24
    std::{
25
        any::Any,
26
        cell::Cell,
27
        collections::VecDeque,
28
        sync::{Arc, Mutex},
29
        time::Instant,
30
    },
31
    tokio::{
32
        sync::mpsc::UnboundedSender,
33
        time::{sleep, Duration},
34
    },
35
};
36

            
37
#[derive(DefaultNoBound)]
38
pub struct SpawnedContainersMonitor {
39
    /// List of the N most recently started container chains, with some statistics related to
40
    /// stopping time and reference count.
41
    list: VecDeque<SpawnedContainer>,
42
    /// Count the number of times a container chain has been started
43
    count: usize,
44
}
45

            
46
pub struct SpawnedContainer {
47
    /// Unique identifier for a spawned container (not ParaId)
48
    pub id: usize,
49
    /// Container chain para id
50
    pub para_id: ParaId,
51
    /// When did the container chain start
52
    pub start_time: Instant,
53
    /// When the container chain was asked to stop (`StopContainerChain` was dropped)
54
    pub stop_signal_time: Option<Instant>,
55
    /// When the container chain task manager was dropped, this should finish all the background
56
    /// tasks except the ones started in separate threads.
57
    pub stop_task_manager_time: Option<Instant>,
58
    /// When the `monitor_task` first observed that the reference counts are all 0.
59
    /// This won't be precise because it is checked using polling with a high period.
60
    pub stop_refcount_time: Cell<Option<Instant>>,
61
    /// Used to check the reference count, if it's 0 it means the database has been closed
62
    pub backend: std::sync::Weak<ContainerChainBackend>,
63
    /// Used to check the reference count, if it's 0 it means that the client has been closed.
64
    pub client: std::sync::Weak<dyn Any + Send + Sync>,
65
}
66

            
67
impl SpawnedContainer {
68
20
    pub fn is_stopped(&self) -> bool {
69
20
        self.stop_refcount_time.get().is_some() || {
70
            // Check reference count, and set stop_refcount_time if zero
71
15
            let refcount_zero = self.backend.strong_count() == 0 && self.client.strong_count() == 0;
72
15
            if refcount_zero {
73
15
                self.stop_refcount_time.set(Some(Instant::now()));
74
15

            
75
15
                true
76
            } else {
77
                false
78
            }
79
        }
80
20
    }
81

            
82
    pub fn summary(&self) -> String {
83
        #[derive(Debug)]
84
        #[allow(unused)]
85
        struct SpawnedContainerSummary {
86
            id: usize,
87
            para_id: ParaId,
88
            time_start_to_now: Duration,
89
            time_start_to_stop_signal: Option<Duration>,
90
            time_stop_signal_to_stop_task_manager: Option<Duration>,
91
            time_stop_task_manager_to_stop_refcount: Option<Duration>,
92
            time_stop_refcount_to_now: Option<Duration>,
93
            backend_refcount: usize,
94
            client_refcount: usize,
95
        }
96

            
97
        let summary = SpawnedContainerSummary {
98
            id: self.id,
99
            para_id: self.para_id,
100
            time_start_to_now: Instant::now().duration_since(self.start_time),
101
            time_start_to_stop_signal: self
102
                .stop_signal_time
103
                .map(|x| x.duration_since(self.start_time)),
104
            time_stop_signal_to_stop_task_manager: self
105
                .stop_task_manager_time
106
                .and_then(|x| Some(x.duration_since(self.stop_signal_time?))),
107
            time_stop_task_manager_to_stop_refcount: self
108
                .stop_refcount_time
109
                .get()
110
                .and_then(|x| Some(x.duration_since(self.stop_task_manager_time?))),
111
            time_stop_refcount_to_now: self
112
                .stop_refcount_time
113
                .get()
114
                .map(|x| Instant::now().duration_since(x)),
115
            backend_refcount: self.backend.strong_count(),
116
            client_refcount: self.client.strong_count(),
117
        };
118

            
119
        format!("{:?}", summary)
120
    }
121
}
122

            
123
impl SpawnedContainersMonitor {
124
    /// Returns a unique id which is not the ParaId
125
20
    pub fn push(&mut self, mut x: SpawnedContainer) -> usize {
126
20
        assert_eq!(x.id, 0, "SpawnedContainer.id must be set to 0, the actual id will be returned from push function");
127
20
        let id = self.count;
128
20
        x.id = id;
129
20
        self.list.push_back(x);
130
20
        self.count += 1;
131
20

            
132
20
        id
133
20
    }
134

            
135
    pub fn set_stop_signal_time(&mut self, id: usize, when: Instant) {
136
        let i = self.list.iter().position(|x| x.id == id);
137

            
138
        if let Some(i) = i {
139
            self.list[i].stop_signal_time = Some(when);
140
        }
141
    }
142

            
143
    pub fn set_stop_task_manager_time(&mut self, id: usize, when: Instant) {
144
        let i = self.list.iter().position(|x| x.id == id);
145

            
146
        if let Some(i) = i {
147
            self.list[i].stop_task_manager_time = Some(when);
148
        }
149
    }
150

            
151
    #[allow(unused)]
152
    pub fn set_stop_refcount_time(&mut self, id: usize, when: Instant) {
153
        let i = self.list.iter().position(|x| x.id == id);
154

            
155
        if let Some(i) = i {
156
            self.list[i].stop_refcount_time.set(Some(when));
157
        }
158
    }
159

            
160
    pub fn running_chains(&self) -> Vec<&SpawnedContainer> {
161
        self.list
162
            .iter()
163
            .filter(|container| !container.is_stopped())
164
            .collect()
165
    }
166

            
167
    #[allow(unused)]
168
2
    pub fn truncate_old(&mut self, new_len: usize) {
169
2
        if self.list.len() <= new_len {
170
1
            return;
171
1
        }
172
1

            
173
1
        let idx_new_first_element = self.list.len() - new_len;
174
1
        self.list.drain(0..idx_new_first_element);
175
2
    }
176

            
177
2
    pub fn truncate_old_stopped_chains(&mut self, new_len: usize) -> Result<(), ()> {
178
2
        if self.list.len() <= new_len {
179
1
            return Ok(());
180
1
        }
181
1

            
182
1
        let mut to_retain = self.list.len() - new_len;
183
15
        self.list.retain(|container| {
184
15
            if to_retain == 0 {
185
10
                return true;
186
5
            }
187
5

            
188
5
            if container.is_stopped() {
189
5
                to_retain -= 1;
190
5
                false
191
            } else {
192
                true
193
            }
194
15
        });
195
1

            
196
1
        if self.list.len() <= new_len {
197
1
            Ok(())
198
        } else {
199
            Err(())
200
        }
201
2
    }
202
}
203

            
204
/// Background task that monitors the number of running container chains.
205
pub async fn monitor_task(state: Arc<Mutex<ContainerChainSpawnerState>>) {
206
    // Main loop frequency, doesn't need to be fast
207
    let monitor_period = Duration::from_secs(300 * 0 + 10);
208
    // Max number of allowed container chains before printing warnings.
209
    // There should be at most 2 container chains running at the same time (1 syncing + 1 collating),
210
    // but add a margin of error because a container chain may take a few seconds to stop.
211
    let max_running_container_chains = 4;
212

            
213
    loop {
214
        sleep(monitor_period).await;
215
        log::debug!("Monitor tick");
216
        let mut state = state.lock().unwrap();
217
        let monitor_state = &mut state.spawned_containers_monitor;
218

            
219
        let running_chains = monitor_state.running_chains();
220
        let running_para_ids: Vec<ParaId> = running_chains.iter().map(|x| x.para_id).collect();
221
        if running_chains.len() > max_running_container_chains {
222
            log::warn!("Too many container chains running at the same time");
223
            log::warn!(
224
                "Running container chains: {}: {:?}",
225
                running_chains.len(),
226
                running_para_ids
227
            );
228
            log::debug!(
229
                "{:?}",
230
                running_chains
231
                    .iter()
232
                    .map(|x| x.summary())
233
                    .collect::<Vec<_>>()
234
            )
235
        } else {
236
            log::debug!(
237
                "Running container chains: {}: {:?}",
238
                running_chains.len(),
239
                running_para_ids
240
            );
241
        }
242

            
243
        // Remove stopped container chains to keep the list small
244
        let _ = monitor_state.truncate_old_stopped_chains(10);
245
    }
246
}
247

            
248
#[allow(unused)]
249
/// Start and stop the same container chain in a loop, used for testing and debugging
250
pub async fn debug_start_and_stop_same_cc(cc_spawn_tx: UnboundedSender<CcSpawnMsg>) {
251
    let sleep_delay = Duration::from_secs(10);
252

            
253
    loop {
254
        sleep(sleep_delay).await;
255
        cc_spawn_tx
256
            .send(CcSpawnMsg::UpdateAssignment {
257
                current: Some(2000u32.into()),
258
                next: None,
259
            })
260
            .unwrap();
261
        sleep(sleep_delay).await;
262
        cc_spawn_tx
263
            .send(CcSpawnMsg::UpdateAssignment {
264
                current: None,
265
                next: None,
266
            })
267
            .unwrap();
268
        sleep(sleep_delay).await;
269
        cc_spawn_tx
270
            .send(CcSpawnMsg::UpdateAssignment {
271
                current: None,
272
                next: Some(2001u32.into()),
273
            })
274
            .unwrap();
275
        sleep(sleep_delay).await;
276
        cc_spawn_tx
277
            .send(CcSpawnMsg::UpdateAssignment {
278
                current: None,
279
                next: None,
280
            })
281
            .unwrap();
282
    }
283
}
284

            
285
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a container whose backend and client handles point at nothing, so both strong
    /// counts are 0 and `is_stopped` reports it as stopped.
    fn stopped_container() -> SpawnedContainer {
        SpawnedContainer {
            id: 0,
            para_id: ParaId::default(),
            start_time: Instant::now(),
            stop_signal_time: None,
            stop_task_manager_time: None,
            stop_refcount_time: Cell::new(None),
            backend: std::sync::Weak::new(),
            client: std::sync::Weak::<()>::new(),
        }
    }

    #[test]
    fn test_truncate() {
        let mut monitor = SpawnedContainersMonitor::default();

        // Neither truncation may panic on an empty list.
        monitor.truncate_old(0);
        monitor.truncate_old_stopped_chains(0).unwrap();

        for _ in 0..20 {
            monitor.push(stopped_container());
        }

        assert_eq!(monitor.list.len(), 20);
        assert_eq!(monitor.count, 20);

        // `truncate_old` drops the oldest entries whatever their state: ids 0..=4 are gone,
        // while the running total of pushed containers is untouched.
        monitor.truncate_old(15);
        assert_eq!(monitor.list.len(), 15);
        assert_eq!(monitor.count, 20);
        assert_eq!(monitor.list.front().map(|x| x.id), Some(5));

        // The handles are dangling, so every remaining chain is considered stopped.
        assert!(monitor.list.iter().all(|x| x.is_stopped()));

        // Only stopped chains are removed, oldest first: ids 5..=9 go and id 10 is now first.
        monitor.truncate_old_stopped_chains(10).unwrap();
        assert_eq!(monitor.list.len(), 10);
        assert_eq!(monitor.count, 20);
        assert_eq!(monitor.list.front().map(|x| x.id), Some(10));
    }
}