1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
#[cfg(feature = "migrations")]
30
pub mod migrations;
31

            
32
pub mod weights;
33
pub use weights::WeightInfo;
34

            
35
use {
36
    core::cmp::min,
37
    frame_support::{
38
        dispatch::DispatchErrorWithPostInfo,
39
        pallet,
40
        pallet_prelude::*,
41
        storage::types::{StorageDoubleMap, StorageMap},
42
        traits::{
43
            fungible::{Inspect, MutateHold},
44
            tokens::{Balance, Precision},
45
        },
46
        Blake2_128Concat,
47
    },
48
    frame_system::pallet_prelude::*,
49
    parity_scale_codec::{FullCodec, MaxEncodedLen},
50
    scale_info::TypeInfo,
51
    serde::{Deserialize, Serialize},
52
    sp_runtime::{
53
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
54
        ArithmeticError,
55
    },
56
    sp_std::{fmt::Debug, marker::PhantomData},
57
};
58

            
59
pub use pallet::*;
60

            
61
/// Type able to provide the current time for given unit.
62
/// For each unit the returned number should monotonically increase and not
63
/// overflow.
64
pub trait TimeProvider<Unit, Number> {
65
    fn now(unit: &Unit) -> Option<Number>;
66

            
67
    /// Benchmarks: should return the time unit which has the worst performance calling
68
    /// `TimeProvider::now(unit)` with.
69
    #[cfg(feature = "runtime-benchmarks")]
70
    fn bench_worst_case_time_unit() -> Unit;
71

            
72
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
73
    #[cfg(feature = "runtime-benchmarks")]
74
    fn bench_set_now(instant: Number);
75
}
76

            
77
/// Interactions the pallet needs with assets.
78
pub trait AssetsManager<AccountId, AssetId, Balance> {
79
    /// Transfer assets deposited by an account to another account.
80
    /// Those assets should not be considered deposited in the target account.
81
    fn transfer_deposit(
82
        asset_id: &AssetId,
83
        from: &AccountId,
84
        to: &AccountId,
85
        amount: Balance,
86
    ) -> DispatchResult;
87

            
88
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
89
    /// enough of that asset. Funds should be safe and not slashable.
90
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
91
        -> DispatchResult;
92

            
93
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
94
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
95
        -> DispatchResult;
96

            
97
    /// Return the deposit for given asset and account.
98
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
99

            
100
    /// Benchmarks: should return the asset id which has the worst performance when interacting
101
    /// with it.
102
    #[cfg(feature = "runtime-benchmarks")]
103
    fn bench_worst_case_asset_id() -> AssetId;
104

            
105
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
106
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
107
    /// from one asset to another. If there is only one asset id it is fine to return it in both
108
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
109
    #[cfg(feature = "runtime-benchmarks")]
110
    fn bench_worst_case_asset_id2() -> AssetId;
111

            
112
    /// Benchmarks: should set the balance.
113
    #[cfg(feature = "runtime-benchmarks")]
114
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
115
}
116

            
117
11998
#[pallet]
118
pub mod pallet {
119
    use super::*;
120

            
121
    /// Pooled Staking pallet.
122
46877
    #[pallet::pallet]
123
    pub struct Pallet<T>(PhantomData<T>);
124

            
125
    #[pallet::config]
126
    pub trait Config: frame_system::Config {
127
        /// Overarching event type
128
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
129

            
130
        /// Type used to represent stream ids. Should be large enough to not overflow.
131
        type StreamId: AtLeast32BitUnsigned
132
            + Default
133
            + Debug
134
            + Copy
135
            + Clone
136
            + FullCodec
137
            + TypeInfo
138
            + MaxEncodedLen;
139

            
140
        /// The balance type, which is also the type representing time (as this
141
        /// pallet will do math with both time and balances to compute how
142
        /// much should be paid).
143
        type Balance: Balance;
144

            
145
        /// Type representing an asset id, a identifier allowing distinguishing assets.
146
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
147

            
148
        /// Provide interaction with assets.
149
        type AssetsManager: AssetsManager<Self::AccountId, Self::AssetId, Self::Balance>;
150

            
151
        /// Currency for the opening balance hold for the storage used by the Stream.
152
        /// NOT to be confused with Assets.
153
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
154
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
155

            
156
        type RuntimeHoldReason: From<HoldReason>;
157

            
158
        #[pallet::constant]
159
        type OpenStreamHoldAmount: Get<Self::Balance>;
160

            
161
        /// Represents which units of time can be used. Designed to be an enum
162
        /// with a variant for each kind of time source/scale supported.
163
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
164

            
165
        /// Provide the current time in given unit.
166
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
167

            
168
        type WeightInfo: weights::WeightInfo;
169
    }
170

            
171
    pub type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
172
    pub type AssetIdOf<T> = <T as Config>::AssetId;
173

            
174
    pub type RequestNonce = u32;
175

            
176
    /// A stream payment from source to target.
177
    /// Stores the last time the stream was updated, which allows to compute
178
    /// elapsed time and perform payment.
179
    #[derive(
180
        RuntimeDebug,
181
        PartialEq,
182
        Eq,
183
        Encode,
184
        Decode,
185
        Clone,
186
4944
        TypeInfo,
187
        Serialize,
188
        Deserialize,
189
        MaxEncodedLen,
190
    )]
