1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
#[cfg(feature = "migrations")]
30
pub mod migrations;
31

            
32
pub mod weights;
33
pub use weights::WeightInfo;
34

            
35
use {
36
    core::cmp::min,
37
    frame_support::{
38
        dispatch::DispatchErrorWithPostInfo,
39
        pallet,
40
        pallet_prelude::*,
41
        storage::types::{StorageDoubleMap, StorageMap},
42
        traits::{
43
            fungible::{Inspect, MutateHold},
44
            tokens::{Balance, Precision},
45
        },
46
        Blake2_128Concat,
47
    },
48
    frame_system::pallet_prelude::*,
49
    parity_scale_codec::{DecodeWithMemTracking, FullCodec, MaxEncodedLen},
50
    scale_info::TypeInfo,
51
    serde::{Deserialize, Serialize},
52
    sp_runtime::{
53
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
54
        ArithmeticError,
55
    },
56
    sp_std::{fmt::Debug, marker::PhantomData},
57
};
58

            
59
pub use pallet::*;
60

            
61
/// Type able to provide the current time for given unit.
62
/// For each unit the returned number should monotonically increase and not
63
/// overflow.
64
pub trait TimeProvider<Unit, Number> {
65
    fn now(unit: &Unit) -> Option<Number>;
66

            
67
    /// Benchmarks: should return the time unit which has the worst performance calling
68
    /// `TimeProvider::now(unit)` with.
69
    #[cfg(feature = "runtime-benchmarks")]
70
    fn bench_worst_case_time_unit() -> Unit;
71

            
72
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
73
    #[cfg(feature = "runtime-benchmarks")]
74
    fn bench_set_now(instant: Number);
75
}
76

            
77
/// Interactions the pallet needs with assets.
78
pub trait AssetsManager<AccountId, AssetId, Balance> {
79
    /// Transfer assets deposited by an account to another account.
80
    /// Those assets should not be considered deposited in the target account.
81
    fn transfer_deposit(
82
        asset_id: &AssetId,
83
        from: &AccountId,
84
        to: &AccountId,
85
        amount: Balance,
86
    ) -> DispatchResult;
87

            
88
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
89
    /// enough of that asset. Funds should be safe and not slashable.
90
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
91
        -> DispatchResult;
92

            
93
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
94
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
95
        -> DispatchResult;
96

            
97
    /// Return the deposit for given asset and account.
98
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
99

            
100
    /// Benchmarks: should return the asset id which has the worst performance when interacting
101
    /// with it.
102
    #[cfg(feature = "runtime-benchmarks")]
103
    fn bench_worst_case_asset_id() -> AssetId;
104

            
105
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
106
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
107
    /// from one asset to another. If there is only one asset id it is fine to return it in both
108
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
109
    #[cfg(feature = "runtime-benchmarks")]
110
    fn bench_worst_case_asset_id2() -> AssetId;
111

            
112
    /// Benchmarks: should set the balance.
113
    #[cfg(feature = "runtime-benchmarks")]
114
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
115
}
116

            
117
40170
#[pallet]
118
pub mod pallet {
119
    use super::*;
120

            
121
    /// Pooled Staking pallet.
122
68755
    #[pallet::pallet]
123
    pub struct Pallet<T>(PhantomData<T>);
124

            
125
    #[pallet::config]
126
    pub trait Config: frame_system::Config {
127
        /// Overarching event type
128
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
129

            
130
        /// Type used to represent stream ids. Should be large enough to not overflow.
131
        type StreamId: AtLeast32BitUnsigned
132
            + Default
133
            + Debug
134
            + Copy
135
            + Clone
136
            + FullCodec
137
            + TypeInfo
138
            + MaxEncodedLen;
139

            
140
        /// The balance type, which is also the type representing time (as this
141
        /// pallet will do math with both time and balances to compute how
142
        /// much should be paid).
143
        type Balance: Balance;
144

            
145
        /// Type representing an asset id, a identifier allowing distinguishing assets.
146
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
147

            
148
        /// Provide interaction with assets.
149
        type AssetsManager: AssetsManager<Self::AccountId, Self::AssetId, Self::Balance>;
150

            
151
        /// Currency for the opening balance hold for the storage used by the Stream.
152
        /// NOT to be confused with Assets.
153
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
154
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
155

            
156
        type RuntimeHoldReason: From<HoldReason>;
157

            
158
        #[pallet::constant]
159
        type OpenStreamHoldAmount: Get<Self::Balance>;
160

            
161
        /// Represents which units of time can be used. Designed to be an enum
162
        /// with a variant for each kind of time source/scale supported.
163
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
164

            
165
        /// Provide the current time in given unit.
166
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
167

            
168
        type WeightInfo: weights::WeightInfo;
169
    }
170

            
171
    pub type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
172
    pub type AssetIdOf<T> = <T as Config>::AssetId;
173

            
174
    pub type RequestNonce = u32;
175

            
176
    /// A stream payment from source to target.
177
    /// Stores the last time the stream was updated, which allows to compute
178
    /// elapsed time and perform payment.
179
    #[derive(
180
        RuntimeDebug,
181
        PartialEq,
182
        Eq,
183
        Encode,
184
        Decode,
185
        Clone,
186
4944
        TypeInfo,
187
        Serialize,
188
        Deserialize,
189
        MaxEncodedLen,
190
    )]
