1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19
extern crate alloc;
20

            
21
#[cfg(test)]
22
mod mock;
23

            
24
#[cfg(test)]
25
mod tests;
26

            
27
#[cfg(feature = "runtime-benchmarks")]
28
mod benchmarking;
29

            
30
#[cfg(feature = "migrations")]
31
pub mod migrations;
32

            
33
pub mod weights;
34
pub use weights::WeightInfo;
35

            
36
use {
37
    alloc::fmt::Debug,
38
    core::cmp::min,
39
    core::marker::PhantomData,
40
    frame_support::{
41
        dispatch::DispatchErrorWithPostInfo,
42
        pallet,
43
        pallet_prelude::*,
44
        storage::types::{StorageDoubleMap, StorageMap},
45
        traits::{
46
            fungible::{Inspect, MutateHold},
47
            tokens::{Balance, Precision},
48
        },
49
        Blake2_128Concat,
50
    },
51
    frame_system::pallet_prelude::*,
52
    parity_scale_codec::{DecodeWithMemTracking, FullCodec, MaxEncodedLen},
53
    scale_info::TypeInfo,
54
    serde::{Deserialize, Serialize},
55
    sp_runtime::{
56
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
57
        ArithmeticError,
58
    },
59
};
60

            
61
pub use pallet::*;
62

            
63
/// Type able to provide the current time for given unit.
64
/// For each unit the returned number should monotonically increase and not
65
/// overflow.
66
pub trait TimeProvider<Unit, Number> {
67
    fn now(unit: &Unit) -> Option<Number>;
68

            
69
    /// Benchmarks: should return the time unit which has the worst performance calling
70
    /// `TimeProvider::now(unit)` with.
71
    #[cfg(feature = "runtime-benchmarks")]
72
    fn bench_worst_case_time_unit() -> Unit;
73

            
74
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
75
    #[cfg(feature = "runtime-benchmarks")]
76
    fn bench_set_now(instant: Number);
77
}
78

            
79
/// Interactions the pallet needs with assets.
80
pub trait AssetsManager<AccountId, AssetId, Balance> {
81
    /// Transfer assets deposited by an account to another account.
82
    /// Those assets should not be considered deposited in the target account.
83
    fn transfer_deposit(
84
        asset_id: &AssetId,
85
        from: &AccountId,
86
        to: &AccountId,
87
        amount: Balance,
88
    ) -> DispatchResult;
89

            
90
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
91
    /// enough of that asset. Funds should be safe and not slashable.
92
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
93
        -> DispatchResult;
94

            
95
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
96
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
97
        -> DispatchResult;
98

            
99
    /// Return the deposit for given asset and account.
100
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
101

            
102
    /// Benchmarks: should return the asset id which has the worst performance when interacting
103
    /// with it.
104
    #[cfg(feature = "runtime-benchmarks")]
105
    fn bench_worst_case_asset_id() -> AssetId;
106

            
107
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
108
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
109
    /// from one asset to another. If there is only one asset id it is fine to return it in both
110
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
111
    #[cfg(feature = "runtime-benchmarks")]
112
    fn bench_worst_case_asset_id2() -> AssetId;
113

            
114
    /// Benchmarks: should set the balance.
115
    #[cfg(feature = "runtime-benchmarks")]
116
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
117
}
118

            
119
40560
#[pallet]
120
pub mod pallet {
121
    use super::*;
122

            
123
    /// Pooled Staking pallet.
124
69439
    #[pallet::pallet]
125
    pub struct Pallet<T>(PhantomData<T>);
126

            
127
    #[pallet::config]
128
    pub trait Config: frame_system::Config {
129
        /// Overarching event type
130
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
131

            
132
        /// Type used to represent stream ids. Should be large enough to not overflow.
133
        type StreamId: AtLeast32BitUnsigned
134
            + Default
135
            + Debug
136
            + Copy
137
            + Clone
138
            + FullCodec
139
            + TypeInfo
140
            + MaxEncodedLen;
141

            
142
        /// The balance type, which is also the type representing time (as this
143
        /// pallet will do math with both time and balances to compute how
144
        /// much should be paid).
145
        type Balance: Balance;
146

            
147
        /// Type representing an asset id, a identifier allowing distinguishing assets.
148
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
149

            
150
        /// Provide interaction with assets.
151
        type AssetsManager: AssetsManager<Self::AccountId, Self::AssetId, Self::Balance>;
152

            
153
        /// Currency for the opening balance hold for the storage used by the Stream.
154
        /// NOT to be confused with Assets.
155
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
156
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
157

            
158
        type RuntimeHoldReason: From<HoldReason>;
159

            
160
        #[pallet::constant]
161
        type OpenStreamHoldAmount: Get<Self::Balance>;
162

            
163
        /// Represents which units of time can be used. Designed to be an enum
164
        /// with a variant for each kind of time source/scale supported.
165
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
166

            
167
        /// Provide the current time in given unit.
168
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
169

            
170
        type WeightInfo: weights::WeightInfo;
171
    }
172

            
173
    pub type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
174
    pub type AssetIdOf<T> = <T as Config>::AssetId;
175

            
176
    pub type RequestNonce = u32;
177

            
178
    /// A stream payment from source to target.
179
    /// Stores the last time the stream was updated, which allows to compute
180
    /// elapsed time and perform payment.
181
    #[derive(
182
        RuntimeDebug,
183
        PartialEq,
184
        Eq,
185
        Encode,
186
        Decode,
187
        Clone,
188
4992
        TypeInfo,
189
        Serialize,
190
        Deserialize,
191
        MaxEncodedLen,
192
    )]