191
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
192
        /// Payer, source of the stream.
193
        pub source: AccountId,
194
        /// Payee, target of the stream.
195
        pub target: AccountId,
196
        /// Stream config
197
        pub config: StreamConfig<Unit, AssetId, Balance>,
198
        /// How much is deposited to fund this stream.
199
        pub deposit: Balance,
200
        /// Last time the stream was updated in `config.time_unit`.
201
        pub last_time_updated: Balance,
202
        /// Nonce for requests. This prevents a request to make a first request
203
        /// then change it to another request to frontrun the other party
204
        /// accepting.
205
        pub request_nonce: RequestNonce,
206
        /// A pending change request if any.
207
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
208
        /// One-time opening deposit. Will be released on close.
209
        pub opening_deposit: Balance,
210
    }
211

            
212
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
213
95
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
214
95
            match account {
215
95
                a if a == self.source => Some(Party::Source),
216
39
                a if a == self.target => Some(Party::Target),
217
3
                _ => None,
218
            }
219
95
        }
220
    }
221

            
222
    /// Stream configuration.
223
    #[derive(
224
        RuntimeDebug,
225
        PartialEq,
226
        Eq,
227
        Encode,
228
        Decode,
229
        Copy,
230
        Clone,
231
3090
        TypeInfo,
232
        Serialize,
233
        Deserialize,
234
        MaxEncodedLen,
235
    )]
236
    pub struct StreamConfig<Unit, AssetId, BalanceOrDuration> {
237
        /// Unit in which time is measured using a `TimeProvider`.
238
        pub time_unit: Unit,
239
        /// Asset used for payment.
240
        pub asset_id: AssetId,
241
        /// Amount of asset / unit.
242
        pub rate: BalanceOrDuration,
243
        /// Minimum amount of time that can be used for mandatory change requests.
244
        pub minimum_request_deadline_delay: BalanceOrDuration,
245
        /// Minimal amount the source must deposit in the stream. Deposit can go
246
        /// lower due to time passing, but deposit cannot be **decreased** lower
247
        /// explicitly by the source. If this is non-zero, the stream can only
248
        /// be closed by the target or if the deposit is zero. If deposit is lower
249
        /// than minimum due to time passing, source is allowed to **increase** the
250
        /// deposit to a value lower than the soft minimum.
251
        ///
252
        /// This system guarantees to the target that the source cannot instantly
253
        /// stop paying, and may prepare for termination of service if the deposit
254
        /// is below this minimum.
255
        pub soft_minimum_deposit: BalanceOrDuration,
256
    }
257

            
258
    /// Origin of a change request.
259
    #[derive(
260
        RuntimeDebug,
261
        PartialEq,
262
        Eq,
263
        Encode,
264
        Decode,
265
        Copy,
266
        Clone,
267
2472
        TypeInfo,
268
        Serialize,
269
        Deserialize,
270
        MaxEncodedLen,
271
    )]
272
    pub enum Party {
273
53
        Source,
274
25
        Target,
275
    }
276

            
277
    impl Party {
278
4
        pub fn inverse(self) -> Self {
279
4
            match self {
280
2
                Party::Source => Party::Target,
281
2
                Party::Target => Party::Source,
282
            }
283
4
        }
284
    }
285

            
286
    /// Kind of change requested.
287
    #[derive(
288
        RuntimeDebug,
289
        PartialEq,
290
        Eq,
291
        Encode,
292
        Decode,
293
        Copy,
294
        Clone,
295
1236
        TypeInfo,
296
        Serialize,
297
        Deserialize,
298
        MaxEncodedLen,
299
    )]
300
    pub enum ChangeKind<Time> {
301
22
        /// The requested change is a suggestion, and the other party doesn't
302
        /// need to accept it.
303
        Suggestion,
304
92
        /// The requested change is mandatory, and the other party must either
305
        /// accept the change or close the stream. Reaching the deadline will
306
        /// close the stream too.
307
        Mandatory { deadline: Time },
308
    }
309

            
310
    /// Describe how the deposit should change.
311
    #[derive(
312
        RuntimeDebug,
313
        PartialEq,
314
        Eq,
315
        Encode,
316
        Decode,
317
        Copy,
318
        Clone,
319
2472
        TypeInfo,
320
        Serialize,
321
        Deserialize,
322
        MaxEncodedLen,
323
    )]
324
    pub enum DepositChange<Balance> {
325
80
        /// Increase deposit by given amount.
326
        Increase(Balance),
327
1
        /// Decrease deposit by given amount.
328
        Decrease(Balance),
329
6
        /// Set deposit to given amount.
330
        Absolute(Balance),
331
    }