191
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
192
        /// Payer, source of the stream.
193
        pub source: AccountId,
194
        /// Payee, target of the stream.
195
        pub target: AccountId,
196
        /// Stream config
197
        pub config: StreamConfig<Unit, AssetId, Balance>,
198
        /// How much is deposited to fund this stream.
199
        pub deposit: Balance,
200
        /// Last time the stream was updated in `config.time_unit`.
201
        pub last_time_updated: Balance,
202
        /// Nonce for requests. This prevents a request to make a first request
203
        /// then change it to another request to frontrun the other party
204
        /// accepting.
205
        pub request_nonce: RequestNonce,
206
        /// A pending change request if any.
207
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
208
        /// One-time opening deposit. Will be released on close.
209
        pub opening_deposit: Balance,
210
    }
211

            
212
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
213
95
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
214
95
            match account {
215
95
                a if a == self.source => Some(Party::Source),
216
39
                a if a == self.target => Some(Party::Target),
217
3
                _ => None,
218
            }
219
95
        }
220
    }
221

            
222
    /// Stream configuration.
223
    #[derive(
224
        RuntimeDebug,
225
        PartialEq,
226
        Eq,
227
        Encode,
228
        Decode,
229
        Copy,
230
        Clone,
231
3090
        TypeInfo,
232
        Serialize,
233
        Deserialize,
234
        MaxEncodedLen,
235
        DecodeWithMemTracking,
236
    )]
237
    pub struct StreamConfig<Unit, AssetId, BalanceOrDuration> {
238
        /// Unit in which time is measured using a `TimeProvider`.
239
        pub time_unit: Unit,
240
        /// Asset used for payment.
241
        pub asset_id: AssetId,
242
        /// Amount of asset / unit.
243
        pub rate: BalanceOrDuration,
244
        /// Minimum amount of time that can be used for mandatory change requests.
245
        pub minimum_request_deadline_delay: BalanceOrDuration,
246
        /// Minimal amount the source must deposit in the stream. Deposit can go
247
        /// lower due to time passing, but deposit cannot be **decreased** lower
248
        /// explicitly by the source. If this is non-zero, the stream can only
249
        /// be closed by the target or if the deposit is zero. If deposit is lower
250
        /// than minimum due to time passing, source is allowed to **increase** the
251
        /// deposit to a value lower than the soft minimum.
252
        ///
253
        /// This system guarantees to the target that the source cannot instantly
254
        /// stop paying, and may prepare for termination of service if the deposit
255
        /// is below this minimum.
256
        pub soft_minimum_deposit: BalanceOrDuration,
257
    }
258

            
259
    /// Origin of a change request.
260
    #[derive(
261
        RuntimeDebug,
262
        PartialEq,
263
        Eq,
264
        Encode,
265
        Decode,
266
        DecodeWithMemTracking,
267
        Copy,
268
        Clone,
269
2472
        TypeInfo,
270
        Serialize,
271
        Deserialize,
272
        MaxEncodedLen,
273
    )]
274
    pub enum Party {
275
53
        Source,
276
25
        Target,
277
    }
278

            
279
    impl Party {
280
4
        pub fn inverse(self) -> Self {
281
4
            match self {
282
2
                Party::Source => Party::Target,
283
2
                Party::Target => Party::Source,
284
            }
285
4
        }
286
    }
287

            
288
    /// Kind of change requested.
289
    #[derive(
290
        RuntimeDebug,
291
        PartialEq,
292
        Eq,
293
        Encode,
294
        Decode,
295
        Copy,
296
        Clone,
297
1236
        TypeInfo,
298
        Serialize,
299
        Deserialize,
300
        MaxEncodedLen,
301
        DecodeWithMemTracking,
302
    )]
303
    pub enum ChangeKind<Time> {
304
22
        /// The requested change is a suggestion, and the other party doesn't
305
        /// need to accept it.
306
        Suggestion,
307
        /// The requested change is mandatory, and the other party must either
308
        /// accept the change or close the stream. Reaching the deadline will
309
        /// close the stream too.
310
        Mandatory { deadline: Time },
311
    }
312

            
313
    /// Describe how the deposit should change.
314
    #[derive(
315
        RuntimeDebug,
316
        PartialEq,
317
        Eq,
318
        Encode,
319
        Decode,
320
        Copy,
321
        Clone,
322
1854
        TypeInfo,
323
        Serialize,
324
        Deserialize,
325
        MaxEncodedLen,
326
        DecodeWithMemTracking,
327
    )]