193
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
194
        /// Payer, source of the stream.
195
        pub source: AccountId,
196
        /// Payee, target of the stream.
197
        pub target: AccountId,
198
        /// Stream config
199
        pub config: StreamConfig<Unit, AssetId, Balance>,
200
        /// How much is deposited to fund this stream.
201
        pub deposit: Balance,
202
        /// Last time the stream was updated in `config.time_unit`.
203
        pub last_time_updated: Balance,
204
        /// Nonce for requests. This prevents a request to make a first request
205
        /// then change it to another request to frontrun the other party
206
        /// accepting.
207
        pub request_nonce: RequestNonce,
208
        /// A pending change request if any.
209
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
210
        /// One-time opening deposit. Will be released on close.
211
        pub opening_deposit: Balance,
212
    }
213

            
214
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
215
95
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
216
95
            match account {
217
95
                a if a == self.source => Some(Party::Source),
218
39
                a if a == self.target => Some(Party::Target),
219
3
                _ => None,
220
            }
221
95
        }
222
    }
223

            
224
    /// Stream configuration.
225
    #[derive(
226
        RuntimeDebug,
227
        PartialEq,
228
        Eq,
229
        Encode,
230
        Decode,
231
        Copy,
232
        Clone,
233
3120
        TypeInfo,
234
        Serialize,
235
        Deserialize,
236
        MaxEncodedLen,
237
        DecodeWithMemTracking,
238
    )]
239
    pub struct StreamConfig<Unit, AssetId, BalanceOrDuration> {
240
        /// Unit in which time is measured using a `TimeProvider`.
241
        pub time_unit: Unit,
242
        /// Asset used for payment.
243
        pub asset_id: AssetId,
244
        /// Amount of asset / unit.
245
        pub rate: BalanceOrDuration,
246
        /// Minimum amount of time that can be used for mandatory change requests.
247
        pub minimum_request_deadline_delay: BalanceOrDuration,
248
        /// Minimal amount the source must deposit in the stream. Deposit can go
249
        /// lower due to time passing, but deposit cannot be **decreased** lower
250
        /// explicitly by the source. If this is non-zero, the stream can only
251
        /// be closed by the target or if the deposit is zero. If deposit is lower
252
        /// than minimum due to time passing, source is allowed to **increase** the
253
        /// deposit to a value lower than the soft minimum.
254
        ///
255
        /// This system guarantees to the target that the source cannot instantly
256
        /// stop paying, and may prepare for termination of service if the deposit
257
        /// is below this minimum.
258
        pub soft_minimum_deposit: BalanceOrDuration,
259
    }
260

            
261
    /// Origin of a change request.
262
    #[derive(
263
        RuntimeDebug,
264
        PartialEq,
265
        Eq,
266
        Encode,
267
        Decode,
268
        DecodeWithMemTracking,
269
        Copy,
270
        Clone,
271
2496
        TypeInfo,
272
        Serialize,
273
        Deserialize,
274
        MaxEncodedLen,
275
    )]
276
    pub enum Party {
277
53
        Source,
278
25
        Target,
279
    }
280

            
281
    impl Party {
282
4
        pub fn inverse(self) -> Self {
283
4
            match self {
284
2
                Party::Source => Party::Target,
285
2
                Party::Target => Party::Source,
286
            }
287
4
        }
288
    }
289

            
290
    /// Kind of change requested.
291
    #[derive(
292
        RuntimeDebug,
293
        PartialEq,
294
        Eq,
295
        Encode,
296
        Decode,
297
        Copy,
298
        Clone,
299
1248
        TypeInfo,
300
        Serialize,
301
        Deserialize,
302
        MaxEncodedLen,
303
        DecodeWithMemTracking,
304
    )]
305
    pub enum ChangeKind<Time> {
306
22
        /// The requested change is a suggestion, and the other party doesn't
307
        /// need to accept it.
308
        Suggestion,
309
        /// The requested change is mandatory, and the other party must either
310
        /// accept the change or close the stream. Reaching the deadline will
311
        /// close the stream too.
312
        Mandatory { deadline: Time },
313
    }
314

            
315
    /// Describe how the deposit should change.
316
    #[derive(
317
        RuntimeDebug,
318
        PartialEq,
319
        Eq,
320
        Encode,
321
        Decode,
322
        Copy,
323
        Clone,
324
1872
        TypeInfo,
325
        Serialize,
326
        Deserialize,