332

            
333
    /// A request to change a stream config.
334
    #[derive(
335
        RuntimeDebug,
336
        PartialEq,
337
        Eq,
338
        Encode,
339
        Decode,
340
        Clone,
341
2472
        TypeInfo,
342
        Serialize,
343
        Deserialize,
344
        MaxEncodedLen,
345
    )]
346
    pub struct ChangeRequest<Unit, AssetId, Balance> {
347
        pub requester: Party,
348
        pub kind: ChangeKind<Balance>,
349
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
350
        pub deposit_change: Option<DepositChange<Balance>>,
351
    }
352

            
353
    pub type StreamOf<T> =
354
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
355

            
356
    pub type StreamConfigOf<T> =
357
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
358

            
359
    pub type ChangeRequestOf<T> =
360
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
361

            
362
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
363
    pub struct StreamPaymentStatus<Balance> {
364
        pub payment: Balance,
365
        pub deposit_left: Balance,
366
        /// Whenever the stream is stalled, which can occur either when no funds are left or
367
        /// if the time is past a mandatory request deadline.
368
        pub stalled: bool,
369
    }
370

            
371
    /// Store the next available stream id.
372
994
    #[pallet::storage]
373
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
374

            
375
    /// Store each stream indexed by an Id.
376
1092
    #[pallet::storage]
377
    pub type Streams<T: Config> = StorageMap<
378
        Hasher = Blake2_128Concat,
379
        Key = T::StreamId,
380
        Value = StreamOf<T>,
381
        QueryKind = OptionQuery,
382
    >;
383

            
384
    /// Lookup for all streams with given source.
385
    /// To avoid maintaining a growing list of stream ids, they are stored in
386
    /// the form of an entry (AccountId, StreamId). If such entry exists then
387
    /// this AccountId is a source in StreamId. One can iterate over all storage
388
    /// keys starting with the AccountId to find all StreamIds.
389
734
    #[pallet::storage]
390
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
391
        Key1 = AccountIdOf<T>,
392
        Hasher1 = Blake2_128Concat,
393
        Key2 = T::StreamId,
394
        Hasher2 = Blake2_128Concat,
395
        Value = (),
396
        QueryKind = OptionQuery,
397
    >;
398

            
399
    /// Lookup for all streams with given target.
400
    /// To avoid maintaining a growing list of stream ids, they are stored in
401
    /// the form of an entry (AccountId, StreamId). If such entry exists then
402
    /// this AccountId is a target in StreamId. One can iterate over all storage
403
    /// keys starting with the AccountId to find all StreamIds.
404
734
    #[pallet::storage]
405
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
406
        Key1 = AccountIdOf<T>,
407
        Hasher1 = Blake2_128Concat,
408
        Key2 = T::StreamId,
409
        Hasher2 = Blake2_128Concat,
410
        Value = (),
411
        QueryKind = OptionQuery,
412
    >;
413

            
414
164
    #[pallet::error]
415
    #[derive(Clone, PartialEq, Eq)]
416
    pub enum Error<T> {
417
        UnknownStreamId,
418
        StreamIdOverflow,
419
        UnauthorizedOrigin,
420
        CantBeBothSourceAndTarget,
421
        CantFetchCurrentTime,
422
        SourceCantDecreaseRate,
423
        TargetCantIncreaseRate,
424
        CantOverrideMandatoryChange,
425
        NoPendingRequest,
426
        CantAcceptOwnRequest,
427
        CanOnlyCancelOwnRequest,
428
        WrongRequestNonce,
429
        ChangingAssetRequiresAbsoluteDepositChange,
430
        TargetCantChangeDeposit,
431
        ImmediateDepositChangeRequiresSameAssetId,
432
        DeadlineCantBeInPast,
433
        CantFetchStatusBeforeLastTimeUpdated,
434
        DeadlineDelayIsBelowMinium,
435
        CantDecreaseDepositUnderSoftDepositMinimum,
436
        SourceCantCloseActiveStreamWithSoftDepositMinimum,
437
        CantCreateStreamWithDepositUnderSoftMinimum,
438
    }
439

            
440
618
    #[pallet::event]
441
236
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
442
    pub enum Event<T: Config> {
443
49
        StreamOpened {
444
            stream_id: T::StreamId,
445
        },
446
8
        StreamClosed {
447
            stream_id: T::StreamId,
448
            refunded: T::Balance,
449
        },
450
13
        StreamPayment {
451
            stream_id: T::StreamId,
452
            source: AccountIdOf<T>,
453
            target: AccountIdOf<T>,
454
            amount: T::Balance,
455
            stalled: bool,
456
        },
457
12
        StreamConfigChangeRequested {
458
            stream_id: T::StreamId,
459
            request_nonce: RequestNonce,
460
            requester: Party,
461
            old_config: StreamConfigOf<T>,
462
            new_config: StreamConfigOf<T>,
463
        },
464
7
        StreamConfigChanged {
465
            stream_id: T::StreamId,
466
            old_config: StreamConfigOf<T>,
467
            new_config: StreamConfigOf<T>,
468
            deposit_change: Option<DepositChange<T::Balance>>,
469
        },
470
    }