328
    pub enum DepositChange<Balance> {
329
        /// Increase deposit by given amount.
330
        Increase(Balance),
331
        /// Decrease deposit by given amount.
332
        Decrease(Balance),
333
        /// Set deposit to given amount.
334
        Absolute(Balance),
335
    }
336

            
337
    /// A request to change a stream config.
338
    #[derive(
339
        RuntimeDebug,
340
        PartialEq,
341
        Eq,
342
        Encode,
343
        Decode,
344
        Clone,
345
2472
        TypeInfo,
346
        Serialize,
347
        Deserialize,
348
        MaxEncodedLen,
349
    )]
350
    pub struct ChangeRequest<Unit, AssetId, Balance> {
351
        pub requester: Party,
352
        pub kind: ChangeKind<Balance>,
353
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
354
        pub deposit_change: Option<DepositChange<Balance>>,
355
    }
356

            
357
    pub type StreamOf<T> =
358
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
359

            
360
    pub type StreamConfigOf<T> =
361
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
362

            
363
    pub type ChangeRequestOf<T> =
364
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
365

            
366
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
367
    pub struct StreamPaymentStatus<Balance> {
368
        pub payment: Balance,
369
        pub deposit_left: Balance,
370
        /// Whenever the stream is stalled, which can occur either when no funds are left or
371
        /// if the time is past a mandatory request deadline.
372
        pub stalled: bool,
373
    }
374

            
375
    /// Store the next available stream id.
376
994
    #[pallet::storage]
377
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
378

            
379
    /// Store each stream indexed by an Id.
380
1092
    #[pallet::storage]
381
    pub type Streams<T: Config> = StorageMap<
382
        Hasher = Blake2_128Concat,
383
        Key = T::StreamId,
384
        Value = StreamOf<T>,
385
        QueryKind = OptionQuery,
386
    >;
387

            
388
    /// Lookup for all streams with given source.
389
    /// To avoid maintaining a growing list of stream ids, they are stored in
390
    /// the form of an entry (AccountId, StreamId). If such entry exists then
391
    /// this AccountId is a source in StreamId. One can iterate over all storage
392
    /// keys starting with the AccountId to find all StreamIds.
393
734
    #[pallet::storage]
394
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
395
        Key1 = AccountIdOf<T>,
396
        Hasher1 = Blake2_128Concat,
397
        Key2 = T::StreamId,
398
        Hasher2 = Blake2_128Concat,
399
        Value = (),
400
        QueryKind = OptionQuery,
401
    >;
402

            
403
    /// Lookup for all streams with given target.
404
    /// To avoid maintaining a growing list of stream ids, they are stored in
405
    /// the form of an entry (AccountId, StreamId). If such entry exists then
406
    /// this AccountId is a target in StreamId. One can iterate over all storage
407
    /// keys starting with the AccountId to find all StreamIds.
408
734
    #[pallet::storage]
409
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
410
        Key1 = AccountIdOf<T>,
411
        Hasher1 = Blake2_128Concat,
412
        Key2 = T::StreamId,
413
        Hasher2 = Blake2_128Concat,
414
        Value = (),
415
        QueryKind = OptionQuery,
416
    >;
417

            
418
618
    #[pallet::error]
419
    #[derive(Clone, PartialEq, Eq)]
420
    pub enum Error<T> {
421
        UnknownStreamId,
422
        StreamIdOverflow,
423
        UnauthorizedOrigin,
424
        CantBeBothSourceAndTarget,
425
        CantFetchCurrentTime,
426
        SourceCantDecreaseRate,
427
        TargetCantIncreaseRate,
428
        CantOverrideMandatoryChange,
429
        NoPendingRequest,
430
        CantAcceptOwnRequest,
431
        CanOnlyCancelOwnRequest,
432
        WrongRequestNonce,
433
        ChangingAssetRequiresAbsoluteDepositChange,
434
        TargetCantChangeDeposit,
435
        ImmediateDepositChangeRequiresSameAssetId,
436
        DeadlineCantBeInPast,
437
        CantFetchStatusBeforeLastTimeUpdated,
438
        DeadlineDelayIsBelowMinium,
439
        CantDecreaseDepositUnderSoftDepositMinimum,
440
        SourceCantCloseActiveStreamWithSoftDepositMinimum,
441
        CantCreateStreamWithDepositUnderSoftMinimum,
442
    }
443

            
444
618
    #[pallet::event]
445
236
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
446
    pub enum Event<T: Config> {
447
        StreamOpened {
448
            stream_id: T::StreamId,
449
        },
450
        StreamClosed {
451
            stream_id: T::StreamId,
452
            refunded: T::Balance,
453
        },
454
        StreamPayment {
455
            stream_id: T::StreamId,
456
            source: AccountIdOf<T>,
457
            target: AccountIdOf<T>,
458
            amount: T::Balance,
459
            stalled: bool,
460
        },
461
        StreamConfigChangeRequested {
462
            stream_id: T::StreamId,
463
            request_nonce: RequestNonce,
464
            requester: Party,
465
            old_config: StreamConfigOf<T>,
466
            new_config: StreamConfigOf<T>,
467
        },
468
        StreamConfigChanged {
469
            stream_id: T::StreamId,
470
            old_config: StreamConfigOf<T>,
471
            new_config: StreamConfigOf<T>,
472
            deposit_change: Option<DepositChange<T::Balance>>,
473
        },
474
    }