327
        MaxEncodedLen,
328
        DecodeWithMemTracking,
329
    )]
330
    pub enum DepositChange<Balance> {
331
        /// Increase deposit by given amount.
332
        Increase(Balance),
333
        /// Decrease deposit by given amount.
334
        Decrease(Balance),
335
        /// Set deposit to given amount.
336
        Absolute(Balance),
337
    }
338

            
339
    /// A request to change a stream config.
340
    #[derive(
341
        RuntimeDebug,
342
        PartialEq,
343
        Eq,
344
        Encode,
345
        Decode,
346
        Clone,
347
2496
        TypeInfo,
348
        Serialize,
349
        Deserialize,
350
        MaxEncodedLen,
351
    )]
352
    pub struct ChangeRequest<Unit, AssetId, Balance> {
353
        pub requester: Party,
354
        pub kind: ChangeKind<Balance>,
355
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
356
        pub deposit_change: Option<DepositChange<Balance>>,
357
    }
358

            
359
    pub type StreamOf<T> =
360
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
361

            
362
    pub type StreamConfigOf<T> =
363
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
364

            
365
    pub type ChangeRequestOf<T> =
366
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
367

            
368
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
369
    pub struct StreamPaymentStatus<Balance> {
370
        pub payment: Balance,
371
        pub deposit_left: Balance,
372
        /// Whenever the stream is stalled, which can occur either when no funds are left or
373
        /// if the time is past a mandatory request deadline.
374
        pub stalled: bool,
375
    }
376

            
377
    /// Store the next available stream id.
378
1000
    #[pallet::storage]
379
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
380

            
381
    /// Store each stream indexed by an Id.
382
1098
    #[pallet::storage]
383
    pub type Streams<T: Config> = StorageMap<
384
        Hasher = Blake2_128Concat,
385
        Key = T::StreamId,
386
        Value = StreamOf<T>,
387
        QueryKind = OptionQuery,
388
    >;
389

            
390
    /// Lookup for all streams with given source.
391
    /// To avoid maintaining a growing list of stream ids, they are stored in
392
    /// the form of an entry (AccountId, StreamId). If such entry exists then
393
    /// this AccountId is a source in StreamId. One can iterate over all storage
394
    /// keys starting with the AccountId to find all StreamIds.
395
740
    #[pallet::storage]
396
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
397
        Key1 = AccountIdOf<T>,
398
        Hasher1 = Blake2_128Concat,
399
        Key2 = T::StreamId,
400
        Hasher2 = Blake2_128Concat,
401
        Value = (),
402
        QueryKind = OptionQuery,
403
    >;
404

            
405
    /// Lookup for all streams with given target.
406
    /// To avoid maintaining a growing list of stream ids, they are stored in
407
    /// the form of an entry (AccountId, StreamId). If such entry exists then
408
    /// this AccountId is a target in StreamId. One can iterate over all storage
409
    /// keys starting with the AccountId to find all StreamIds.
410
740
    #[pallet::storage]
411
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
412
        Key1 = AccountIdOf<T>,
413
        Hasher1 = Blake2_128Concat,
414
        Key2 = T::StreamId,
415
        Hasher2 = Blake2_128Concat,
416
        Value = (),
417
        QueryKind = OptionQuery,
418
    >;
419

            
420
624
    #[pallet::error]
421
    #[derive(Clone, PartialEq, Eq)]
422
    pub enum Error<T> {
423
        UnknownStreamId,
424
        StreamIdOverflow,
425
        UnauthorizedOrigin,
426
        CantBeBothSourceAndTarget,
427
        CantFetchCurrentTime,
428
        SourceCantDecreaseRate,
429
        TargetCantIncreaseRate,
430
        CantOverrideMandatoryChange,
431
        NoPendingRequest,
432
        CantAcceptOwnRequest,
433
        CanOnlyCancelOwnRequest,
434
        WrongRequestNonce,
435
        ChangingAssetRequiresAbsoluteDepositChange,
436
        TargetCantChangeDeposit,
437
        ImmediateDepositChangeRequiresSameAssetId,
438
        DeadlineCantBeInPast,
439
        CantFetchStatusBeforeLastTimeUpdated,
440
        DeadlineDelayIsBelowMinium,
441
        CantDecreaseDepositUnderSoftDepositMinimum,
442
        SourceCantCloseActiveStreamWithSoftDepositMinimum,
443
        CantCreateStreamWithDepositUnderSoftMinimum,
444
    }
445

            
446
624
    #[pallet::event]
447
236
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
448
    pub enum Event<T: Config> {
449
        StreamOpened {
450
            stream_id: T::StreamId,
451
        },
452
        StreamClosed {
453
            stream_id: T::StreamId,
454
            refunded: T::Balance,
455
        },
456
        StreamPayment {
457
            stream_id: T::StreamId,
458
            source: AccountIdOf<T>,
459
            target: AccountIdOf<T>,
460
            amount: T::Balance,
461
            stalled: bool,
462
        },
463
        StreamConfigChangeRequested {
464
            stream_id: T::StreamId,
465
            request_nonce: RequestNonce,
466
            requester: Party,
467
            old_config: StreamConfigOf<T>,
468
            new_config: StreamConfigOf<T>,
469
        },
470
        StreamConfigChanged {
471
            stream_id: T::StreamId,
472
            old_config: StreamConfigOf<T>,
473
            new_config: StreamConfigOf<T>,
474
            deposit_change: Option<DepositChange<T::Balance>>,
475
        },
476
    }
