1
// Copyright (C) Moondance Labs Ltd.
// This file is part of Tanssi.

// Tanssi is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// Tanssi is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19
extern crate alloc;
20

            
21
#[cfg(test)]
22
mod mock;
23

            
24
#[cfg(test)]
25
mod tests;
26

            
27
#[cfg(feature = "runtime-benchmarks")]
28
mod benchmarking;
29

            
30
#[cfg(feature = "migrations")]
31
pub mod migrations;
32

            
33
pub mod weights;
34
pub use weights::WeightInfo;
35

            
36
use {
37
    alloc::fmt::Debug,
38
    core::cmp::min,
39
    core::marker::PhantomData,
40
    frame_support::{
41
        dispatch::DispatchErrorWithPostInfo,
42
        pallet,
43
        pallet_prelude::*,
44
        storage::types::{StorageDoubleMap, StorageMap},
45
        traits::{
46
            fungible::{Inspect, MutateHold},
47
            tokens::{Balance, Precision},
48
        },
49
        Blake2_128Concat,
50
    },
51
    frame_system::pallet_prelude::*,
52
    parity_scale_codec::{DecodeWithMemTracking, FullCodec, MaxEncodedLen},
53
    scale_info::TypeInfo,
54
    serde::{Deserialize, Serialize},
55
    sp_runtime::{
56
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
57
        ArithmeticError,
58
    },
59
};
60

            
61
pub use pallet::*;
62

            
63
/// Type able to provide the current time for given unit.
/// For each unit the returned number should monotonically increase and not
/// overflow.
pub trait TimeProvider<Unit, Number> {
    /// Return the current time expressed in `unit`, or `None` if it cannot be
    /// provided. Callers in this pallet turn `None` into
    /// `Error::CantFetchCurrentTime`.
    fn now(unit: &Unit) -> Option<Number>;

    /// Benchmarks: should return the time unit which has the worst performance calling
    /// `TimeProvider::now(unit)` with.
    #[cfg(feature = "runtime-benchmarks")]
    fn bench_worst_case_time_unit() -> Unit;

    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
    #[cfg(feature = "runtime-benchmarks")]
    fn bench_set_now(instant: Number);
}
78

            
79
/// Interactions the pallet needs with assets.
80
pub trait AssetsManager<AccountId, AssetId, Balance> {
81
    /// Transfer assets deposited by an account to another account.
82
    /// Those assets should not be considered deposited in the target account.
83
    fn transfer_deposit(
84
        asset_id: &AssetId,
85
        from: &AccountId,
86
        to: &AccountId,
87
        amount: Balance,
88
    ) -> DispatchResult;
89

            
90
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
91
    /// enough of that asset. Funds should be safe and not slashable.
92
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
93
        -> DispatchResult;
94

            
95
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
96
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
97
        -> DispatchResult;
98

            
99
    /// Return the deposit for given asset and account.
100
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
101

            
102
    /// Benchmarks: should return the asset id which has the worst performance when interacting
103
    /// with it.
104
    #[cfg(feature = "runtime-benchmarks")]
105
    fn bench_worst_case_asset_id() -> AssetId;
106

            
107
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
108
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
109
    /// from one asset to another. If there is only one asset id it is fine to return it in both
110
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
111
    #[cfg(feature = "runtime-benchmarks")]
112
    fn bench_worst_case_asset_id2() -> AssetId;
113

            
114
    /// Benchmarks: should set the balance.
115
    #[cfg(feature = "runtime-benchmarks")]
116
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
117
}
118

            
119
#[pallet]
120
pub mod pallet {
121
    use super::*;
122

            
123
    /// Pooled Staking pallet.
124
    #[pallet::pallet]
125
    pub struct Pallet<T>(PhantomData<T>);
126

            
127
    #[pallet::config]
128
    pub trait Config: frame_system::Config {
129
        /// Type used to represent stream ids. Should be large enough to not overflow.
130
        type StreamId: AtLeast32BitUnsigned
131
            + Default
132
            + Debug
133
            + Copy
134
            + Clone
135
            + FullCodec
136
            + TypeInfo
137
            + MaxEncodedLen;
138

            
139
        /// The balance type, which is also the type representing time (as this
140
        /// pallet will do math with both time and balances to compute how
141
        /// much should be paid).
142
        type Balance: Balance;
143

            
144
        /// Type representing an asset id, a identifier allowing distinguishing assets.
145
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
146

            
147
        /// Provide interaction with assets.
148
        type AssetsManager: AssetsManager<Self::AccountId, Self::AssetId, Self::Balance>;
149

            
150
        /// Currency for the opening balance hold for the storage used by the Stream.
151
        /// NOT to be confused with Assets.
152
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
153
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
154

            
155
        type RuntimeHoldReason: From<HoldReason>;
156

            
157
        #[pallet::constant]
158
        type OpenStreamHoldAmount: Get<Self::Balance>;
159

            
160
        /// Represents which units of time can be used. Designed to be an enum
161
        /// with a variant for each kind of time source/scale supported.
162
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
163

            
164
        /// Provide the current time in given unit.
165
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
166

            
167
        type WeightInfo: weights::WeightInfo;
168
    }
169

            
170
    pub type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
171
    pub type AssetIdOf<T> = <T as Config>::AssetId;
172

            
173
    pub type RequestNonce = u32;
174

            
175
    /// A stream payment from source to target.
176
    /// Stores the last time the stream was updated, which allows to compute
177
    /// elapsed time and perform payment.
178
    #[derive(
179
        RuntimeDebug,
180
        PartialEq,
181
        Eq,
182
        Encode,
183
        Decode,
184
        Clone,
185
        TypeInfo,
186
        Serialize,
187
        Deserialize,
188
        MaxEncodedLen,
189
    )]