475

            
476
    /// Freeze reason to use if needed.
477
    #[pallet::composite_enum]
478
    pub enum FreezeReason {
479
        StreamPayment,
480
    }
481

            
482
    /// Hold reason to use if needed.
483
    #[pallet::composite_enum]
484
    pub enum HoldReason {
485
248
        StreamPayment,
486
562
        StreamOpened,
487
    }
488

            
489
618
    #[pallet::call]
490
    impl<T: Config> Pallet<T> {
491
        /// Create a payment stream from the origin to the target with provided config
492
        /// and initial deposit (in the asset defined in the config).
493
        #[pallet::call_index(0)]
494
        #[pallet::weight(T::WeightInfo::open_stream())]
495
        #[allow(clippy::useless_conversion)]
496
        pub fn open_stream(
497
            origin: OriginFor<T>,
498
            target: AccountIdOf<T>,
499
            config: StreamConfigOf<T>,
500
            initial_deposit: T::Balance,
501
92
        ) -> DispatchResultWithPostInfo {
502
92
            let origin = ensure_signed(origin)?;
503

            
504
92
            let _stream_id = Self::open_stream_returns_id(origin, target, config, initial_deposit)?;
505

            
506
86
            Ok(().into())
507
        }
508

            
509
        /// Close a given stream in which the origin is involved. It performs the pending payment
510
        /// before closing the stream.
511
        #[pallet::call_index(1)]
512
        #[pallet::weight(T::WeightInfo::close_stream())]
513
        #[allow(clippy::useless_conversion)]
514
        pub fn close_stream(
515
            origin: OriginFor<T>,
516
            stream_id: T::StreamId,
517
24
        ) -> DispatchResultWithPostInfo {
518
24
            let origin = ensure_signed(origin)?;
519
24
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
520

            
521
            // Only source or target can close a stream.
522
23
            ensure!(
523
23
                origin == stream.source || origin == stream.target,
524
1
                Error::<T>::UnauthorizedOrigin
525
            );
526

            
527
            // Update stream before closing it to ensure fair payment.
528
22
            Self::perform_stream_payment(stream_id, &mut stream)?;
529

            
530
            // If there is a soft minimum deposit, stream can be closed only by target or if deposit is empty.
531
22
            ensure!(
532
22
                stream.config.soft_minimum_deposit.is_zero()
533
3
                    || stream.deposit.is_zero()
534
2
                    || origin == stream.target,
535
1
                Error::<T>::SourceCantCloseActiveStreamWithSoftDepositMinimum
536
            );
537

            
538
            // Unfreeze funds left in the stream.
539
21
            T::AssetsManager::decrease_deposit(
540
21
                &stream.config.asset_id,
541
21
                &stream.source,
542
21
                stream.deposit,
543
21
            )?;
544

            
545
            // Release opening deposit
546
21
            if stream.opening_deposit > 0u32.into() {
547
21
                T::Currency::release(
548
21
                    &HoldReason::StreamOpened.into(),
549
21
                    &stream.source,
550
21
                    stream.opening_deposit,
551
21
                    Precision::Exact,
552
21
                )?;
553
            }
554

            
555
            // Remove stream from storage.
556
21
            Streams::<T>::remove(stream_id);
557
21
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
558
21
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
559
21

            
560
21
            // Emit event.
561
21
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
562
21
                stream_id,
563
21
                // TODO: Should `refunded` in event really include the opening_deposit?
564
21
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
565
21
            });
566
21

            
567
21
            Ok(().into())
568
        }
569

            
570
        /// Perform the pending payment of a stream. Anyone can call this.
571
        #[pallet::call_index(2)]
572
        #[pallet::weight(T::WeightInfo::perform_payment())]
573
        #[allow(clippy::useless_conversion)]
574
        pub fn perform_payment(
575
            origin: OriginFor<T>,
576
            stream_id: T::StreamId,
577
27
        ) -> DispatchResultWithPostInfo {
578
27
            // No problem with anyone updating any stream.
579
27
            let _ = ensure_signed(origin)?;
580

            
581
27
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
582
26
            Self::perform_stream_payment(stream_id, &mut stream)?;
583
26
            Streams::<T>::insert(stream_id, stream);
584
26

            
585
26
            Ok(().into())
586
        }
587

            
588
        /// Requests a change to a stream config or deposit.
589
        ///
590
        /// If the new config don't change the time unit and asset id, the change will be applied
591
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
592
        /// in the stream and will have to be approved by the other party.
593
        ///
594
        /// This call accepts a deposit change, which can only be provided by the source of the
595
        /// stream. An absolute change is required when changing asset id, as the current deposit