477

            
478
    /// Freeze reason to use if needed.
479
    #[pallet::composite_enum]
480
    pub enum FreezeReason {
481
        StreamPayment,
482
    }
483

            
484
    /// Hold reason to use if needed.
485
    #[pallet::composite_enum]
486
    pub enum HoldReason {
487
248
        StreamPayment,
488
562
        StreamOpened,
489
    }
490

            
491
624
    #[pallet::call]
492
    impl<T: Config> Pallet<T> {
493
        /// Create a payment stream from the origin to the target with provided config
494
        /// and initial deposit (in the asset defined in the config).
495
        #[pallet::call_index(0)]
496
        #[pallet::weight(T::WeightInfo::open_stream())]
497
        #[allow(clippy::useless_conversion)]
498
        pub fn open_stream(
499
            origin: OriginFor<T>,
500
            target: AccountIdOf<T>,
501
            config: StreamConfigOf<T>,
502
            initial_deposit: T::Balance,
503
92
        ) -> DispatchResultWithPostInfo {
504
92
            let origin = ensure_signed(origin)?;
505

            
506
92
            let _stream_id = Self::open_stream_returns_id(origin, target, config, initial_deposit)?;
507

            
508
86
            Ok(().into())
509
        }
510

            
511
        /// Close a given stream in which the origin is involved. It performs the pending payment
512
        /// before closing the stream.
513
        #[pallet::call_index(1)]
514
        #[pallet::weight(T::WeightInfo::close_stream())]
515
        #[allow(clippy::useless_conversion)]
516
        pub fn close_stream(
517
            origin: OriginFor<T>,
518
            stream_id: T::StreamId,
519
24
        ) -> DispatchResultWithPostInfo {
520
24
            let origin = ensure_signed(origin)?;
521
24
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
522

            
523
            // Only source or target can close a stream.
524
23
            ensure!(
525
23
                origin == stream.source || origin == stream.target,
526
1
                Error::<T>::UnauthorizedOrigin
527
            );
528

            
529
            // Update stream before closing it to ensure fair payment.
530
22
            Self::perform_stream_payment(stream_id, &mut stream)?;
531

            
532
            // If there is a soft minimum deposit, stream can be closed only by target or if deposit is empty.
533
22
            ensure!(
534
22
                stream.config.soft_minimum_deposit.is_zero()
535
3
                    || stream.deposit.is_zero()
536
2
                    || origin == stream.target,
537
1
                Error::<T>::SourceCantCloseActiveStreamWithSoftDepositMinimum
538
            );
539

            
540
            // Unfreeze funds left in the stream.
541
21
            T::AssetsManager::decrease_deposit(
542
21
                &stream.config.asset_id,
543
21
                &stream.source,
544
21
                stream.deposit,
545
21
            )?;
546

            
547
            // Release opening deposit
548
21
            if stream.opening_deposit > 0u32.into() {
549
21
                T::Currency::release(
550
21
                    &HoldReason::StreamOpened.into(),
551
21
                    &stream.source,
552
21
                    stream.opening_deposit,
553
21
                    Precision::Exact,
554
21
                )?;
555
            }
556

            
557
            // Remove stream from storage.
558
21
            Streams::<T>::remove(stream_id);
559
21
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
560
21
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
561
21

            
562
21
            // Emit event.
563
21
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
564
21
                stream_id,
565
21
                // TODO: Should `refunded` in event really include the opening_deposit?
566
21
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
567
21
            });
568
21

            
569
21
            Ok(().into())
570
        }
571

            
572
        /// Perform the pending payment of a stream. Anyone can call this.
573
        #[pallet::call_index(2)]
574
        #[pallet::weight(T::WeightInfo::perform_payment())]
575
        #[allow(clippy::useless_conversion)]
576
        pub fn perform_payment(
577
            origin: OriginFor<T>,
578
            stream_id: T::StreamId,
579
27
        ) -> DispatchResultWithPostInfo {
580
27
            // No problem with anyone updating any stream.
581
27
            let _ = ensure_signed(origin)?;
582

            
583
27
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
584
26
            Self::perform_stream_payment(stream_id, &mut stream)?;
585
26
            Streams::<T>::insert(stream_id, stream);
586
26

            
587
26
            Ok(().into())
588
        }
589

            
590
        /// Requests a change to a stream config or deposit.
591
        ///
592
        /// If the new config don't change the time unit and asset id, the change will be applied
593
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
594
        /// in the stream and will have to be approved by the other party.
595
        ///