190
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
191
        /// Payer, source of the stream.
192
        pub source: AccountId,
193
        /// Payee, target of the stream.
194
        pub target: AccountId,
195
        /// Stream config
196
        pub config: StreamConfig<Unit, AssetId, Balance>,
197
        /// How much is deposited to fund this stream.
198
        pub deposit: Balance,
199
        /// Last time the stream was updated in `config.time_unit`.
200
        pub last_time_updated: Balance,
201
        /// Nonce for requests. This prevents a request to make a first request
202
        /// then change it to another request to frontrun the other party
203
        /// accepting.
204
        pub request_nonce: RequestNonce,
205
        /// A pending change request if any.
206
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
207
        /// One-time opening deposit. Will be released on close.
208
        pub opening_deposit: Balance,
209
    }
210

            
211
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
212
71
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
213
71
            match account {
214
71
                a if a == self.source => Some(Party::Source),
215
27
                a if a == self.target => Some(Party::Target),
216
3
                _ => None,
217
            }
218
71
        }
219
    }
220

            
221
    /// Stream configuration.
222
    #[derive(
223
        RuntimeDebug,
224
        PartialEq,
225
        Eq,
226
        Encode,
227
        Decode,
228
        Copy,
229
        Clone,
230
        TypeInfo,
231
        Serialize,
232
        Deserialize,
233
        MaxEncodedLen,
234
        DecodeWithMemTracking,
235
    )]
236
    pub struct StreamConfig<Unit, AssetId, BalanceOrDuration> {
237
        /// Unit in which time is measured using a `TimeProvider`.
238
        pub time_unit: Unit,
239
        /// Asset used for payment.
240
        pub asset_id: AssetId,
241
        /// Amount of asset / unit.
242
        pub rate: BalanceOrDuration,
243
        /// Minimum amount of time that can be used for mandatory change requests.
244
        pub minimum_request_deadline_delay: BalanceOrDuration,
245
        /// Minimal amount the source must deposit in the stream. Deposit can go
246
        /// lower due to time passing, but deposit cannot be **decreased** lower
247
        /// explicitly by the source. If this is non-zero, the stream can only
248
        /// be closed by the target or if the deposit is zero. If deposit is lower
249
        /// than minimum due to time passing, source is allowed to **increase** the
250
        /// deposit to a value lower than the soft minimum.
251
        ///
252
        /// This system guarantees to the target that the source cannot instantly
253
        /// stop paying, and may prepare for termination of service if the deposit
254
        /// is below this minimum.
255
        pub soft_minimum_deposit: BalanceOrDuration,
256
    }
257

            
258
    /// Origin of a change request.
259
    #[derive(
260
        RuntimeDebug,
261
        PartialEq,
262
        Eq,
263
        Encode,
264
        Decode,
265
        DecodeWithMemTracking,
266
        Copy,
267
        Clone,
268
        TypeInfo,
269
        Serialize,
270
        Deserialize,
271
        MaxEncodedLen,
272
    )]
273
    pub enum Party {
274
        Source,
275
        Target,
276
    }
277

            
278
    impl Party {
279
4
        pub fn inverse(self) -> Self {
280
4
            match self {
281
2
                Party::Source => Party::Target,
282
2
                Party::Target => Party::Source,
283
            }
284
4
        }
285
    }
286

            
287
    /// Kind of change requested.
288
    #[derive(
289
        RuntimeDebug,
290
        PartialEq,
291
        Eq,
292
        Encode,
293
        Decode,
294
        Copy,
295
        Clone,
296
        TypeInfo,
297
        Serialize,
298
        Deserialize,
299
        MaxEncodedLen,
300
        DecodeWithMemTracking,
301
    )]
302
    pub enum ChangeKind<Time> {
303
        /// The requested change is a suggestion, and the other party doesn't
304
        /// need to accept it.
305
        Suggestion,
306
        /// The requested change is mandatory, and the other party must either
307
        /// accept the change or close the stream. Reaching the deadline will
308
        /// close the stream too.
309
        Mandatory { deadline: Time },
310
    }
311

            
312
    /// Describe how the deposit should change.
313
    #[derive(
314
        RuntimeDebug,
315
        PartialEq,
316
        Eq,