596
        /// will be released and a new deposit is required in the new asset.
597
        #[pallet::call_index(3)]
598
        #[pallet::weight(
599
            T::WeightInfo::request_change_immediate()
600
            .max(T::WeightInfo::request_change_delayed())
601
        )]
602
        #[allow(clippy::useless_conversion)]
603
        pub fn request_change(
604
            origin: OriginFor<T>,
605
            stream_id: T::StreamId,
606
            kind: ChangeKind<T::Balance>,
607
            new_config: StreamConfigOf<T>,
608
            deposit_change: Option<DepositChange<T::Balance>>,
609
61
        ) -> DispatchResultWithPostInfo {
610
61
            let origin = ensure_signed(origin)?;
611
61
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
612

            
613
60
            let requester = stream
614
60
                .account_to_party(origin)
615
60
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
616

            
617
59
            ensure!(
618
59
                requester == Party::Source || deposit_change.is_none(),
619
1
                Error::<T>::TargetCantChangeDeposit
620
            );
621

            
622
58
            if stream.config == new_config && deposit_change.is_none() {
623
1
                return Ok(().into());
624
57
            }
625

            
626
57
            if let ChangeKind::Mandatory { deadline } = kind {
627
20
                let now = T::TimeProvider::now(&stream.config.time_unit)
628
20
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
629

            
630
20
                let Some(diff) = deadline.checked_sub(&now) else {
631
1
                    return Err(Error::<T>::DeadlineCantBeInPast.into());
632
                };
633

            
634
19
                ensure!(
635
19
                    diff >= stream.config.minimum_request_deadline_delay,
636
1
                    Error::<T>::DeadlineDelayIsBelowMinium
637
                );
638
37
            }
639

            
640
            // If asset id and time unit are the same, we allow to make the change
641
            // immediatly if the origin is at a disadvantage.
642
            // We allow this even if there is already a pending request.
643
            // This checks that the deposit can't be **decreased** under
644
            // config.soft_minimum_deposit.
645
55
            if Self::maybe_immediate_change(
646
55
                stream_id,
647
55
                &mut stream,
648
55
                &new_config,
649
55
                deposit_change,
650
55
                requester,
651
55
            )? {
652
7
                return Ok(().into());
653
44
            }
654
44

            
655
44
            // If the source is requesting a change of asset, they must provide an absolute change.
656
44
            if requester == Party::Source
657
32
                && new_config.asset_id != stream.config.asset_id
658
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
659
            {
660
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
661
41
            }
662

            
663
            // If there is already a mandatory change request, only the origin
664
            // of this request can change it.
665
            if let Some(ChangeRequest {
666
                kind: ChangeKind::Mandatory { .. },
667
3
                requester: pending_requester,
668
                ..
669
5
            }) = &stream.pending_request
670
            {
671
3
                ensure!(
672
3
                    &requester == pending_requester,
673
2
                    Error::<T>::CantOverrideMandatoryChange
674
                );
675
38
            }
676

            
677
39
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
678
39
            stream.pending_request = Some(ChangeRequest {
679
39
                requester,
680
39
                kind,
681
39
                new_config: new_config.clone(),
682
39
                deposit_change,
683
39
            });
684
39

            
685
39
            // Emit event.
686
39
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
687
39
                stream_id,
688
39
                request_nonce: stream.request_nonce,
689
39
                requester,
690
39
                old_config: stream.config.clone(),
691
39
                new_config,
692
39
            });
693
39

            
694
39
            // Update storage.
695
39
            Streams::<T>::insert(stream_id, stream);
696
39

            
697
39
            Ok(().into())
698
        }
699

            
700
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
701
        /// frontrunning attacks. If the target made a request, the source is able to change their
702
        /// deposit.
703
        #[pallet::call_index(4)]
704
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
705
        #[allow(clippy::useless_conversion)]