471

            
472
    /// Freeze reason to use if needed.
473
    #[pallet::composite_enum]
474
    pub enum FreezeReason {
475
        StreamPayment,
476
    }
477

            
478
    /// Hold reason to use if needed.
479
    #[pallet::composite_enum]
480
    pub enum HoldReason {
481
248
        StreamPayment,
482
562
        StreamOpened,
483
    }
484

            
485
540
    #[pallet::call]
486
    impl<T: Config> Pallet<T> {
487
        /// Create a payment stream from the origin to the target with provided config
488
        /// and initial deposit (in the asset defined in the config).
489
        #[pallet::call_index(0)]
490
        #[pallet::weight(T::WeightInfo::open_stream())]
491
        pub fn open_stream(
492
            origin: OriginFor<T>,
493
            target: AccountIdOf<T>,
494
            config: StreamConfigOf<T>,
495
            initial_deposit: T::Balance,
496
92
        ) -> DispatchResultWithPostInfo {
497
92
            let origin = ensure_signed(origin)?;
498

            
499
92
            let _stream_id = Self::open_stream_returns_id(origin, target, config, initial_deposit)?;
500

            
501
86
            Ok(().into())
502
        }
503

            
504
        /// Close a given stream in which the origin is involved. It performs the pending payment
505
        /// before closing the stream.
506
        #[pallet::call_index(1)]
507
        #[pallet::weight(T::WeightInfo::close_stream())]
508
        pub fn close_stream(
509
            origin: OriginFor<T>,
510
            stream_id: T::StreamId,
511
24
        ) -> DispatchResultWithPostInfo {
512
24
            let origin = ensure_signed(origin)?;
513
24
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
514

            
515
            // Only source or target can close a stream.
516
23
            ensure!(
517
23
                origin == stream.source || origin == stream.target,
518
1
                Error::<T>::UnauthorizedOrigin
519
            );
520

            
521
            // Update stream before closing it to ensure fair payment.
522
22
            Self::perform_stream_payment(stream_id, &mut stream)?;
523

            
524
            // If there is a soft minimum deposit, stream can be closed only by target or if deposit is empty.
525
22
            ensure!(
526
22
                stream.config.soft_minimum_deposit.is_zero()
527
3
                    || stream.deposit.is_zero()
528
2
                    || origin == stream.target,
529
1
                Error::<T>::SourceCantCloseActiveStreamWithSoftDepositMinimum
530
            );
531

            
532
            // Unfreeze funds left in the stream.
533
21
            T::AssetsManager::decrease_deposit(
534
21
                &stream.config.asset_id,
535
21
                &stream.source,
536
21
                stream.deposit,
537
21
            )?;
538

            
539
            // Release opening deposit
540
21
            if stream.opening_deposit > 0u32.into() {
541
21
                T::Currency::release(
542
21
                    &HoldReason::StreamOpened.into(),
543
21
                    &stream.source,
544
21
                    stream.opening_deposit,
545
21
                    Precision::Exact,
546
21
                )?;
547
            }
548

            
549
            // Remove stream from storage.
550
21
            Streams::<T>::remove(stream_id);
551
21
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
552
21
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
553
21

            
554
21
            // Emit event.
555
21
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
556
21
                stream_id,
557
21
                // TODO: Should `refunded` in event really include the opening_deposit?
558
21
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
559
21
            });
560
21

            
561
21
            Ok(().into())
562
        }
563

            
564
        /// Perform the pending payment of a stream. Anyone can call this.
565
        #[pallet::call_index(2)]
566
        #[pallet::weight(T::WeightInfo::perform_payment())]
567
        pub fn perform_payment(
568
            origin: OriginFor<T>,
569
            stream_id: T::StreamId,
570
27
        ) -> DispatchResultWithPostInfo {
571
27
            // No problem with anyone updating any stream.
572
27
            let _ = ensure_signed(origin)?;
573

            
574
27
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
575
26
            Self::perform_stream_payment(stream_id, &mut stream)?;
576
26
            Streams::<T>::insert(stream_id, stream);
577
26

            
578
26
            Ok(().into())
579
        }
580

            
581
        /// Requests a change to a stream config or deposit.
582
        ///
583
        /// If the new config don't change the time unit and asset id, the change will be applied
584
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
585
        /// in the stream and will have to be approved by the other party.
586
        ///
587
        /// This call accepts a deposit change, which can only be provided by the source of the
588
        /// stream. An absolute change is required when changing asset id, as the current deposit
589
        /// will be released and a new deposit is required in the new asset.
590
        #[pallet::call_index(3)]