317
        Encode,
318
        Decode,
319
        Copy,
320
        Clone,
321
        TypeInfo,
322
        Serialize,
323
        Deserialize,
324
        MaxEncodedLen,
325
        DecodeWithMemTracking,
326
    )]
327
    pub enum DepositChange<Balance> {
328
        /// Increase deposit by given amount.
329
        Increase(Balance),
330
        /// Decrease deposit by given amount.
331
        Decrease(Balance),
332
        /// Set deposit to given amount.
333
        Absolute(Balance),
334
    }
335

            
336
    /// A request to change a stream config.
337
    #[derive(
338
        RuntimeDebug,
339
        PartialEq,
340
        Eq,
341
        Encode,
342
        Decode,
343
        Clone,
344
        TypeInfo,
345
        Serialize,
346
        Deserialize,
347
        MaxEncodedLen,
348
    )]
349
    pub struct ChangeRequest<Unit, AssetId, Balance> {
350
        pub requester: Party,
351
        pub kind: ChangeKind<Balance>,
352
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
353
        pub deposit_change: Option<DepositChange<Balance>>,
354
    }
355

            
356
    pub type StreamOf<T> =
357
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
358

            
359
    pub type StreamConfigOf<T> =
360
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
361

            
362
    pub type ChangeRequestOf<T> =
363
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
364

            
365
    /// Payment status of a stream: what is paid, what is left and whether it is stalled.
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
    pub struct StreamPaymentStatus<Balance> {
        /// Amount of the payment.
        pub payment: Balance,
        /// Deposit left in the stream.
        pub deposit_left: Balance,
        /// Whether the stream is stalled, which can occur either when no funds are left or
        /// if the time is past a mandatory request deadline.
        pub stalled: bool,
    }
373

            
374
    /// Store the next available stream id.
375
    #[pallet::storage]
376
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
377

            
378
    /// Store each stream indexed by an Id.
379
    #[pallet::storage]
380
    pub type Streams<T: Config> = StorageMap<
381
        Hasher = Blake2_128Concat,
382
        Key = T::StreamId,
383
        Value = StreamOf<T>,
384
        QueryKind = OptionQuery,
385
    >;
386

            
387
    /// Lookup for all streams with given source.
388
    /// To avoid maintaining a growing list of stream ids, they are stored in
389
    /// the form of an entry (AccountId, StreamId). If such entry exists then
390
    /// this AccountId is a source in StreamId. One can iterate over all storage
391
    /// keys starting with the AccountId to find all StreamIds.
392
    #[pallet::storage]
393
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
394
        Key1 = AccountIdOf<T>,
395
        Hasher1 = Blake2_128Concat,
396
        Key2 = T::StreamId,
397
        Hasher2 = Blake2_128Concat,
398
        Value = (),
399
        QueryKind = OptionQuery,
400
    >;
401

            
402
    /// Lookup for all streams with given target.
403
    /// To avoid maintaining a growing list of stream ids, they are stored in
404
    /// the form of an entry (AccountId, StreamId). If such entry exists then
405
    /// this AccountId is a target in StreamId. One can iterate over all storage
406
    /// keys starting with the AccountId to find all StreamIds.
407
    #[pallet::storage]
408
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
409
        Key1 = AccountIdOf<T>,
410
        Hasher1 = Blake2_128Concat,
411
        Key2 = T::StreamId,
412
        Hasher2 = Blake2_128Concat,
413
        Value = (),
414
        QueryKind = OptionQuery,
415
    >;
416

            
417
    #[pallet::error]
418
    #[derive(Clone, PartialEq, Eq)]
419
    pub enum Error<T> {
420
        UnknownStreamId,
421
        StreamIdOverflow,
422
        UnauthorizedOrigin,
423
        CantBeBothSourceAndTarget,
424
        CantFetchCurrentTime,
425
        SourceCantDecreaseRate,
426
        TargetCantIncreaseRate,
427
        CantOverrideMandatoryChange,
428
        NoPendingRequest,
429
        CantAcceptOwnRequest,
430
        CanOnlyCancelOwnRequest,
431
        WrongRequestNonce,
432
        ChangingAssetRequiresAbsoluteDepositChange,
433
        TargetCantChangeDeposit,
434
        ImmediateDepositChangeRequiresSameAssetId,
435
        DeadlineCantBeInPast,
436
        CantFetchStatusBeforeLastTimeUpdated,
437
        DeadlineDelayIsBelowMinium,
438
        CantDecreaseDepositUnderSoftDepositMinimum,
439
        SourceCantCloseActiveStreamWithSoftDepositMinimum,
440
        CantCreateStreamWithDepositUnderSoftMinimum,
441
    }
442

            
443
    #[pallet::event]