706
        pub fn accept_requested_change(
707
            origin: OriginFor<T>,
708
            stream_id: T::StreamId,
709
            request_nonce: RequestNonce,
710
            deposit_change: Option<DepositChange<T::Balance>>,
711
31
        ) -> DispatchResultWithPostInfo {
712
31
            let origin = ensure_signed(origin)?;
713
31
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
714

            
715
30
            let accepter = stream
716
30
                .account_to_party(origin)
717
30
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
718

            
719
29
            let Some(request) = stream.pending_request.clone() else {
720
1
                return Err(Error::<T>::NoPendingRequest.into());
721
            };
722

            
723
28
            ensure!(
724
28
                request_nonce == stream.request_nonce,
725
1
                Error::<T>::WrongRequestNonce
726
            );
727
27
            ensure!(
728
27
                accepter != request.requester,
729
1
                Error::<T>::CantAcceptOwnRequest
730
            );
731

            
732
26
            ensure!(
733
26
                accepter == Party::Source || deposit_change.is_none(),
734
1
                Error::<T>::TargetCantChangeDeposit
735
            );
736

            
737
            // Perform pending payment before changing config.
738
25
            Self::perform_stream_payment(stream_id, &mut stream)?;
739

            
740
            // Apply change.
741
            // It is safe to override config now as we have already performed the payment.
742
            // Checks made in apply_deposit_change needs to be done with new config.
743
25
            let old_config = stream.config;
744
25
            stream.config = request.new_config;
745
25
            let deposit_change = deposit_change.or(request.deposit_change);
746
25

            
747
25
            match (
748
25
                old_config.asset_id == stream.config.asset_id,
749
25
                deposit_change,
750
            ) {
751
                // Same asset and a change, we apply it like in `change_deposit` call.
752
13
                (true, Some(change)) => {
753
13
                    Self::apply_deposit_change(&mut stream, change)?;
754
                }
755
                // Same asset and no change, no problem.
756
6
                (true, None) => (),
757
                // Change in asset with absolute new amount
758
3
                (false, Some(DepositChange::Absolute(amount))) => {
759
3
                    // As target chooses the deposit amount in new currency, we must ensure
760
3
                    // it is greater than the soft minimum deposit.
761
3
                    ensure!(
762
3
                        amount >= stream.config.soft_minimum_deposit,
763
1
                        Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum
764
                    );
765

            
766
                    // Release deposit in old asset.
767
2
                    T::AssetsManager::decrease_deposit(
768
2
                        &old_config.asset_id,
769
2
                        &stream.source,
770
2
                        stream.deposit,
771
2
                    )?;
772

            
773
                    // Make deposit in new asset.
774
2
                    T::AssetsManager::increase_deposit(
775
2
                        &stream.config.asset_id,
776
2
                        &stream.source,
777
2
                        amount,
778
2
                    )?;
779
2
                    stream.deposit = amount;
780
                }
781
                // It doesn't make sense to change asset while not providing an absolute new
782
                // amount.
783
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
784
            }
785

            
786
            // If time unit changes we need to update `last_time_updated` to be in the
787
            // new unit.
788
21
            if old_config.time_unit != stream.config.time_unit {
789
2
                stream.last_time_updated = T::TimeProvider::now(&stream.config.time_unit)
790
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
791
19
            }
792

            
793
            // Event
794
21
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
795
21
                stream_id,
796
21
                old_config,
797
21
                new_config: stream.config.clone(),
798
21
                deposit_change,
799
21
            });
800
21

            
801
21
            // Update stream in storage.
802
21
            stream.pending_request = None;
803
21
            Streams::<T>::insert(stream_id, stream);
804
21

            
805
21
            Ok(().into())
806
        }
807

            
808
        #[pallet::call_index(5)]
809
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
810
        #[allow(clippy::useless_conversion)]
811
        pub fn cancel_change_request(
812
            origin: OriginFor<T>,
813
            stream_id: T::StreamId,
814
6
        ) -> DispatchResultWithPostInfo {
815
6
            let origin = ensure_signed(origin)?;
816
6
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
817

            
818
5
            let accepter = stream
819
5
                .account_to_party(origin)
820
5
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
821

            
822
4
            let Some(request) = stream.pending_request.take() else {
823
1
                return Err(Error::<T>::NoPendingRequest.into());
824
            };
825

            
826
3
            ensure!(
827
3
                accepter == request.requester,
828
2
                Error::<T>::CanOnlyCancelOwnRequest
829
            );
830

            
831
            // Update storage.
832
            // Pending request is removed by calling `.take()`.
833
1
            Streams::<T>::insert(stream_id, stream);
834
1

            
835
1
            Ok(().into())
836
        }
837

            
838
        /// Allows immediately changing the deposit for a stream, which is simpler than
839
        /// calling `request_change` with the proper parameters.
840
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
841
        /// the call is included in a block, in which case the unit is no longer the same and quantities
842
        /// will not have the same scale/value.
843
        #[pallet::call_index(6)]
844
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
845
        #[allow(clippy::useless_conversion)]
846
        pub fn immediately_change_deposit(
847
            origin: OriginFor<T>,
848
            stream_id: T::StreamId,
849
            asset_id: T::AssetId,
850
            change: DepositChange<T::Balance>,
851
13
        ) -> DispatchResultWithPostInfo {
852
13
            let origin = ensure_signed(origin)?;
853
13
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
854

            
855
12
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
856
10
            ensure!(
857
10
                stream.config.asset_id == asset_id,
858
1
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
859
            );
860

            
861
            // Perform pending payment before changing deposit.
862
9
            Self::perform_stream_payment(stream_id, &mut stream)?;
863

            
864
            // Apply change.
865
9
            Self::apply_deposit_change(&mut stream, change)?;
866

            
867
            // Event
868
4
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
869
4
                stream_id,
870
4
                old_config: stream.config.clone(),
871
4
                new_config: stream.config.clone(),
872
4
                deposit_change: Some(change),
873
4
            });
874
4

            
875
4
            // Update stream in storage.
876
4
            Streams::<T>::insert(stream_id, stream);
877
4

            
878
4
            Ok(().into())