596
        /// This call accepts a deposit change, which can only be provided by the source of the
597
        /// stream. An absolute change is required when changing asset id, as the current deposit
598
        /// will be released and a new deposit is required in the new asset.
599
        #[pallet::call_index(3)]
600
        #[pallet::weight(
601
            T::WeightInfo::request_change_immediate()
602
            .max(T::WeightInfo::request_change_delayed())
603
        )]
604
        #[allow(clippy::useless_conversion)]
605
        pub fn request_change(
606
            origin: OriginFor<T>,
607
            stream_id: T::StreamId,
608
            kind: ChangeKind<T::Balance>,
609
            new_config: StreamConfigOf<T>,
610
            deposit_change: Option<DepositChange<T::Balance>>,
611
61
        ) -> DispatchResultWithPostInfo {
612
61
            let origin = ensure_signed(origin)?;
613
61
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
614

            
615
60
            let requester = stream
616
60
                .account_to_party(origin)
617
60
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
618

            
619
59
            ensure!(
620
59
                requester == Party::Source || deposit_change.is_none(),
621
1
                Error::<T>::TargetCantChangeDeposit
622
            );
623

            
624
58
            if stream.config == new_config && deposit_change.is_none() {
625
1
                return Ok(().into());
626
57
            }
627

            
628
57
            if let ChangeKind::Mandatory { deadline } = kind {
629
20
                let now = T::TimeProvider::now(&stream.config.time_unit)
630
20
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
631

            
632
20
                let Some(diff) = deadline.checked_sub(&now) else {
633
1
                    return Err(Error::<T>::DeadlineCantBeInPast.into());
634
                };
635

            
636
19
                ensure!(
637
19
                    diff >= stream.config.minimum_request_deadline_delay,
638
1
                    Error::<T>::DeadlineDelayIsBelowMinium
639
                );
640
37
            }
641

            
642
            // If asset id and time unit are the same, we allow to make the change
643
            // immediatly if the origin is at a disadvantage.
644
            // We allow this even if there is already a pending request.
645
            // This checks that the deposit can't be **decreased** under
646
            // config.soft_minimum_deposit.
647
55
            if Self::maybe_immediate_change(
648
55
                stream_id,
649
55
                &mut stream,
650
55
                &new_config,
651
55
                deposit_change,
652
55
                requester,
653
55
            )? {
654
7
                return Ok(().into());
655
44
            }
656
44

            
657
44
            // If the source is requesting a change of asset, they must provide an absolute change.
658
44
            if requester == Party::Source
659
32
                && new_config.asset_id != stream.config.asset_id
660
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
661
            {
662
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
663
41
            }
664

            
665
            // If there is already a mandatory change request, only the origin
666
            // of this request can change it.
667
            if let Some(ChangeRequest {
668
                kind: ChangeKind::Mandatory { .. },
669
3
                requester: pending_requester,
670
                ..
671
5
            }) = &stream.pending_request
672
            {
673
3
                ensure!(
674
3
                    &requester == pending_requester,
675
2
                    Error::<T>::CantOverrideMandatoryChange
676
                );
677
38
            }
678

            
679
39
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
680
39
            stream.pending_request = Some(ChangeRequest {
681
39
                requester,
682
39
                kind,
683
39
                new_config: new_config.clone(),
684
39
                deposit_change,
685
39
            });
686
39

            
687
39
            // Emit event.
688
39
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
689
39
                stream_id,
690
39
                request_nonce: stream.request_nonce,
691
39
                requester,
692
39
                old_config: stream.config.clone(),
693
39
                new_config,
694
39
            });
695
39

            
696
39
            // Update storage.
697
39
            Streams::<T>::insert(stream_id, stream);
698
39

            
699
39
            Ok(().into())
700
        }
701

            
702
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
703
        /// frontrunning attacks. If the target made a request, the source is able to change their
704
        /// deposit.
705
        #[pallet::call_index(4)]
706
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
707
        #[allow(clippy::useless_conversion)]