444
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
445
    pub enum Event<T: Config> {
446
        StreamOpened {
447
            stream_id: T::StreamId,
448
        },
449
        StreamClosed {
450
            stream_id: T::StreamId,
451
            refunded: T::Balance,
452
        },
453
        StreamPayment {
454
            stream_id: T::StreamId,
455
            source: AccountIdOf<T>,
456
            target: AccountIdOf<T>,
457
            amount: T::Balance,
458
            stalled: bool,
459
        },
460
        StreamConfigChangeRequested {
461
            stream_id: T::StreamId,
462
            request_nonce: RequestNonce,
463
            requester: Party,
464
            old_config: StreamConfigOf<T>,
465
            new_config: StreamConfigOf<T>,
466
        },
467
        StreamConfigChanged {
468
            stream_id: T::StreamId,
469
            old_config: StreamConfigOf<T>,
470
            new_config: StreamConfigOf<T>,
471
            deposit_change: Option<DepositChange<T::Balance>>,
472
        },
473
    }
474

            
475
    /// Freeze reason to use if needed.
476
    #[pallet::composite_enum]
477
    pub enum FreezeReason {
478
        StreamPayment,
479
    }
480

            
481
    /// Hold reason to use if needed.
482
    #[pallet::composite_enum]
483
    pub enum HoldReason {
484
        StreamPayment,
485
        StreamOpened,
486
    }
487

            
488
    #[pallet::call]
489
    impl<T: Config> Pallet<T> {
490
        /// Create a payment stream from the origin to the target with provided config
491
        /// and initial deposit (in the asset defined in the config).
492
        #[pallet::call_index(0)]
493
        #[pallet::weight(T::WeightInfo::open_stream())]
494
        #[allow(clippy::useless_conversion)]
495
        pub fn open_stream(
496
            origin: OriginFor<T>,
497
            target: AccountIdOf<T>,
498
            config: StreamConfigOf<T>,
499
            initial_deposit: T::Balance,
500
80
        ) -> DispatchResultWithPostInfo {
501
80
            let origin = ensure_signed(origin)?;
502

            
503
80
            let _stream_id = Self::open_stream_returns_id(origin, target, config, initial_deposit)?;
504

            
505
74
            Ok(().into())
506
        }
507

            
508
        /// Close a given stream in which the origin is involved. It performs the pending payment
509
        /// before closing the stream.
510
        #[pallet::call_index(1)]
511
        #[pallet::weight(T::WeightInfo::close_stream())]
512
        #[allow(clippy::useless_conversion)]
513
        pub fn close_stream(
514
            origin: OriginFor<T>,
515
            stream_id: T::StreamId,
516
12
        ) -> DispatchResultWithPostInfo {
517
12
            let origin = ensure_signed(origin)?;
518
12
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
519

            
520
            // Only source or target can close a stream.
521
11
            ensure!(
522
11
                origin == stream.source || origin == stream.target,
523
1
                Error::<T>::UnauthorizedOrigin
524
            );
525

            
526
            // Update stream before closing it to ensure fair payment.
527
10
            Self::perform_stream_payment(stream_id, &mut stream)?;
528

            
529
            // If there is a soft minimum deposit, stream can be closed only by target or if deposit is empty.
530
10
            ensure!(
531
10
                stream.config.soft_minimum_deposit.is_zero()
532
3
                    || stream.deposit.is_zero()
533
2
                    || origin == stream.target,
534
1
                Error::<T>::SourceCantCloseActiveStreamWithSoftDepositMinimum
535
            );
536

            
537
            // Unfreeze funds left in the stream.
538
9
            T::AssetsManager::decrease_deposit(
539
9
                &stream.config.asset_id,
540
9
                &stream.source,
541
9
                stream.deposit,
542
            )?;
543

            
544
            // Release opening deposit
545
9
            if stream.opening_deposit > 0u32.into() {
546
9
                T::Currency::release(
547
9
                    &HoldReason::StreamOpened.into(),
548
9
                    &stream.source,
549
9
                    stream.opening_deposit,
550
9
                    Precision::Exact,
551
                )?;
552
            }
553

            
554
            // Remove stream from storage.
555
9
            Streams::<T>::remove(stream_id);
556
9
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
557
9
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
558

            
559
            // Emit event.
560
9
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
561
9
                stream_id,
562
9
                // TODO: Should `refunded` in event really include the opening_deposit?
563
9
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
564
9
            });
565

            
566
9
            Ok(().into())
567
        }
568

            
569
        /// Perform the pending payment of a stream. Anyone can call this.
570
        #[pallet::call_index(2)]
571
        #[pallet::weight(T::WeightInfo::perform_payment())]
572
        #[allow(clippy::useless_conversion)]
573
        pub fn perform_payment(
574
            origin: OriginFor<T>,
575
            stream_id: T::StreamId,
576
15
        ) -> DispatchResultWithPostInfo {
577
            // No problem with anyone updating any stream.
578
15
            let _ = ensure_signed(origin)?;
579

            
580
15
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
581
14
            Self::perform_stream_payment(stream_id, &mut stream)?;
582
14
            Streams::<T>::insert(stream_id, stream);
583

            
584
14
            Ok(().into())
585
        }
586

            
587
        /// Requests a change to a stream config or deposit.