879
        }
880
    }
881

            
882
    impl<T: Config> Pallet<T> {
883
        /// Try to open a stream and returns its id.
884
        /// Prefers calling this function from other pallets instead of `open_stream` as the
885
        /// latter can't return the id.
886
96
        pub fn open_stream_returns_id(
887
96
            origin: AccountIdOf<T>,
888
96
            target: AccountIdOf<T>,
889
96
            config: StreamConfigOf<T>,
890
96
            initial_deposit: T::Balance,
891
96
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
892
96
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
893

            
894
95
            ensure!(
895
95
                initial_deposit >= config.soft_minimum_deposit,
896
1
                Error::<T>::CantCreateStreamWithDepositUnderSoftMinimum
897
            );
898

            
899
            // Generate a new stream id.
900
94
            let stream_id = NextStreamId::<T>::get();
901
94
            let next_stream_id = stream_id
902
94
                .checked_add(&One::one())
903
94
                .ok_or(Error::<T>::StreamIdOverflow)?;
904
93
            NextStreamId::<T>::set(next_stream_id);
905
93

            
906
93
            // Hold opening deposit for the storage used by Stream
907
93
            let opening_deposit = T::OpenStreamHoldAmount::get();
908
93
            if opening_deposit > 0u32.into() {
909
93
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
910
            }
911

            
912
            // Freeze initial deposit.
913
93
            T::AssetsManager::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
914

            
915
            // Create stream data.
916
90
            let now =
917
91
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
918
90
            let stream = Stream {
919
90
                source: origin.clone(),
920
90
                target: target.clone(),
921
90
                config,
922
90
                deposit: initial_deposit,
923
90
                last_time_updated: now,
924
90
                request_nonce: 0,
925
90
                pending_request: None,
926
90
                opening_deposit,
927
90
            };
928
90

            
929
90
            // Insert stream in storage.
930
90
            Streams::<T>::insert(stream_id, stream);
931
90
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
932
90
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
933
90

            
934
90
            // Emit event.
935
90
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
936
90

            
937
90
            Ok(stream_id)
938
96
        }
939

            
940
        /// Get the stream payment current status, telling how much payment is
941
        /// pending, how much deposit will be left and whenever the stream is stalled.
942
        /// The stream is considered stalled if no funds are left or if the provided
943
        /// time is past a mandatory request deadline. If the provided `now` is `None`
944
        /// then the current time will be fetched. Being able to provide a custom `now`
945
        /// allows to check the status in the future. It is invalid to provide a `now` that is
946
        /// before `last_time_updated`.
947
55
        pub fn stream_payment_status(
948
55
            stream_id: T::StreamId,
949
55
            now: Option<T::Balance>,
950
55
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
951
55
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
952
43
            let now = match now {
953
                Some(v) => v,
954
43
                None => T::TimeProvider::now(&stream.config.time_unit)
955
43
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
956
            };
957

            
958
43
            let last_time_updated = stream.last_time_updated;
959
43

            
960
43
            ensure!(
961
43
                now >= last_time_updated,
962
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
963
            );
964

            
965
43
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
966
55
        }
967

            
968
136
        fn stream_payment_status_by_ref(
969
136
            stream: &StreamOf<T>,
970
136
            last_time_updated: T::Balance,
971
136
            mut now: T::Balance,
972
136
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
973
136
            let mut stalled_by_deadline = false;
974

            
975
            // Take into account mandatory change request deadline. Note that
976
            // while it'll perform payment up to deadline,
977
            // `stream.last_time_updated` is still the "real now" to avoid
978
            // retroactive payment in case the deadline changes.
979
            if let Some(ChangeRequest {
980
34
                kind: ChangeKind::Mandatory { deadline },
981
                ..
982
46
            }) = &stream.pending_request
983
            {
984
34
                now = min(now, *deadline);
985
34

            
986
34
                if now == *deadline {
987
15
                    stalled_by_deadline = true;
988
21
                }
989
102
            }
990

            
991
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
992
136
            if stream.deposit.is_zero() {
993
                return Ok(StreamPaymentStatus {
994
                    payment: 0u32.into(),
995
                    deposit_left: stream.deposit,
996
                    stalled: true,
997
                });
998
136
            }
999

            
            // Dont perform payment if now is before or equal to `last_time_updated`.
            // It can be before due to the deadline adjustment.
136
            let Some(delta) = now.checked_sub(&last_time_updated) else {
2
                return Ok(StreamPaymentStatus {
2
                    payment: 0u32.into(),
2
                    deposit_left: stream.deposit,
2
                    stalled: true,
2
                });
            };
            // We compute the amount due to the target according to the rate, which may be
            // lowered if the stream deposit is lower.
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
            // considering it an error can make a stream un-updatable if too much time has passed
            // without updates.
134
            let mut payment = delta.saturating_mul(stream.config.rate);
            // We compute the new amount of locked funds. If it underflows it
            // means that there is more to pay that what is left, in which case
            // we pay all that is left.
134
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
132
                Some(v) if v.is_zero() => (v, true),
129
                Some(v) => (v, stalled_by_deadline),
                None => {
2
                    payment = stream.deposit;
2
                    (Zero::zero(), true)
                }
            };
134
            Ok(StreamPaymentStatus {
134
                payment,
134
                deposit_left,
134
                stalled,
134
            })
136
        }
        /// Behavior:
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
        /// last time the stream was updated. When updating the stream, **at most**
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
        /// account. If this amount is greater than the left deposit, the stream is considered
        /// drained **but not closed**. The source can come back later and refill the stream,
        /// however there will be no retroactive payment for the time spent as drained.
        /// If the stream payment is used to rent a service, the target should pause the service
        /// while the stream is drained, and resume it once it is refilled.