708
        pub fn accept_requested_change(
709
            origin: OriginFor<T>,
710
            stream_id: T::StreamId,
711
            request_nonce: RequestNonce,
712
            deposit_change: Option<DepositChange<T::Balance>>,
713
31
        ) -> DispatchResultWithPostInfo {
714
31
            let origin = ensure_signed(origin)?;
715
31
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
716

            
717
30
            let accepter = stream
718
30
                .account_to_party(origin)
719
30
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
720

            
721
29
            let Some(request) = stream.pending_request.clone() else {
722
1
                return Err(Error::<T>::NoPendingRequest.into());
723
            };
724

            
725
28
            ensure!(
726
28
                request_nonce == stream.request_nonce,
727
1
                Error::<T>::WrongRequestNonce
728
            );
729
27
            ensure!(
730
27
                accepter != request.requester,
731
1
                Error::<T>::CantAcceptOwnRequest
732
            );
733

            
734
26
            ensure!(
735
26
                accepter == Party::Source || deposit_change.is_none(),
736
1
                Error::<T>::TargetCantChangeDeposit
737
            );
738

            
739
            // Perform pending payment before changing config.
740
25
            Self::perform_stream_payment(stream_id, &mut stream)?;
741

            
742
            // Apply change.
743
            // It is safe to override config now as we have already performed the payment.
744
            // Checks made in apply_deposit_change needs to be done with new config.
745
25
            let old_config = stream.config;
746
25
            stream.config = request.new_config;
747
25
            let deposit_change = deposit_change.or(request.deposit_change);
748
25

            
749
25
            match (
750
25
                old_config.asset_id == stream.config.asset_id,
751
25
                deposit_change,
752
            ) {
753
                // Same asset and a change, we apply it like in `change_deposit` call.
754
13
                (true, Some(change)) => {
755
13
                    Self::apply_deposit_change(&mut stream, change)?;
756
                }
757
                // Same asset and no change, no problem.
758
6
                (true, None) => (),
759
                // Change in asset with absolute new amount
760
3
                (false, Some(DepositChange::Absolute(amount))) => {
761
3
                    // As target chooses the deposit amount in new currency, we must ensure
762
3
                    // it is greater than the soft minimum deposit.
763
3
                    ensure!(
764
3
                        amount >= stream.config.soft_minimum_deposit,
765
1
                        Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum
766
                    );
767

            
768
                    // Release deposit in old asset.
769
2
                    T::AssetsManager::decrease_deposit(
770
2
                        &old_config.asset_id,
771
2
                        &stream.source,
772
2
                        stream.deposit,
773
2
                    )?;
774

            
775
                    // Make deposit in new asset.
776
2
                    T::AssetsManager::increase_deposit(
777
2
                        &stream.config.asset_id,
778
2
                        &stream.source,
779
2
                        amount,
780
2
                    )?;
781
2
                    stream.deposit = amount;
782
                }
783
                // It doesn't make sense to change asset while not providing an absolute new
784
                // amount.
785
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
786
            }
787

            
788
            // If time unit changes we need to update `last_time_updated` to be in the
789
            // new unit.
790
21
            if old_config.time_unit != stream.config.time_unit {
791
2
                stream.last_time_updated = T::TimeProvider::now(&stream.config.time_unit)
792
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
793
19
            }
794

            
795
            // Event
796
21
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
797
21
                stream_id,
798
21
                old_config,
799
21
                new_config: stream.config.clone(),
800
21
                deposit_change,
801
21
            });
802
21

            
803
21
            // Update stream in storage.
804
21
            stream.pending_request = None;
805
21
            Streams::<T>::insert(stream_id, stream);
806
21

            
807
21
            Ok(().into())
808
        }
809

            
810
        #[pallet::call_index(5)]
811
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
812
        #[allow(clippy::useless_conversion)]
813
        pub fn cancel_change_request(
814
            origin: OriginFor<T>,
815
            stream_id: T::StreamId,
816
6
        ) -> DispatchResultWithPostInfo {
817
6
            let origin = ensure_signed(origin)?;
818
6
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
819

            
820
5
            let accepter = stream
821
5
                .account_to_party(origin)
822
5
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
823

            
824
4
            let Some(request) = stream.pending_request.take() else {
825
1
                return Err(Error::<T>::NoPendingRequest.into());
826
            };
827

            
828
3
            ensure!(
829
3
                accepter == request.requester,
830
2
                Error::<T>::CanOnlyCancelOwnRequest
831
            );
832

            
833
            // Update storage.
834
            // Pending request is removed by calling `.take()`.
835
1
            Streams::<T>::insert(stream_id, stream);
836
1

            
837
1
            Ok(().into())
838
        }
839

            
840
        /// Allows immediately changing the deposit for a stream, which is simpler than
841
        /// calling `request_change` with the proper parameters.
842
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
843
        /// the call is included in a block, in which case the unit is no longer the same and quantities
844
        /// will not have the same scale/value.
845
        #[pallet::call_index(6)]
846
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
847
        #[allow(clippy::useless_conversion)]
848
        pub fn immediately_change_deposit(
849
            origin: OriginFor<T>,
850
            stream_id: T::StreamId,
851
            asset_id: T::AssetId,
852
            change: DepositChange<T::Balance>,
853
13
        ) -> DispatchResultWithPostInfo {
854
13
            let origin = ensure_signed(origin)?;
855
13
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
856

            
857
12
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
858
10
            ensure!(
859
10
                stream.config.asset_id == asset_id,
860
1
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
861
            );
862

            
863
            // Perform pending payment before changing deposit.
864
9
            Self::perform_stream_payment(stream_id, &mut stream)?;
865

            
866
            // Apply change.
867
9
            Self::apply_deposit_change(&mut stream, change)?;
868

            
869
            // Event
870
4
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
871
4
                stream_id,
872
4
                old_config: stream.config.clone(),
873
4
                new_config: stream.config.clone(),
874
4
                deposit_change: Some(change),
875
4
            });
876
4

            
877
4
            // Update stream in storage.