588
        ///
589
        /// If the new config don't change the time unit and asset id, the change will be applied
590
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
591
        /// in the stream and will have to be approved by the other party.
592
        ///
593
        /// This call accepts a deposit change, which can only be provided by the source of the
594
        /// stream. An absolute change is required when changing asset id, as the current deposit
595
        /// will be released and a new deposit is required in the new asset.
596
        #[pallet::call_index(3)]
597
        #[pallet::weight(
598
            T::WeightInfo::request_change_immediate()
599
            .max(T::WeightInfo::request_change_delayed())
600
        )]
601
        #[allow(clippy::useless_conversion)]
602
        pub fn request_change(
603
            origin: OriginFor<T>,
604
            stream_id: T::StreamId,
605
            kind: ChangeKind<T::Balance>,
606
            new_config: StreamConfigOf<T>,
607
            deposit_change: Option<DepositChange<T::Balance>>,
608
49
        ) -> DispatchResultWithPostInfo {
609
49
            let origin = ensure_signed(origin)?;
610
49
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
611

            
612
48
            let requester = stream
613
48
                .account_to_party(origin)
614
48
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
615

            
616
47
            ensure!(
617
47
                requester == Party::Source || deposit_change.is_none(),
618
1
                Error::<T>::TargetCantChangeDeposit
619
            );
620

            
621
46
            if stream.config == new_config && deposit_change.is_none() {
622
1
                return Ok(().into());
623
45
            }
624

            
625
45
            if let ChangeKind::Mandatory { deadline } = kind {
626
8
                let now = T::TimeProvider::now(&stream.config.time_unit)
627
8
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
628

            
629
8
                let Some(diff) = deadline.checked_sub(&now) else {
630
1
                    return Err(Error::<T>::DeadlineCantBeInPast.into());
631
                };
632

            
633
7
                ensure!(
634
7
                    diff >= stream.config.minimum_request_deadline_delay,
635
1
                    Error::<T>::DeadlineDelayIsBelowMinium
636
                );
637
37
            }
638

            
639
            // If asset id and time unit are the same, we allow to make the change
640
            // immediatly if the origin is at a disadvantage.
641
            // We allow this even if there is already a pending request.
642
            // This checks that the deposit can't be **decreased** under
643
            // config.soft_minimum_deposit.
644
43
            if Self::maybe_immediate_change(
645
43
                stream_id,
646
43
                &mut stream,
647
43
                &new_config,
648
43
                deposit_change,
649
43
                requester,
650
4
            )? {
651
7
                return Ok(().into());
652
32
            }
653

            
654
            // If the source is requesting a change of asset, they must provide an absolute change.
655
32
            if requester == Party::Source
656
20
                && new_config.asset_id != stream.config.asset_id
657
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
658
            {
659
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
660
29
            }
661

            
662
            // If there is already a mandatory change request, only the origin
663
            // of this request can change it.
664
            if let Some(ChangeRequest {
665
                kind: ChangeKind::Mandatory { .. },
666
3
                requester: pending_requester,
667
                ..
668
5
            }) = &stream.pending_request
669
            {
670
3
                ensure!(
671
3
                    &requester == pending_requester,
672
2
                    Error::<T>::CantOverrideMandatoryChange
673
                );
674
26
            }
675

            
676
27
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
677
27
            stream.pending_request = Some(ChangeRequest {
678
27
                requester,
679
27
                kind,
680
27
                new_config: new_config.clone(),
681
27
                deposit_change,
682
27
            });
683

            
684
            // Emit event.
685
27
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
686
27
                stream_id,
687
27
                request_nonce: stream.request_nonce,
688
27
                requester,
689
27
                old_config: stream.config.clone(),
690
27
                new_config,
691
27
            });
692

            
693
            // Update storage.
694
27
            Streams::<T>::insert(stream_id, stream);
695

            
696
27
            Ok(().into())
697
        }
698

            
699
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
700
        /// frontrunning attacks. If the target made a request, the source is able to change their
701
        /// deposit.
702
        #[pallet::call_index(4)]
703
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
704
        #[allow(clippy::useless_conversion)]