591
        #[pallet::weight(
592
            T::WeightInfo::request_change_immediate()
593
            .max(T::WeightInfo::request_change_delayed())
594
        )]
595
        pub fn request_change(
596
            origin: OriginFor<T>,
597
            stream_id: T::StreamId,
598
            kind: ChangeKind<T::Balance>,
599
            new_config: StreamConfigOf<T>,
600
            deposit_change: Option<DepositChange<T::Balance>>,
601
61
        ) -> DispatchResultWithPostInfo {
602
61
            let origin = ensure_signed(origin)?;
603
61
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
604

            
605
60
            let requester = stream
606
60
                .account_to_party(origin)
607
60
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
608

            
609
59
            ensure!(
610
59
                requester == Party::Source || deposit_change.is_none(),
611
1
                Error::<T>::TargetCantChangeDeposit
612
            );
613

            
614
58
            if stream.config == new_config && deposit_change.is_none() {
615
1
                return Ok(().into());
616
57
            }
617

            
618
57
            if let ChangeKind::Mandatory { deadline } = kind {
619
20
                let now = T::TimeProvider::now(&stream.config.time_unit)
620
20
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
621

            
622
20
                let Some(diff) = deadline.checked_sub(&now) else {
623
1
                    return Err(Error::<T>::DeadlineCantBeInPast.into());
624
                };
625

            
626
19
                ensure!(
627
19
                    diff >= stream.config.minimum_request_deadline_delay,
628
1
                    Error::<T>::DeadlineDelayIsBelowMinium
629
                );
630
37
            }
631

            
632
            // If asset id and time unit are the same, we allow to make the change
633
            // immediatly if the origin is at a disadvantage.
634
            // We allow this even if there is already a pending request.
635
            // This checks that the deposit can't be **decreased** under
636
            // config.soft_minimum_deposit.
637
55
            if Self::maybe_immediate_change(
638
55
                stream_id,
639
55
                &mut stream,
640
55
                &new_config,
641
55
                deposit_change,
642
55
                requester,
643
55
            )? {
644
7
                return Ok(().into());
645
44
            }
646
44

            
647
44
            // If the source is requesting a change of asset, they must provide an absolute change.
648
44
            if requester == Party::Source
649
32
                && new_config.asset_id != stream.config.asset_id
650
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
651
            {
652
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
653
41
            }
654

            
655
            // If there is already a mandatory change request, only the origin
656
            // of this request can change it.
657
            if let Some(ChangeRequest {
658
                kind: ChangeKind::Mandatory { .. },
659
3
                requester: pending_requester,
660
                ..
661
5
            }) = &stream.pending_request
662
            {
663
3
                ensure!(
664
3
                    &requester == pending_requester,
665
2
                    Error::<T>::CantOverrideMandatoryChange
666
                );
667
38
            }
668

            
669
39
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
670
39
            stream.pending_request = Some(ChangeRequest {
671
39
                requester,
672
39
                kind,
673
39
                new_config: new_config.clone(),
674
39
                deposit_change,
675
39
            });
676
39

            
677
39
            // Emit event.
678
39
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
679
39
                stream_id,
680
39
                request_nonce: stream.request_nonce,
681
39
                requester,
682
39
                old_config: stream.config.clone(),
683
39
                new_config,
684
39
            });
685
39

            
686
39
            // Update storage.
687
39
            Streams::<T>::insert(stream_id, stream);
688
39

            
689
39
            Ok(().into())
690
        }
691

            
692
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
693
        /// frontrunning attacks. If the target made a request, the source is able to change their
694
        /// deposit.
695
        #[pallet::call_index(4)]