878
4
            Streams::<T>::insert(stream_id, stream);
879
4

            
880
4
            Ok(().into())
881
        }
882
    }
883

            
884
    impl<T: Config> Pallet<T> {
885
        /// Try to open a stream and returns its id.
886
        /// Prefers calling this function from other pallets instead of `open_stream` as the
887
        /// latter can't return the id.
888
96
        pub fn open_stream_returns_id(
889
96
            origin: AccountIdOf<T>,
890
96
            target: AccountIdOf<T>,
891
96
            config: StreamConfigOf<T>,
892
96
            initial_deposit: T::Balance,
893
96
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
894
96
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
895

            
896
95
            ensure!(
897
95
                initial_deposit >= config.soft_minimum_deposit,
898
1
                Error::<T>::CantCreateStreamWithDepositUnderSoftMinimum
899
            );
900

            
901
            // Generate a new stream id.
902
94
            let stream_id = NextStreamId::<T>::get();
903
94
            let next_stream_id = stream_id
904
94
                .checked_add(&One::one())
905
94
                .ok_or(Error::<T>::StreamIdOverflow)?;
906
93
            NextStreamId::<T>::set(next_stream_id);
907
93

            
908
93
            // Hold opening deposit for the storage used by Stream
909
93
            let opening_deposit = T::OpenStreamHoldAmount::get();
910
93
            if opening_deposit > 0u32.into() {
911
93
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
912
            }
913

            
914
            // Freeze initial deposit.
915
93
            T::AssetsManager::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
916

            
917
            // Create stream data.
918
90
            let now =
919
91
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
920
90
            let stream = Stream {
921
90
                source: origin.clone(),
922
90
                target: target.clone(),
923
90
                config,
924
90
                deposit: initial_deposit,
925
90
                last_time_updated: now,
926
90
                request_nonce: 0,
927
90
                pending_request: None,
928
90
                opening_deposit,
929
90
            };
930
90

            
931
90
            // Insert stream in storage.
932
90
            Streams::<T>::insert(stream_id, stream);
933
90
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
934
90
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
935
90

            
936
90
            // Emit event.
937
90
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
938
90

            
939
90
            Ok(stream_id)
940
96
        }
941

            
942
        /// Get the stream payment current status, telling how much payment is
943
        /// pending, how much deposit will be left and whenever the stream is stalled.
944
        /// The stream is considered stalled if no funds are left or if the provided
945
        /// time is past a mandatory request deadline. If the provided `now` is `None`
946
        /// then the current time will be fetched. Being able to provide a custom `now`
947
        /// allows to check the status in the future. It is invalid to provide a `now` that is
948
        /// before `last_time_updated`.
949
55
        pub fn stream_payment_status(
950
55
            stream_id: T::StreamId,
951
55
            now: Option<T::Balance>,
952
55
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
953
55
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
954
43
            let now = match now {
955
                Some(v) => v,
956
43
                None => T::TimeProvider::now(&stream.config.time_unit)
957
43
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
958
            };
959

            
960
43
            let last_time_updated = stream.last_time_updated;
961
43

            
962
43
            ensure!(
963
43
                now >= last_time_updated,
964
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
965
            );
966

            
967
43
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
968
55
        }
969

            
970
136
        fn stream_payment_status_by_ref(
971
136
            stream: &StreamOf<T>,
972
136
            last_time_updated: T::Balance,
973
136
            mut now: T::Balance,
974
136
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
975
136
            let mut stalled_by_deadline = false;
976

            
977
            // Take into account mandatory change request deadline. Note that
978
            // while it'll perform payment up to deadline,
979
            // `stream.last_time_updated` is still the "real now" to avoid
980
            // retroactive payment in case the deadline changes.
981
            if let Some(ChangeRequest {
982
34
                kind: ChangeKind::Mandatory { deadline },
983
                ..
984
46
            }) = &stream.pending_request
985
            {
986
34
                now = min(now, *deadline);
987
34

            
988
34
                if now == *deadline {
989
15
                    stalled_by_deadline = true;
990
21
                }
991
102
            }
992

            
993
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
994
136
            if stream.deposit.is_zero() {
995
                return Ok(StreamPaymentStatus {
996
                    payment: 0u32.into(),
997
                    deposit_left: stream.deposit,
998
                    stalled: true,
999
                });
136
            }
            // Dont perform payment if now is before or equal to `last_time_updated`.
            // It can be before due to the deadline adjustment.
136
            let Some(delta) = now.checked_sub(&last_time_updated) else {
2
                return Ok(StreamPaymentStatus {
2
                    payment: 0u32.into(),
2
                    deposit_left: stream.deposit,
2
                    stalled: true,
2
                });
            };
            // We compute the amount due to the target according to the rate, which may be
            // lowered if the stream deposit is lower.
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
            // considering it an error can make a stream un-updatable if too much time has passed
            // without updates.
134
            let mut payment = delta.saturating_mul(stream.config.rate);
            // We compute the new amount of locked funds. If it underflows it
            // means that there is more to pay that what is left, in which case
            // we pay all that is left.
134
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
132
                Some(v) if v.is_zero() => (v, true),
129
                Some(v) => (v, stalled_by_deadline),
                None => {
2
                    payment = stream.deposit;
2
                    (Zero::zero(), true)
                }
            };
134
            Ok(StreamPaymentStatus {
134
                payment,
134
                deposit_left,
134
                stalled,
134
            })
136
        }
        /// Behavior:
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
        /// last time the stream was updated. When updating the stream, **at most**
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
        /// account. If this amount is greater than the left deposit, the stream is considered
        /// drained **but not closed**. The source can come back later and refill the stream,
        /// however there will be no retroactive payment for the time spent as drained.
        /// If the stream payment is used to rent a service, the target should pause the service
        /// while the stream is drained, and resume it once it is refilled.