705
        pub fn accept_requested_change(
706
            origin: OriginFor<T>,
707
            stream_id: T::StreamId,
708
            request_nonce: RequestNonce,
709
            deposit_change: Option<DepositChange<T::Balance>>,
710
19
        ) -> DispatchResultWithPostInfo {
711
19
            let origin = ensure_signed(origin)?;
712
19
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
713

            
714
18
            let accepter = stream
715
18
                .account_to_party(origin)
716
18
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
717

            
718
17
            let Some(request) = stream.pending_request.clone() else {
719
1
                return Err(Error::<T>::NoPendingRequest.into());
720
            };
721

            
722
16
            ensure!(
723
16
                request_nonce == stream.request_nonce,
724
1
                Error::<T>::WrongRequestNonce
725
            );
726
15
            ensure!(
727
15
                accepter != request.requester,
728
1
                Error::<T>::CantAcceptOwnRequest
729
            );
730

            
731
14
            ensure!(
732
14
                accepter == Party::Source || deposit_change.is_none(),
733
1
                Error::<T>::TargetCantChangeDeposit
734
            );
735

            
736
            // Perform pending payment before changing config.
737
13
            Self::perform_stream_payment(stream_id, &mut stream)?;
738

            
739
            // Apply change.
740
            // It is safe to override config now as we have already performed the payment.
741
            // Checks made in apply_deposit_change needs to be done with new config.
742
13
            let old_config = stream.config;
743
13
            stream.config = request.new_config;
744
13
            let deposit_change = deposit_change.or(request.deposit_change);
745

            
746
            match (
747
13
                old_config.asset_id == stream.config.asset_id,
748
13
                deposit_change,
749
            ) {
750
                // Same asset and a change, we apply it like in `change_deposit` call.
751
1
                (true, Some(change)) => {
752
1
                    Self::apply_deposit_change(&mut stream, change)?;
753
                }
754
                // Same asset and no change, no problem.
755
6
                (true, None) => (),
756
                // Change in asset with absolute new amount
757
3
                (false, Some(DepositChange::Absolute(amount))) => {
758
                    // As target chooses the deposit amount in new currency, we must ensure
759
                    // it is greater than the soft minimum deposit.
760
3
                    ensure!(
761
3
                        amount >= stream.config.soft_minimum_deposit,
762
1
                        Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum
763
                    );
764

            
765
                    // Release deposit in old asset.
766
2
                    T::AssetsManager::decrease_deposit(
767
2
                        &old_config.asset_id,
768
2
                        &stream.source,
769
2
                        stream.deposit,
770
                    )?;
771

            
772
                    // Make deposit in new asset.
773
2
                    T::AssetsManager::increase_deposit(
774
2
                        &stream.config.asset_id,
775
2
                        &stream.source,
776
2
                        amount,
777
                    )?;
778
2
                    stream.deposit = amount;
779
                }
780
                // It doesn't make sense to change asset while not providing an absolute new
781
                // amount.
782
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
783
            }
784

            
785
            // If time unit changes we need to update `last_time_updated` to be in the
786
            // new unit.
787
9
            if old_config.time_unit != stream.config.time_unit {
788
2
                stream.last_time_updated = T::TimeProvider::now(&stream.config.time_unit)
789
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
790
7
            }
791

            
792
            // Event
793
9
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
794
9
                stream_id,
795
9
                old_config,
796
9
                new_config: stream.config.clone(),
797
9
                deposit_change,
798
9
            });
799

            
800
            // Update stream in storage.
801
9
            stream.pending_request = None;
802
9
            Streams::<T>::insert(stream_id, stream);
803

            
804
9
            Ok(().into())
805
        }
806

            
807
        #[pallet::call_index(5)]
808
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
809
        #[allow(clippy::useless_conversion)]
810
        pub fn cancel_change_request(
811
            origin: OriginFor<T>,
812
            stream_id: T::StreamId,
813
6
        ) -> DispatchResultWithPostInfo {
814
6
            let origin = ensure_signed(origin)?;
815
6
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
816

            
817
5
            let accepter = stream
818
5
                .account_to_party(origin)
819
5
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
820

            
821
4
            let Some(request) = stream.pending_request.take() else {
822
1
                return Err(Error::<T>::NoPendingRequest.into());
823
            };
824

            
825
3
            ensure!(
826
3
                accepter == request.requester,
827
2
                Error::<T>::CanOnlyCancelOwnRequest
828
            );
829

            
830
            // Update storage.
831
            // Pending request is removed by calling `.take()`.
832
1
            Streams::<T>::insert(stream_id, stream);
833

            
834
1
            Ok(().into())
835
        }
836

            
837
        /// Allows immediately changing the deposit for a stream, which is simpler than
838
        /// calling `request_change` with the proper parameters.
839
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
840
        /// the call is included in a block, in which case the unit is no longer the same and quantities
841
        /// will not have the same scale/value.
842
        #[pallet::call_index(6)]
843
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
844
        #[allow(clippy::useless_conversion)]
845
        pub fn immediately_change_deposit(
846
            origin: OriginFor<T>,
847
            stream_id: T::StreamId,
848
            asset_id: T::AssetId,
849
            change: DepositChange<T::Balance>,
850
13
        ) -> DispatchResultWithPostInfo {
851
13
            let origin = ensure_signed(origin)?;
852
13
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
853

            
854
12
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
855
10
            ensure!(
856
10
                stream.config.asset_id == asset_id,
857
1
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
858
            );
859

            
860
            // Perform pending payment before changing deposit.
861
9
            Self::perform_stream_payment(stream_id, &mut stream)?;
862

            
863
            // Apply change.
864
9
            Self::apply_deposit_change(&mut stream, change)?;
865

            
866
            // Event
867
4
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
868
4
                stream_id,
869
4
                old_config: stream.config.clone(),
870
4
                new_config: stream.config.clone(),
871
4
                deposit_change: Some(change),
872
4
            });