696
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
697
        pub fn accept_requested_change(
698
            origin: OriginFor<T>,
699
            stream_id: T::StreamId,
700
            request_nonce: RequestNonce,
701
            deposit_change: Option<DepositChange<T::Balance>>,
702
31
        ) -> DispatchResultWithPostInfo {
703
31
            let origin = ensure_signed(origin)?;
704
31
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
705

            
706
30
            let accepter = stream
707
30
                .account_to_party(origin)
708
30
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
709

            
710
29
            let Some(request) = stream.pending_request.clone() else {
711
1
                return Err(Error::<T>::NoPendingRequest.into());
712
            };
713

            
714
28
            ensure!(
715
28
                request_nonce == stream.request_nonce,
716
1
                Error::<T>::WrongRequestNonce
717
            );
718
27
            ensure!(
719
27
                accepter != request.requester,
720
1
                Error::<T>::CantAcceptOwnRequest
721
            );
722

            
723
26
            ensure!(
724
26
                accepter == Party::Source || deposit_change.is_none(),
725
1
                Error::<T>::TargetCantChangeDeposit
726
            );
727

            
728
            // Perform pending payment before changing config.
729
25
            Self::perform_stream_payment(stream_id, &mut stream)?;
730

            
731
            // Apply change.
732
            // It is safe to override config now as we have already performed the payment.
733
            // Checks made in apply_deposit_change needs to be done with new config.
734
25
            let old_config = stream.config;
735
25
            stream.config = request.new_config;
736
25
            let deposit_change = deposit_change.or(request.deposit_change);
737
25

            
738
25
            match (
739
25
                old_config.asset_id == stream.config.asset_id,
740
25
                deposit_change,
741
            ) {
742
                // Same asset and a change, we apply it like in `change_deposit` call.
743
13
                (true, Some(change)) => {
744
13
                    Self::apply_deposit_change(&mut stream, change)?;
745
                }
746
                // Same asset and no change, no problem.
747
6
                (true, None) => (),
748
                // Change in asset with absolute new amount
749
3
                (false, Some(DepositChange::Absolute(amount))) => {
750
3
                    // As target chooses the deposit amount in new currency, we must ensure
751
3
                    // it is greater than the soft minimum deposit.
752
3
                    ensure!(
753
3
                        amount >= stream.config.soft_minimum_deposit,
754
1
                        Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum
755
                    );
756

            
757
                    // Release deposit in old asset.
758
2
                    T::AssetsManager::decrease_deposit(
759
2
                        &old_config.asset_id,
760
2
                        &stream.source,
761
2
                        stream.deposit,
762
2
                    )?;
763

            
764
                    // Make deposit in new asset.
765
2
                    T::AssetsManager::increase_deposit(
766
2
                        &stream.config.asset_id,
767
2
                        &stream.source,
768
2
                        amount,
769
2
                    )?;
770
2
                    stream.deposit = amount;
771
                }
772
                // It doesn't make sense to change asset while not providing an absolute new
773
                // amount.
774
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
775
            }
776

            
777
            // If time unit changes we need to update `last_time_updated` to be in the
778
            // new unit.
779
21
            if old_config.time_unit != stream.config.time_unit {
780
2
                stream.last_time_updated = T::TimeProvider::now(&stream.config.time_unit)
781
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
782
19
            }
783

            
784
            // Event
785
21
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
786
21
                stream_id,
787
21
                old_config,
788
21
                new_config: stream.config.clone(),
789
21
                deposit_change,
790
21
            });
791
21

            
792
21
            // Update stream in storage.
793
21
            stream.pending_request = None;
794
21
            Streams::<T>::insert(stream_id, stream);
795
21

            
796
21
            Ok(().into())
797
        }
798

            
799
        #[pallet::call_index(5)]
800
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
801
        pub fn cancel_change_request(
802
            origin: OriginFor<T>,
803
            stream_id: T::StreamId,
804
6
        ) -> DispatchResultWithPostInfo {
805
6
            let origin = ensure_signed(origin)?;
806
6
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
807

            
808
5
            let accepter = stream
809
5
                .account_to_party(origin)
810
5
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
811

            
812
4
            let Some(request) = stream.pending_request.take() else {
813
1
                return Err(Error::<T>::NoPendingRequest.into());
814
            };
815

            
816
3
            ensure!(
817
3
                accepter == request.requester,
818
2
                Error::<T>::CanOnlyCancelOwnRequest
819
            );
820

            
821
            // Update storage.
822
            // Pending request is removed by calling `.take()`.
823
1
            Streams::<T>::insert(stream_id, stream);
824
1

            
825
1
            Ok(().into())
826
        }
827

            
828
        /// Allows immediately changing the deposit for a stream, which is simpler than
829
        /// calling `request_change` with the proper parameters.
830
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
831
        /// the call is included in a block, in which case the unit is no longer the same and quantities
832
        /// will not have the same scale/value.
833
        #[pallet::call_index(6)]
834
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
835
        pub fn immediately_change_deposit(
836
            origin: OriginFor<T>,
837
            stream_id: T::StreamId,
838
            asset_id: T::AssetId,
839
            change: DepositChange<T::Balance>,
840
13
        ) -> DispatchResultWithPostInfo {
841
13
            let origin = ensure_signed(origin)?;
842
13
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
843

            
844
12
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
845
10
            ensure!(
846
10
                stream.config.asset_id == asset_id,
847
1
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
848
            );
849

            
850
            // Perform pending payment before changing deposit.
851
9
            Self::perform_stream_payment(stream_id, &mut stream)?;
852

            
853
            // Apply change.
854
9
            Self::apply_deposit_change(&mut stream, change)?;
855

            
856
            // Event
857
4
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
858
4
                stream_id,
859
4
                old_config: stream.config.clone(),
860
4
                new_config: stream.config.clone(),
861
4
                deposit_change: Some(change),
862
4
            });
863
4

            
864
4
            // Update stream in storage.
865
4
            Streams::<T>::insert(stream_id, stream);
866
4

            
867
4
            Ok(().into())
868
        }
869
    }