93
        fn perform_stream_payment(
93
            stream_id: T::StreamId,
93
            stream: &mut StreamOf<T>,
93
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
93
            let now = T::TimeProvider::now(&stream.config.time_unit)
93
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
            // We want to update `stream.last_time_updated` to `now` as soon
            // as possible to avoid forgetting to do it. We copy the old value
            // for payment computation.
93
            let last_time_updated = stream.last_time_updated;
93
            stream.last_time_updated = now;
            let StreamPaymentStatus {
93
                payment,
93
                deposit_left,
93
                stalled,
93
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
93
            if payment.is_zero() {
39
                return Ok(0u32.into());
54
            }
54

            
54
            // Transfer from the source to target.
54
            T::AssetsManager::transfer_deposit(
54
                &stream.config.asset_id,
54
                &stream.source,
54
                &stream.target,
54
                payment,
54
            )?;
            // Update stream info.
54
            stream.deposit = deposit_left;
54

            
54
            // Emit event.
54
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
54
                stream_id,
54
                source: stream.source.clone(),
54
                target: stream.target.clone(),
54
                amount: payment,
54
                stalled,
54
            });
54

            
54
            Ok(payment)
93
        }
31
        fn apply_deposit_change(
31
            stream: &mut StreamOf<T>,
31
            change: DepositChange<T::Balance>,
31
        ) -> DispatchResultWithPostInfo {
31
            match change {
8
                DepositChange::Absolute(amount) => {
8
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
1
                        T::AssetsManager::increase_deposit(
1
                            &stream.config.asset_id,
1
                            &stream.source,
1
                            increase,
1
                        )?;
7
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
7
                        if amount < stream.config.soft_minimum_deposit {
2
                            return Err(
2
                                Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into()
2
                            );
5
                        }
5

            
5
                        T::AssetsManager::decrease_deposit(
5
                            &stream.config.asset_id,
5
                            &stream.source,
5
                            decrease,
5
                        )?;
                    }
5
                    stream.deposit = amount;
                }
17
                DepositChange::Increase(increase) => {
17
                    stream.deposit = stream
17
                        .deposit
17
                        .checked_add(&increase)
17
                        .ok_or(ArithmeticError::Overflow)?;
15
                    T::AssetsManager::increase_deposit(
15
                        &stream.config.asset_id,
15
                        &stream.source,
15
                        increase,
15
                    )?;
                }
6
                DepositChange::Decrease(decrease) => {
6
                    let new_deposit = stream
6
                        .deposit
6
                        .checked_sub(&decrease)
6
                        .ok_or(ArithmeticError::Underflow)?;
4
                    if new_deposit < stream.config.soft_minimum_deposit {
2
                        return Err(Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into());
2
                    }
2

            
2
                    stream.deposit = new_deposit;
2
                    T::AssetsManager::decrease_deposit(
2
                        &stream.config.asset_id,
2
                        &stream.source,
2
                        decrease,
2
                    )?;
                }
            }
22
            Ok(().into())
31
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
55
        fn maybe_immediate_change(
55
            stream_id: T::StreamId,
55
            stream: &mut StreamOf<T>,
55
            new_config: &StreamConfigOf<T>,
55
            deposit_change: Option<DepositChange<T::Balance>>,
55
            requester: Party,
55
        ) -> Result<bool, DispatchErrorWithPostInfo> {
55
            if new_config.time_unit != stream.config.time_unit
37
                || new_config.asset_id != stream.config.asset_id
            {
24
                return Ok(false);
31
            }
31

            
31
            if requester == Party::Source && new_config.rate < stream.config.rate {
16
                return Ok(false);
15
            }
15

            
15
            if requester == Party::Target && new_config.rate > stream.config.rate {
4
                return Ok(false);
11
            }
11

            
11
            // Perform pending payment before changing config.
11
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
11
            if let Some(change) = deposit_change {
9
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
7
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
7
                stream_id,
7
                old_config: stream.config.clone(),
7
                new_config: new_config.clone(),
7
                deposit_change,
7
            });
7

            
7
            // Update storage.
7
            stream.config = new_config.clone();
7
            Streams::<T>::insert(stream_id, stream);
7

            
7
            Ok(true)
55
        }
    }
}