873

            
874
            // Update stream in storage.
875
4
            Streams::<T>::insert(stream_id, stream);
876

            
877
4
            Ok(().into())
878
        }
879
    }
880

            
881
    impl<T: Config> Pallet<T> {
882
        /// Try to open a stream and returns its id.
883
        /// Prefers calling this function from other pallets instead of `open_stream` as the
884
        /// latter can't return the id.
885
84
        pub fn open_stream_returns_id(
886
84
            origin: AccountIdOf<T>,
887
84
            target: AccountIdOf<T>,
888
84
            config: StreamConfigOf<T>,
889
84
            initial_deposit: T::Balance,
890
84
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
891
84
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
892

            
893
83
            ensure!(
894
83
                initial_deposit >= config.soft_minimum_deposit,
895
1
                Error::<T>::CantCreateStreamWithDepositUnderSoftMinimum
896
            );
897

            
898
            // Generate a new stream id.
899
82
            let stream_id = NextStreamId::<T>::get();
900
82
            let next_stream_id = stream_id
901
82
                .checked_add(&One::one())
902
82
                .ok_or(Error::<T>::StreamIdOverflow)?;
903
81
            NextStreamId::<T>::set(next_stream_id);
904

            
905
            // Hold opening deposit for the storage used by Stream
906
81
            let opening_deposit = T::OpenStreamHoldAmount::get();
907
81
            if opening_deposit > 0u32.into() {
908
81
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
909
            }
910

            
911
            // Freeze initial deposit.
912
81
            T::AssetsManager::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
913

            
914
            // Create stream data.
915
78
            let now =
916
79
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
917
78
            let stream = Stream {
918
78
                source: origin.clone(),
919
78
                target: target.clone(),
920
78
                config,
921
78
                deposit: initial_deposit,
922
78
                last_time_updated: now,
923
78
                request_nonce: 0,
924
78
                pending_request: None,
925
78
                opening_deposit,
926
78
            };
927

            
928
            // Insert stream in storage.
929
78
            Streams::<T>::insert(stream_id, stream);
930
78
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
931
78
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
932

            
933
            // Emit event.
934
78
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
935

            
936
78
            Ok(stream_id)
937
84
        }
938

            
939
        /// Get the stream payment current status, telling how much payment is
940
        /// pending, how much deposit will be left and whenever the stream is stalled.
941
        /// The stream is considered stalled if no funds are left or if the provided
942
        /// time is past a mandatory request deadline. If the provided `now` is `None`
943
        /// then the current time will be fetched. Being able to provide a custom `now`
944
        /// allows to check the status in the future. It is invalid to provide a `now` that is
945
        /// before `last_time_updated`.
946
1
        pub fn stream_payment_status(
947
1
            stream_id: T::StreamId,
948
1
            now: Option<T::Balance>,
949
1
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
950
1
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
951
1
            let now = match now {
952
                Some(v) => v,
953
1
                None => T::TimeProvider::now(&stream.config.time_unit)
954
1
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
955
            };
956

            
957
1
            let last_time_updated = stream.last_time_updated;
958

            
959
1
            ensure!(
960
1
                now >= last_time_updated,
961
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
962
            );
963

            
964
1
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
965
1
        }
966

            
967
58
        fn stream_payment_status_by_ref(
968
58
            stream: &StreamOf<T>,
969
58
            last_time_updated: T::Balance,
970
58
            mut now: T::Balance,
971
58
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
972
58
            let mut stalled_by_deadline = false;
973

            
974
            // Take into account mandatory change request deadline. Note that
975
            // while it'll perform payment up to deadline,
976
            // `stream.last_time_updated` is still the "real now" to avoid
977
            // retroactive payment in case the deadline changes.
978
            if let Some(ChangeRequest {
979
4
                kind: ChangeKind::Mandatory { deadline },
980
                ..
981
16
            }) = &stream.pending_request
982
            {
983
4
                now = min(now, *deadline);
984

            
985
4
                if now == *deadline {
986
3
                    stalled_by_deadline = true;
987
3
                }
988
54
            }
989

            
990
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
991
58
            if stream.deposit.is_zero() {
992
                return Ok(StreamPaymentStatus {
993
                    payment: 0u32.into(),
994
                    deposit_left: stream.deposit,
995
                    stalled: true,
996
                });
997
58
            }
998

            
999
            // Dont perform payment if now is before or equal to `last_time_updated`.
            // It can be before due to the deadline adjustment.
58
            let Some(delta) = now.checked_sub(&last_time_updated) else {
2
                return Ok(StreamPaymentStatus {
2
                    payment: 0u32.into(),
2
                    deposit_left: stream.deposit,
2
                    stalled: true,
2
                });
            };
            // We compute the amount due to the target according to the rate, which may be
            // lowered if the stream deposit is lower.
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
            // considering it an error can make a stream un-updatable if too much time has passed
            // without updates.
56
            let mut payment = delta.saturating_mul(stream.config.rate);
            // We compute the new amount of locked funds. If it underflows it
            // means that there is more to pay that what is left, in which case
            // we pay all that is left.
56
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
54
                Some(v) if v.is_zero() => (v, true),
51
                Some(v) => (v, stalled_by_deadline),
                None => {
2
                    payment = stream.deposit;
2
                    (Zero::zero(), true)
                }
            };
56
            Ok(StreamPaymentStatus {
56
                payment,
56
                deposit_left,
56
                stalled,
56
            })
58
        }
        /// Behavior:
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
        /// last time the stream was updated. When updating the stream, **at most**
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
        /// account. If this amount is greater than the left deposit, the stream is considered
        /// drained **but not closed**. The source can come back later and refill the stream,
        /// however there will be no retroactive payment for the time spent as drained.
        /// If the stream payment is used to rent a service, the target should pause the service
        /// while the stream is drained, and resume it once it is refilled.