870

            
871
    impl<T: Config> Pallet<T> {
872
        /// Try to open a stream and returns its id.
873
        /// Prefers calling this function from other pallets instead of `open_stream` as the
874
        /// latter can't return the id.
875
96
        pub fn open_stream_returns_id(
876
96
            origin: AccountIdOf<T>,
877
96
            target: AccountIdOf<T>,
878
96
            config: StreamConfigOf<T>,
879
96
            initial_deposit: T::Balance,
880
96
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
881
96
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
882

            
883
95
            ensure!(
884
95
                initial_deposit >= config.soft_minimum_deposit,
885
1
                Error::<T>::CantCreateStreamWithDepositUnderSoftMinimum
886
            );
887

            
888
            // Generate a new stream id.
889
94
            let stream_id = NextStreamId::<T>::get();
890
94
            let next_stream_id = stream_id
891
94
                .checked_add(&One::one())
892
94
                .ok_or(Error::<T>::StreamIdOverflow)?;
893
93
            NextStreamId::<T>::set(next_stream_id);
894
93

            
895
93
            // Hold opening deposit for the storage used by Stream
896
93
            let opening_deposit = T::OpenStreamHoldAmount::get();
897
93
            if opening_deposit > 0u32.into() {
898
93
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
899
            }
900

            
901
            // Freeze initial deposit.
902
93
            T::AssetsManager::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
903

            
904
            // Create stream data.
905
90
            let now =
906
91
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
907
90
            let stream = Stream {
908
90
                source: origin.clone(),
909
90
                target: target.clone(),
910
90
                config,
911
90
                deposit: initial_deposit,
912
90
                last_time_updated: now,
913
90
                request_nonce: 0,
914
90
                pending_request: None,
915
90
                opening_deposit,
916
90
            };
917
90

            
918
90
            // Insert stream in storage.
919
90
            Streams::<T>::insert(stream_id, stream);
920
90
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
921
90
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
922
90

            
923
90
            // Emit event.
924
90
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
925
90

            
926
90
            Ok(stream_id)
927
96
        }
928

            
929
        /// Get the stream payment current status, telling how much payment is
930
        /// pending, how much deposit will be left and whenever the stream is stalled.
931
        /// The stream is considered stalled if no funds are left or if the provided
932
        /// time is past a mandatory request deadline. If the provided `now` is `None`
933
        /// then the current time will be fetched. Being able to provide a custom `now`
934
        /// allows to check the status in the future. It is invalid to provide a `now` that is
935
        /// before `last_time_updated`.
936
55
        pub fn stream_payment_status(
937
55
            stream_id: T::StreamId,
938
55
            now: Option<T::Balance>,
939
55
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
940
55
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
941
43
            let now = match now {
942
                Some(v) => v,
943
43
                None => T::TimeProvider::now(&stream.config.time_unit)
944
43
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
945
            };
946

            
947
43
            let last_time_updated = stream.last_time_updated;
948
43

            
949
43
            ensure!(
950
43
                now >= last_time_updated,
951
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
952
            );
953

            
954
43
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
955
55
        }
956

            
957
136
        fn stream_payment_status_by_ref(
958
136
            stream: &StreamOf<T>,
959
136
            last_time_updated: T::Balance,
960
136
            mut now: T::Balance,
961
136
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
962
136
            let mut stalled_by_deadline = false;
963

            
964
            // Take into account mandatory change request deadline. Note that
965
            // while it'll perform payment up to deadline,
966
            // `stream.last_time_updated` is still the "real now" to avoid
967
            // retroactive payment in case the deadline changes.
968
            if let Some(ChangeRequest {
969
34
                kind: ChangeKind::Mandatory { deadline },
970
                ..
971
46
            }) = &stream.pending_request
972
            {
973
34
                now = min(now, *deadline);
974
34

            
975
34
                if now == *deadline {
976
15
                    stalled_by_deadline = true;
977
21
                }
978
102
            }
979

            
980
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
981
136
            if stream.deposit.is_zero() {
982
                return Ok(StreamPaymentStatus {
983
                    payment: 0u32.into(),
984
                    deposit_left: stream.deposit,
985
                    stalled: true,
986
                });
987
136
            }
988

            
989
            // Dont perform payment if now is before or equal to `last_time_updated`.
990
            // It can be before due to the deadline adjustment.
991
136
            let Some(delta) = now.checked_sub(&last_time_updated) else {
992
2
                return Ok(StreamPaymentStatus {
993
2
                    payment: 0u32.into(),
994
2
                    deposit_left: stream.deposit,
995
2
                    stalled: true,
996
2
                });
997
            };
998

            
999
            // We compute the amount due to the target according to the rate, which may be
            // lowered if the stream deposit is lower.
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
            // considering it an error can make a stream un-updatable if too much time has passed
            // without updates.
134
            let mut payment = delta.saturating_mul(stream.config.rate);
            // We compute the new amount of locked funds. If it underflows it
            // means that there is more to pay that what is left, in which case
            // we pay all that is left.
134
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
132
                Some(v) if v.is_zero() => (v, true),
129
                Some(v) => (v, stalled_by_deadline),
                None => {
2
                    payment = stream.deposit;
2
                    (Zero::zero(), true)
                }
            };
134
            Ok(StreamPaymentStatus {
134
                payment,
134
                deposit_left,
134
                stalled,
134
            })
136
        }
        /// Behavior:
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
        /// last time the stream was updated. When updating the stream, **at most**
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
        /// account. If this amount is greater than the left deposit, the stream is considered
        /// drained **but not closed**. The source can come back later and refill the stream,
        /// however there will be no retroactive payment for the time spent as drained.
        /// If the stream payment is used to rent a service, the target should pause the service
        /// while the stream is drained, and resume it once it is refilled.