93
        fn perform_stream_payment(
93
            stream_id: T::StreamId,
93
            stream: &mut StreamOf<T>,
93
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
93
            let now = T::TimeProvider::now(&stream.config.time_unit)
93
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
            // We want to update `stream.last_time_updated` to `now` as soon
            // as possible to avoid forgetting to do it. We copy the old value
            // for payment computation.
93
            let last_time_updated = stream.last_time_updated;
93
            stream.last_time_updated = now;
            let StreamPaymentStatus {
93
                payment,
93
                deposit_left,
93
                stalled,
93
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
93
            if payment.is_zero() {
39
                return Ok(0u32.into());
54
            }
54

            
54
            // Transfer from the source to target.
54
            T::AssetsManager::transfer_deposit(
54
                &stream.config.asset_id,
54
                &stream.source,
54
                &stream.target,
54
                payment,
54
            )?;
            // Update stream info.
54
            stream.deposit = deposit_left;
54

            
54
            // Emit event.
54
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
54
                stream_id,
54
                source: stream.source.clone(),
54
                target: stream.target.clone(),
54
                amount: payment,
54
                stalled,
54
            });
54

            
54
            Ok(payment)
93
        }
31
        fn apply_deposit_change(
31
            stream: &mut StreamOf<T>,
31
            change: DepositChange<T::Balance>,
31
        ) -> DispatchResultWithPostInfo {
31
            match change {
8
                DepositChange::Absolute(amount) => {
8
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
1
                        T::AssetsManager::increase_deposit(
1
                            &stream.config.asset_id,
1
                            &stream.source,
1
                            increase,
1
                        )?;
7
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
7
                        if amount < stream.config.soft_minimum_deposit {
2
                            return Err(
2
                                Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into()
2
                            );
5
                        }
5

            
5
                        T::AssetsManager::decrease_deposit(
5
                            &stream.config.asset_id,
5
                            &stream.source,
5
                            decrease,
5
                        )?;
                    }
5
                    stream.deposit = amount;
                }
17
                DepositChange::Increase(increase) => {
17
                    stream.deposit = stream
17
                        .deposit
17
                        .checked_add(&increase)
17
                        .ok_or(ArithmeticError::Overflow)?;
15
                    T::AssetsManager::increase_deposit(
15
                        &stream.config.asset_id,
15
                        &stream.source,
15
                        increase,
15
                    )?;
                }
6
                DepositChange::Decrease(decrease) => {
6
                    let new_deposit = stream
6
                        .deposit
6
                        .checked_sub(&decrease)
6
                        .ok_or(ArithmeticError::Underflow)?;
4
                    if new_deposit < stream.config.soft_minimum_deposit {
2
                        return Err(Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into());
2
                    }
2

            
2
                    stream.deposit = new_deposit;
2
                    T::AssetsManager::decrease_deposit(
2
                        &stream.config.asset_id,
2
                        &stream.source,
2
                        decrease,
2
                    )?;
                }
            }
22
            Ok(().into())
31
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
55
        fn maybe_immediate_change(
55
            stream_id: T::StreamId,
55
            stream: &mut StreamOf<T>,
55
            new_config: &StreamConfigOf<T>,
55
            deposit_change: Option<DepositChange<T::Balance>>,
55
            requester: Party,
55
        ) -> Result<bool, DispatchErrorWithPostInfo> {
55
            if new_config.time_unit != stream.config.time_unit
37
                || new_config.asset_id != stream.config.asset_id
            {
24
                return Ok(false);
31
            }
31

            
31
            if requester == Party::Source && new_config.rate < stream.config.rate {
16
                return Ok(false);
15
            }
15

            
15
            if requester == Party::Target && new_config.rate > stream.config.rate {
4
                return Ok(false);
11
            }
11

            
11
            // Perform pending payment before changing config.
11
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
11
            if let Some(change) = deposit_change {
9
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
7
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
7
                stream_id,
7
                old_config: stream.config.clone(),
7
                new_config: new_config.clone(),
7
                deposit_change,
7
            });
7

            
7
            // Update storage.
7
            stream.config = new_config.clone();
7
            Streams::<T>::insert(stream_id, stream);
7

            
7
            Ok(true)
55
        }
    }
}