57
        fn perform_stream_payment(
57
            stream_id: T::StreamId,
57
            stream: &mut StreamOf<T>,
57
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
57
            let now = T::TimeProvider::now(&stream.config.time_unit)
57
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
            // We want to update `stream.last_time_updated` to `now` as soon
            // as possible to avoid forgetting to do it. We copy the old value
            // for payment computation.
57
            let last_time_updated = stream.last_time_updated;
57
            stream.last_time_updated = now;
            let StreamPaymentStatus {
57
                payment,
57
                deposit_left,
57
                stalled,
57
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
57
            if payment.is_zero() {
39
                return Ok(0u32.into());
18
            }
            // Transfer from the source to target.
18
            T::AssetsManager::transfer_deposit(
18
                &stream.config.asset_id,
18
                &stream.source,
18
                &stream.target,
18
                payment,
            )?;
            // Update stream info.
18
            stream.deposit = deposit_left;
            // Emit event.
18
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
18
                stream_id,
18
                source: stream.source.clone(),
18
                target: stream.target.clone(),
18
                amount: payment,
18
                stalled,
18
            });
18
            Ok(payment)
57
        }
19
        fn apply_deposit_change(
19
            stream: &mut StreamOf<T>,
19
            change: DepositChange<T::Balance>,
19
        ) -> DispatchResultWithPostInfo {
19
            match change {
8
                DepositChange::Absolute(amount) => {
8
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
1
                        T::AssetsManager::increase_deposit(
1
                            &stream.config.asset_id,
1
                            &stream.source,
1
                            increase,
1
                        )?;
7
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
7
                        if amount < stream.config.soft_minimum_deposit {
2
                            return Err(
2
                                Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into()
2
                            );
5
                        }
5
                        T::AssetsManager::decrease_deposit(
5
                            &stream.config.asset_id,
5
                            &stream.source,
5
                            decrease,
                        )?;
                    }
5
                    stream.deposit = amount;
                }
5
                DepositChange::Increase(increase) => {
5
                    stream.deposit = stream
5
                        .deposit
5
                        .checked_add(&increase)
5
                        .ok_or(ArithmeticError::Overflow)?;
3
                    T::AssetsManager::increase_deposit(
3
                        &stream.config.asset_id,
3
                        &stream.source,
3
                        increase,
                    )?;
                }
6
                DepositChange::Decrease(decrease) => {
6
                    let new_deposit = stream
6
                        .deposit
6
                        .checked_sub(&decrease)
6
                        .ok_or(ArithmeticError::Underflow)?;
4
                    if new_deposit < stream.config.soft_minimum_deposit {
2
                        return Err(Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into());
2
                    }
2
                    stream.deposit = new_deposit;
2
                    T::AssetsManager::decrease_deposit(
2
                        &stream.config.asset_id,
2
                        &stream.source,
2
                        decrease,
                    )?;
                }
            }
10
            Ok(().into())
19
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
43
        fn maybe_immediate_change(
43
            stream_id: T::StreamId,
43
            stream: &mut StreamOf<T>,
43
            new_config: &StreamConfigOf<T>,
43
            deposit_change: Option<DepositChange<T::Balance>>,
43
            requester: Party,
43
        ) -> Result<bool, DispatchErrorWithPostInfo> {
43
            if new_config.time_unit != stream.config.time_unit
25
                || new_config.asset_id != stream.config.asset_id
            {
24
                return Ok(false);
19
            }
19
            if requester == Party::Source && new_config.rate < stream.config.rate {
4
                return Ok(false);
15
            }
15
            if requester == Party::Target && new_config.rate > stream.config.rate {
4
                return Ok(false);
11
            }
            // Perform pending payment before changing config.
11
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
11
            if let Some(change) = deposit_change {
9
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
7
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
7
                stream_id,
7
                old_config: stream.config.clone(),
7
                new_config: new_config.clone(),
7
                deposit_change,
7
            });
            // Update storage.
7
            stream.config = new_config.clone();
7
            Streams::<T>::insert(stream_id, stream);
7
            Ok(true)
43
        }
    }
}