93
        fn perform_stream_payment(
93
            stream_id: T::StreamId,
93
            stream: &mut StreamOf<T>,
93
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
93
            let now = T::TimeProvider::now(&stream.config.time_unit)
93
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
            // We want to update `stream.last_time_updated` to `now` as soon
            // as possible to avoid forgetting to do it. We copy the old value
            // for payment computation.
93
            let last_time_updated = stream.last_time_updated;
93
            stream.last_time_updated = now;
            let StreamPaymentStatus {
93
                payment,
93
                deposit_left,
93
                stalled,
93
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
93
            if payment.is_zero() {
39
                return Ok(0u32.into());
54
            }
54

            
54
            // Transfer from the source to target.
54
            T::AssetsManager::transfer_deposit(
54
                &stream.config.asset_id,
54
                &stream.source,
54
                &stream.target,
54
                payment,
54
            )?;
            // Update stream info.
54
            stream.deposit = deposit_left;
54

            
54
            // Emit event.
54
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
54
                stream_id,
54
                source: stream.source.clone(),
54
                target: stream.target.clone(),
54
                amount: payment,
54
                stalled,
54
            });
54

            
54
            Ok(payment)
93
        }
31
        fn apply_deposit_change(
31
            stream: &mut StreamOf<T>,
31
            change: DepositChange<T::Balance>,
31
        ) -> DispatchResultWithPostInfo {
31
            match change {
8
                DepositChange::Absolute(amount) => {
8
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
1
                        T::AssetsManager::increase_deposit(
1
                            &stream.config.asset_id,
1
                            &stream.source,
1
                            increase,
1
                        )?;
7
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
7
                        if amount < stream.config.soft_minimum_deposit {
2
                            return Err(
2
                                Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into()
2
                            );
5
                        }
5

            
5
                        T::AssetsManager::decrease_deposit(
5
                            &stream.config.asset_id,
5
                            &stream.source,
5
                            decrease,
5
                        )?;
                    }
5
                    stream.deposit = amount;
                }
17
                DepositChange::Increase(increase) => {
17
                    stream.deposit = stream
17
                        .deposit
17
                        .checked_add(&increase)
17
                        .ok_or(ArithmeticError::Overflow)?;
15
                    T::AssetsManager::increase_deposit(
15
                        &stream.config.asset_id,
15
                        &stream.source,
15
                        increase,
15
                    )?;
                }
6
                DepositChange::Decrease(decrease) => {
6
                    let new_deposit = stream
6
                        .deposit
6
                        .checked_sub(&decrease)
6
                        .ok_or(ArithmeticError::Underflow)?;
4
                    if new_deposit < stream.config.soft_minimum_deposit {
2
                        return Err(Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into());
2
                    }
2

            
2
                    stream.deposit = new_deposit;
2
                    T::AssetsManager::decrease_deposit(
2
                        &stream.config.asset_id,
2
                        &stream.source,
2
                        decrease,
2
                    )?;
                }
            }
22
            Ok(().into())
31
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
55
        fn maybe_immediate_change(
55
            stream_id: T::StreamId,
55
            stream: &mut StreamOf<T>,
55
            new_config: &StreamConfigOf<T>,
55
            deposit_change: Option<DepositChange<T::Balance>>,
55
            requester: Party,
55
        ) -> Result<bool, DispatchErrorWithPostInfo> {
55
            if new_config.time_unit != stream.config.time_unit
37
                || new_config.asset_id != stream.config.asset_id
            {
24
                return Ok(false);
31
            }
31

            
31
            if requester == Party::Source && new_config.rate < stream.config.rate {
16
                return Ok(false);
15
            }
15

            
15
            if requester == Party::Target && new_config.rate > stream.config.rate {
4
                return Ok(false);
11
            }
11

            
11
            // Perform pending payment before changing config.
11
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
11
            if let Some(change) = deposit_change {
9
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
7
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
7
                stream_id,
7
                old_config: stream.config.clone(),
7
                new_config: new_config.clone(),
7
                deposit_change,
7
            });
7

            
7
            // Update storage.
7
            stream.config = new_config.clone();
7
            Streams::<T>::insert(stream_id, stream);
7

            
7
            Ok(true)
55
        }
    }
}