1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
#[cfg(feature = "migrations")]
30
pub mod migrations;
31

            
32
pub mod weights;
33
pub use weights::WeightInfo;
34

            
35
use {
36
    core::cmp::min,
37
    frame_support::{
38
        dispatch::DispatchErrorWithPostInfo,
39
        pallet,
40
        pallet_prelude::*,
41
        storage::types::{StorageDoubleMap, StorageMap},
42
        traits::{
43
            fungible::{Inspect, MutateHold},
44
            tokens::{Balance, Precision},
45
        },
46
        Blake2_128Concat,
47
    },
48
    frame_system::pallet_prelude::*,
49
    parity_scale_codec::{FullCodec, MaxEncodedLen},
50
    scale_info::TypeInfo,
51
    serde::{Deserialize, Serialize},
52
    sp_runtime::{
53
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
54
        ArithmeticError,
55
    },
56
    sp_std::{fmt::Debug, marker::PhantomData},
57
};
58

            
59
pub use pallet::*;
60

            
61
/// Type able to provide the current time for given unit.
62
/// For each unit the returned number should monotonically increase and not
63
/// overflow.
64
pub trait TimeProvider<Unit, Number> {
65
    fn now(unit: &Unit) -> Option<Number>;
66

            
67
    /// Benchmarks: should return the time unit which has the worst performance calling
68
    /// `TimeProvider::now(unit)` with.
69
    #[cfg(feature = "runtime-benchmarks")]
70
    fn bench_worst_case_time_unit() -> Unit;
71

            
72
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
73
    #[cfg(feature = "runtime-benchmarks")]
74
    fn bench_set_now(instant: Number);
75
}
76

            
77
/// Interactions the pallet needs with assets.
78
pub trait Assets<AccountId, AssetId, Balance> {
79
    /// Transfer assets deposited by an account to another account.
80
    /// Those assets should not be considered deposited in the target account.
81
    fn transfer_deposit(
82
        asset_id: &AssetId,
83
        from: &AccountId,
84
        to: &AccountId,
85
        amount: Balance,
86
    ) -> DispatchResult;
87

            
88
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
89
    /// enough of that asset. Funds should be safe and not slashable.
90
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
91
        -> DispatchResult;
92

            
93
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
94
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
95
        -> DispatchResult;
96

            
97
    /// Return the deposit for given asset and account.
98
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
99

            
100
    /// Benchmarks: should return the asset id which has the worst performance when interacting
101
    /// with it.
102
    #[cfg(feature = "runtime-benchmarks")]
103
    fn bench_worst_case_asset_id() -> AssetId;
104

            
105
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
106
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
107
    /// from one asset to another. If there is only one asset id it is fine to return it in both
108
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
109
    #[cfg(feature = "runtime-benchmarks")]
110
    fn bench_worst_case_asset_id2() -> AssetId;
111

            
112
    /// Benchmarks: should set the balance.
113
    #[cfg(feature = "runtime-benchmarks")]
114
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
115
}
116

            
117
10233
#[pallet]
118
pub mod pallet {
119
    use super::*;
120

            
121
    /// Pooled Staking pallet.
122
43647
    #[pallet::pallet]
123
    pub struct Pallet<T>(PhantomData<T>);
124

            
125
    #[pallet::config]
126
    pub trait Config: frame_system::Config {
127
        /// Overarching event type
128
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
129

            
130
        /// Type used to represent stream ids. Should be large enough to not overflow.
131
        type StreamId: AtLeast32BitUnsigned
132
            + Default
133
            + Debug
134
            + Copy
135
            + Clone
136
            + FullCodec
137
            + TypeInfo
138
            + MaxEncodedLen;
139

            
140
        /// The balance type, which is also the type representing time (as this
141
        /// pallet will do math with both time and balances to compute how
142
        /// much should be paid).
143
        type Balance: Balance;
144

            
145
        /// Type representing an asset id, a identifier allowing distinguishing assets.
146
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
147

            
148
        /// Provide interaction with assets.
149
        type Assets: Assets<Self::AccountId, Self::AssetId, Self::Balance>;
150

            
151
        /// Currency for the opening balance hold for the storage used by the Stream.
152
        /// NOT to be confused with Assets.
153
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
154
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
155

            
156
        type RuntimeHoldReason: From<HoldReason>;
157

            
158
        #[pallet::constant]
159
        type OpenStreamHoldAmount: Get<Self::Balance>;
160

            
161
        /// Represents which units of time can be used. Designed to be an enum
162
        /// with a variant for each kind of time source/scale supported.
163
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
164

            
165
        /// Provide the current time in given unit.
166
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
167

            
168
        type WeightInfo: weights::WeightInfo;
169
    }
170

            
171
    pub type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
172
    pub type AssetIdOf<T> = <T as Config>::AssetId;
173

            
174
    pub type RequestNonce = u32;
175

            
176
    /// A stream payment from source to target.
177
    /// Stores the last time the stream was updated, which allows to compute
178
    /// elapsed time and perform payment.
179
    #[derive(
180
        RuntimeDebug,
181
        PartialEq,
182
        Eq,
183
        Encode,
184
        Decode,
185
        Clone,
186
4704
        TypeInfo,
187
        Serialize,
188
        Deserialize,
189
        MaxEncodedLen,
190
    )]
191
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
192
        /// Payer, source of the stream.
193
        pub source: AccountId,
194
        /// Payee, target of the stream.
195
        pub target: AccountId,
196
        /// Stream config
197
        pub config: StreamConfig<Unit, AssetId, Balance>,
198
        /// How much is deposited to fund this stream.
199
        pub deposit: Balance,
200
        /// Last time the stream was updated in `config.time_unit`.
201
        pub last_time_updated: Balance,
202
        /// Nonce for requests. This prevents a request to make a first request
203
        /// then change it to another request to frontrun the other party
204
        /// accepting.
205
        pub request_nonce: RequestNonce,
206
        /// A pending change request if any.
207
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
208
        /// One-time opening deposit. Will be released on close.
209
        pub opening_deposit: Balance,
210
    }
211

            
212
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
213
88
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
214
88
            match account {
215
88
                a if a == self.source => Some(Party::Source),
216
37
                a if a == self.target => Some(Party::Target),
217
3
                _ => None,
218
            }
219
88
        }
220
    }
221

            
222
    /// Stream configuration.
223
    #[derive(
224
        RuntimeDebug,
225
        PartialEq,
226
        Eq,
227
        Encode,
228
        Decode,
229
        Copy,
230
        Clone,
231
2940
        TypeInfo,
232
        Serialize,
233
        Deserialize,
234
        MaxEncodedLen,
235
    )]
236
    pub struct StreamConfig<Unit, AssetId, BalanceOrDuration> {
237
        /// Unit in which time is measured using a `TimeProvider`.
238
        pub time_unit: Unit,
239
        /// Asset used for payment.
240
        pub asset_id: AssetId,
241
        /// Amount of asset / unit.
242
        pub rate: BalanceOrDuration,
243
        /// Minimum amount of time that can be used for mandatory change requests.
244
        pub minimum_request_deadline_delay: BalanceOrDuration,
245
        /// Minimal amount the source must deposit in the stream. Deposit can go
246
        /// lower due to time passing, but deposit cannot be **decreased** lower
247
        /// explicitly by the source. If this is non-zero, the stream can only
248
        /// be closed by the target or if the deposit is zero. If deposit is lower
249
        /// than minimum due to time passing, source is allowed to **increase** the
250
        /// deposit to a value lower than the soft minimum.
251
        ///
252
        /// This system guarantees to the target that the source cannot instantly
253
        /// stop paying, and may prepare for termination of service if the deposit
254
        /// is below this minimum.
255
        pub soft_minimum_deposit: BalanceOrDuration,
256
    }
257

            
258
    /// Origin of a change request.
259
    #[derive(
260
        RuntimeDebug,
261
        PartialEq,
262
        Eq,
263
        Encode,
264
        Decode,
265
        Copy,
266
        Clone,
267
1176
        TypeInfo,
268
        Serialize,
269
        Deserialize,
270
        MaxEncodedLen,
271
    )]
272
    pub enum Party {
273
50
        Source,
274
25
        Target,
275
    }
276

            
277
    impl Party {
278
4
        pub fn inverse(self) -> Self {
279
4
            match self {
280
2
                Party::Source => Party::Target,
281
2
                Party::Target => Party::Source,
282
            }
283
4
        }
284
    }
285

            
286
    /// Kind of change requested.
287
    #[derive(
288
        RuntimeDebug,
289
        PartialEq,
290
        Eq,
291
        Encode,
292
        Decode,
293
        Copy,
294
        Clone,
295
1176
        TypeInfo,
296
        Serialize,
297
        Deserialize,
298
        MaxEncodedLen,
299
    )]
300
    pub enum ChangeKind<Time> {
301
19
        /// The requested change is a suggestion, and the other party doesn't
302
        /// need to accept it.
303
        Suggestion,
304
68
        /// The requested change is mandatory, and the other party must either
305
        /// accept the change or close the stream. Reaching the deadline will
306
        /// close the stream too.
307
        Mandatory { deadline: Time },
308
    }
309

            
310
    /// Describe how the deposit should change.
311
    #[derive(
312
        RuntimeDebug,
313
        PartialEq,
314
        Eq,
315
        Encode,
316
        Decode,
317
        Copy,
318
        Clone,
319
2352
        TypeInfo,
320
        Serialize,
321
        Deserialize,
322
        MaxEncodedLen,
323
    )]
324
    pub enum DepositChange<Balance> {
325
56
        /// Increase deposit by given amount.
326
        Increase(Balance),
327
1
        /// Decrease deposit by given amount.
328
        Decrease(Balance),
329
6
        /// Set deposit to given amount.
330
        Absolute(Balance),
331
    }
332

            
333
    /// A request to change a stream config.
334
    #[derive(
335
        RuntimeDebug,
336
        PartialEq,
337
        Eq,
338
        Encode,
339
        Decode,
340
        Clone,
341
2352
        TypeInfo,
342
        Serialize,
343
        Deserialize,
344
        MaxEncodedLen,
345
    )]
346
    pub struct ChangeRequest<Unit, AssetId, Balance> {
347
        pub requester: Party,
348
        pub kind: ChangeKind<Balance>,
349
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
350
        pub deposit_change: Option<DepositChange<Balance>>,
351
    }
352

            
353
    pub type StreamOf<T> =
354
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
355

            
356
    pub type StreamConfigOf<T> =
357
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
358

            
359
    pub type ChangeRequestOf<T> =
360
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
361

            
362
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
363
    pub struct StreamPaymentStatus<Balance> {
364
        pub payment: Balance,
365
        pub deposit_left: Balance,
366
        /// Whenever the stream is stalled, which can occur either when no funds are left or
367
        /// if the time is past a mandatory request deadline.
368
        pub stalled: bool,
369
    }
370

            
371
    /// Store the next available stream id.
372
928
    #[pallet::storage]
373
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
374

            
375
    /// Store each stream indexed by an Id.
376
1031
    #[pallet::storage]
377
    pub type Streams<T: Config> = StorageMap<
378
        Hasher = Blake2_128Concat,
379
        Key = T::StreamId,
380
        Value = StreamOf<T>,
381
        QueryKind = OptionQuery,
382
    >;
383

            
384
    /// Lookup for all streams with given source.
385
    /// To avoid maintaining a growing list of stream ids, they are stored in
386
    /// the form of an entry (AccountId, StreamId). If such entry exists then
387
    /// this AccountId is a source in StreamId. One can iterate over all storage
388
    /// keys starting with the AccountId to find all StreamIds.
389
693
    #[pallet::storage]
390
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
391
        Key1 = AccountIdOf<T>,
392
        Hasher1 = Blake2_128Concat,
393
        Key2 = T::StreamId,
394
        Hasher2 = Blake2_128Concat,
395
        Value = (),
396
        QueryKind = OptionQuery,
397
    >;
398

            
399
    /// Lookup for all streams with given target.
400
    /// To avoid maintaining a growing list of stream ids, they are stored in
401
    /// the form of an entry (AccountId, StreamId). If such entry exists then
402
    /// this AccountId is a target in StreamId. One can iterate over all storage
403
    /// keys starting with the AccountId to find all StreamIds.
404
693
    #[pallet::storage]
405
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
406
        Key1 = AccountIdOf<T>,
407
        Hasher1 = Blake2_128Concat,
408
        Key2 = T::StreamId,
409
        Hasher2 = Blake2_128Concat,
410
        Value = (),
411
        QueryKind = OptionQuery,
412
    >;
413

            
414
156
    #[pallet::error]
415
    #[derive(Clone, PartialEq, Eq)]
416
    pub enum Error<T> {
417
        UnknownStreamId,
418
        StreamIdOverflow,
419
        UnauthorizedOrigin,
420
        CantBeBothSourceAndTarget,
421
        CantFetchCurrentTime,
422
        SourceCantDecreaseRate,
423
        TargetCantIncreaseRate,
424
        CantOverrideMandatoryChange,
425
        NoPendingRequest,
426
        CantAcceptOwnRequest,
427
        CanOnlyCancelOwnRequest,
428
        WrongRequestNonce,
429
        ChangingAssetRequiresAbsoluteDepositChange,
430
        TargetCantChangeDeposit,
431
        ImmediateDepositChangeRequiresSameAssetId,
432
        DeadlineCantBeInPast,
433
        CantFetchStatusBeforeLastTimeUpdated,
434
        DeadlineDelayIsBelowMinium,
435
        CantDecreaseDepositUnderSoftDepositMinimum,
436
        SourceCantCloseActiveStreamWithSoftDepositMinimum,
437
        CantCreateStreamWithDepositUnderSoftMinimum,
438
    }
439

            
440
588
    #[pallet::event]
441
216
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
442
    pub enum Event<T: Config> {
443
49
        StreamOpened {
444
            stream_id: T::StreamId,
445
        },
446
8
        StreamClosed {
447
            stream_id: T::StreamId,
448
            refunded: T::Balance,
449
        },
450
13
        StreamPayment {
451
            stream_id: T::StreamId,
452
            source: AccountIdOf<T>,
453
            target: AccountIdOf<T>,
454
            amount: T::Balance,
455
            stalled: bool,
456
        },
457
12
        StreamConfigChangeRequested {
458
            stream_id: T::StreamId,
459
            request_nonce: RequestNonce,
460
            requester: Party,
461
            old_config: StreamConfigOf<T>,
462
            new_config: StreamConfigOf<T>,
463
        },
464
7
        StreamConfigChanged {
465
            stream_id: T::StreamId,
466
            old_config: StreamConfigOf<T>,
467
            new_config: StreamConfigOf<T>,
468
            deposit_change: Option<DepositChange<T::Balance>>,
469
        },
470
    }
471

            
472
    /// Freeze reason to use if needed.
473
    #[pallet::composite_enum]
474
    pub enum FreezeReason {
475
        StreamPayment,
476
    }
477

            
478
    /// Hold reason to use if needed.
479
    #[pallet::composite_enum]
480
    pub enum HoldReason {
481
236
        StreamPayment,
482
519
        StreamOpened,
483
    }
484

            
485
420
    #[pallet::call]
486
    impl<T: Config> Pallet<T> {
487
        /// Create a payment stream from the origin to the target with provided config
488
        /// and initial deposit (in the asset defined in the config).
489
        #[pallet::call_index(0)]
490
        #[pallet::weight(T::WeightInfo::open_stream())]
491
        pub fn open_stream(
492
            origin: OriginFor<T>,
493
            target: AccountIdOf<T>,
494
            config: StreamConfigOf<T>,
495
            initial_deposit: T::Balance,
496
87
        ) -> DispatchResultWithPostInfo {
497
87
            let origin = ensure_signed(origin)?;
498

            
499
87
            let _stream_id = Self::open_stream_returns_id(origin, target, config, initial_deposit)?;
500

            
501
81
            Ok(().into())
502
        }
503

            
504
        /// Close a given stream in which the origin is involved. It performs the pending payment
505
        /// before closing the stream.
506
        #[pallet::call_index(1)]
507
        #[pallet::weight(T::WeightInfo::close_stream())]
508
        pub fn close_stream(
509
            origin: OriginFor<T>,
510
            stream_id: T::StreamId,
511
22
        ) -> DispatchResultWithPostInfo {
512
22
            let origin = ensure_signed(origin)?;
513
22
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
514

            
515
            // Only source or target can close a stream.
516
21
            ensure!(
517
21
                origin == stream.source || origin == stream.target,
518
1
                Error::<T>::UnauthorizedOrigin
519
            );
520

            
521
            // Update stream before closing it to ensure fair payment.
522
20
            Self::perform_stream_payment(stream_id, &mut stream)?;
523

            
524
            // If there is a soft minimum deposit, stream can be closed only by target or if deposit is empty.
525
20
            ensure!(
526
20
                stream.config.soft_minimum_deposit.is_zero()
527
3
                    || stream.deposit.is_zero()
528
2
                    || origin == stream.target,
529
1
                Error::<T>::SourceCantCloseActiveStreamWithSoftDepositMinimum
530
            );
531

            
532
            // Unfreeze funds left in the stream.
533
19
            T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, stream.deposit)?;
534

            
535
            // Release opening deposit
536
19
            if stream.opening_deposit > 0u32.into() {
537
19
                T::Currency::release(
538
19
                    &HoldReason::StreamOpened.into(),
539
19
                    &stream.source,
540
19
                    stream.opening_deposit,
541
19
                    Precision::Exact,
542
19
                )?;
543
            }
544

            
545
            // Remove stream from storage.
546
19
            Streams::<T>::remove(stream_id);
547
19
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
548
19
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
549
19

            
550
19
            // Emit event.
551
19
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
552
19
                stream_id,
553
19
                // TODO: Should `refunded` in event really include the opening_deposit?
554
19
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
555
19
            });
556
19

            
557
19
            Ok(().into())
558
        }
559

            
560
        /// Perform the pending payment of a stream. Anyone can call this.
561
        #[pallet::call_index(2)]
562
        #[pallet::weight(T::WeightInfo::perform_payment())]
563
        pub fn perform_payment(
564
            origin: OriginFor<T>,
565
            stream_id: T::StreamId,
566
25
        ) -> DispatchResultWithPostInfo {
567
25
            // No problem with anyone updating any stream.
568
25
            let _ = ensure_signed(origin)?;
569

            
570
25
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
571
24
            Self::perform_stream_payment(stream_id, &mut stream)?;
572
24
            Streams::<T>::insert(stream_id, stream);
573
24

            
574
24
            Ok(().into())
575
        }
576

            
577
        /// Requests a change to a stream config or deposit.
578
        ///
579
        /// If the new config don't change the time unit and asset id, the change will be applied
580
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
581
        /// in the stream and will have to be approved by the other party.
582
        ///
583
        /// This call accepts a deposit change, which can only be provided by the source of the
584
        /// stream. An absolute change is required when changing asset id, as the current deposit
585
        /// will be released and a new deposit is required in the new asset.
586
        #[pallet::call_index(3)]
587
        #[pallet::weight(
588
            T::WeightInfo::request_change_immediate()
589
            .max(T::WeightInfo::request_change_delayed())
590
        )]
591
        pub fn request_change(
592
            origin: OriginFor<T>,
593
            stream_id: T::StreamId,
594
            kind: ChangeKind<T::Balance>,
595
            new_config: StreamConfigOf<T>,
596
            deposit_change: Option<DepositChange<T::Balance>>,
597
58
        ) -> DispatchResultWithPostInfo {
598
58
            let origin = ensure_signed(origin)?;
599
58
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
600

            
601
57
            let requester = stream
602
57
                .account_to_party(origin)
603
57
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
604

            
605
56
            ensure!(
606
56
                requester == Party::Source || deposit_change.is_none(),
607
1
                Error::<T>::TargetCantChangeDeposit
608
            );
609

            
610
55
            if stream.config == new_config && deposit_change.is_none() {
611
1
                return Ok(().into());
612
54
            }
613

            
614
54
            if let ChangeKind::Mandatory { deadline } = kind {
615
20
                let now = T::TimeProvider::now(&stream.config.time_unit)
616
20
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
617

            
618
20
                let Some(diff) = deadline.checked_sub(&now) else {
619
1
                    return Err(Error::<T>::DeadlineCantBeInPast.into());
620
                };
621

            
622
19
                ensure!(
623
19
                    diff >= stream.config.minimum_request_deadline_delay,
624
1
                    Error::<T>::DeadlineDelayIsBelowMinium
625
                );
626
34
            }
627

            
628
            // If asset id and time unit are the same, we allow to make the change
629
            // immediatly if the origin is at a disadvantage.
630
            // We allow this even if there is already a pending request.
631
            // This checks that the deposit can't be **decreased** under
632
            // config.soft_minimum_deposit.
633
52
            if Self::maybe_immediate_change(
634
52
                stream_id,
635
52
                &mut stream,
636
52
                &new_config,
637
52
                deposit_change,
638
52
                requester,
639
52
            )? {
640
7
                return Ok(().into());
641
41
            }
642
41

            
643
41
            // If the source is requesting a change of asset, they must provide an absolute change.
644
41
            if requester == Party::Source
645
29
                && new_config.asset_id != stream.config.asset_id
646
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
647
            {
648
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
649
38
            }
650

            
651
            // If there is already a mandatory change request, only the origin
652
            // of this request can change it.
653
            if let Some(ChangeRequest {
654
                kind: ChangeKind::Mandatory { .. },
655
3
                requester: pending_requester,
656
                ..
657
5
            }) = &stream.pending_request
658
            {
659
3
                ensure!(
660
3
                    &requester == pending_requester,
661
2
                    Error::<T>::CantOverrideMandatoryChange
662
                );
663
35
            }
664

            
665
36
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
666
36
            stream.pending_request = Some(ChangeRequest {
667
36
                requester,
668
36
                kind,
669
36
                new_config: new_config.clone(),
670
36
                deposit_change,
671
36
            });
672
36

            
673
36
            // Emit event.
674
36
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
675
36
                stream_id,
676
36
                request_nonce: stream.request_nonce,
677
36
                requester,
678
36
                old_config: stream.config.clone(),
679
36
                new_config,
680
36
            });
681
36

            
682
36
            // Update storage.
683
36
            Streams::<T>::insert(stream_id, stream);
684
36

            
685
36
            Ok(().into())
686
        }
687

            
688
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
689
        /// frontrunning attacks. If the target made a request, the source is able to change their
690
        /// deposit.
691
        #[pallet::call_index(4)]
692
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
693
        pub fn accept_requested_change(
694
            origin: OriginFor<T>,
695
            stream_id: T::StreamId,
696
            request_nonce: RequestNonce,
697
            deposit_change: Option<DepositChange<T::Balance>>,
698
28
        ) -> DispatchResultWithPostInfo {
699
28
            let origin = ensure_signed(origin)?;
700
28
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
701

            
702
27
            let accepter = stream
703
27
                .account_to_party(origin)
704
27
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
705

            
706
26
            let Some(request) = stream.pending_request.clone() else {
707
                return Err(Error::<T>::NoPendingRequest.into());
708
            };
709

            
710
26
            ensure!(
711
26
                request_nonce == stream.request_nonce,
712
1
                Error::<T>::WrongRequestNonce
713
            );
714
25
            ensure!(
715
25
                accepter != request.requester,
716
1
                Error::<T>::CantAcceptOwnRequest
717
            );
718

            
719
24
            ensure!(
720
24
                accepter == Party::Source || deposit_change.is_none(),
721
1
                Error::<T>::TargetCantChangeDeposit
722
            );
723

            
724
            // Perform pending payment before changing config.
725
23
            Self::perform_stream_payment(stream_id, &mut stream)?;
726

            
727
            // Apply change.
728
            // It is safe to override config now as we have already performed the payment.
729
            // Checks made in apply_deposit_change needs to be done with new config.
730
23
            let old_config = stream.config;
731
23
            stream.config = request.new_config;
732
23
            let deposit_change = deposit_change.or(request.deposit_change);
733
23

            
734
23
            match (
735
23
                old_config.asset_id == stream.config.asset_id,
736
23
                deposit_change,
737
            ) {
738
                // Same asset and a change, we apply it like in `change_deposit` call.
739
13
                (true, Some(change)) => {
740
13
                    Self::apply_deposit_change(&mut stream, change)?;
741
                }
742
                // Same asset and no change, no problem.
743
4
                (true, None) => (),
744
                // Change in asset with absolute new amount
745
3
                (false, Some(DepositChange::Absolute(amount))) => {
746
3
                    // As target chooses the deposit amount in new currency, we must ensure
747
3
                    // it is greater than the soft minimum deposit.
748
3
                    ensure!(
749
3
                        amount >= stream.config.soft_minimum_deposit,
750
1
                        Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum
751
                    );
752

            
753
                    // Release deposit in old asset.
754
2
                    T::Assets::decrease_deposit(
755
2
                        &old_config.asset_id,
756
2
                        &stream.source,
757
2
                        stream.deposit,
758
2
                    )?;
759

            
760
                    // Make deposit in new asset.
761
2
                    T::Assets::increase_deposit(&stream.config.asset_id, &stream.source, amount)?;
762
2
                    stream.deposit = amount;
763
                }
764
                // It doesn't make sense to change asset while not providing an absolute new
765
                // amount.
766
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
767
            }
768

            
769
            // If time unit changes we need to update `last_time_updated` to be in the
770
            // new unit.
771
19
            if old_config.time_unit != stream.config.time_unit {
772
2
                stream.last_time_updated = T::TimeProvider::now(&stream.config.time_unit)
773
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
774
17
            }
775

            
776
            // Event
777
19
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
778
19
                stream_id,
779
19
                old_config,
780
19
                new_config: stream.config.clone(),
781
19
                deposit_change,
782
19
            });
783
19

            
784
19
            // Update stream in storage.
785
19
            stream.pending_request = None;
786
19
            Streams::<T>::insert(stream_id, stream);
787
19

            
788
19
            Ok(().into())
789
        }
790

            
791
        #[pallet::call_index(5)]
792
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
793
        pub fn cancel_change_request(
794
            origin: OriginFor<T>,
795
            stream_id: T::StreamId,
796
5
        ) -> DispatchResultWithPostInfo {
797
5
            let origin = ensure_signed(origin)?;
798
5
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
799

            
800
4
            let accepter = stream
801
4
                .account_to_party(origin)
802
4
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
803

            
804
3
            let Some(request) = stream.pending_request.take() else {
805
1
                return Err(Error::<T>::NoPendingRequest.into());
806
            };
807

            
808
2
            ensure!(
809
2
                accepter == request.requester,
810
2
                Error::<T>::CanOnlyCancelOwnRequest
811
            );
812

            
813
            // Update storage.
814
            // Pending request is removed by calling `.take()`.
815
            Streams::<T>::insert(stream_id, stream);
816

            
817
            Ok(().into())
818
        }
819

            
820
        /// Allows immediately changing the deposit for a stream, which is simpler than
821
        /// calling `request_change` with the proper parameters.
822
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
823
        /// the call is included in a block, in which case the unit is no longer the same and quantities
824
        /// will not have the same scale/value.
825
        #[pallet::call_index(6)]
826
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
827
        pub fn immediately_change_deposit(
828
            origin: OriginFor<T>,
829
            stream_id: T::StreamId,
830
            asset_id: T::AssetId,
831
            change: DepositChange<T::Balance>,
832
12
        ) -> DispatchResultWithPostInfo {
833
12
            let origin = ensure_signed(origin)?;
834
12
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
835

            
836
11
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
837
9
            ensure!(
838
9
                stream.config.asset_id == asset_id,
839
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
840
            );
841

            
842
            // Perform pending payment before changing deposit.
843
9
            Self::perform_stream_payment(stream_id, &mut stream)?;
844

            
845
            // Apply change.
846
9
            Self::apply_deposit_change(&mut stream, change)?;
847

            
848
            // Event
849
4
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
850
4
                stream_id,
851
4
                old_config: stream.config.clone(),
852
4
                new_config: stream.config.clone(),
853
4
                deposit_change: Some(change),
854
4
            });
855
4

            
856
4
            // Update stream in storage.
857
4
            Streams::<T>::insert(stream_id, stream);
858
4

            
859
4
            Ok(().into())
860
        }
861
    }
862

            
863
    impl<T: Config> Pallet<T> {
864
        /// Try to open a stream and returns its id.
865
        /// Prefers calling this function from other pallets instead of `open_stream` as the
866
        /// latter can't return the id.
867
87
        pub fn open_stream_returns_id(
868
87
            origin: AccountIdOf<T>,
869
87
            target: AccountIdOf<T>,
870
87
            config: StreamConfigOf<T>,
871
87
            initial_deposit: T::Balance,
872
87
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
873
87
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
874

            
875
86
            ensure!(
876
86
                initial_deposit >= config.soft_minimum_deposit,
877
1
                Error::<T>::CantCreateStreamWithDepositUnderSoftMinimum
878
            );
879

            
880
            // Generate a new stream id.
881
85
            let stream_id = NextStreamId::<T>::get();
882
85
            let next_stream_id = stream_id
883
85
                .checked_add(&One::one())
884
85
                .ok_or(Error::<T>::StreamIdOverflow)?;
885
84
            NextStreamId::<T>::set(next_stream_id);
886
84

            
887
84
            // Hold opening deposit for the storage used by Stream
888
84
            let opening_deposit = T::OpenStreamHoldAmount::get();
889
84
            if opening_deposit > 0u32.into() {
890
84
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
891
            }
892

            
893
            // Freeze initial deposit.
894
84
            T::Assets::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
895

            
896
            // Create stream data.
897
81
            let now =
898
82
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
899
81
            let stream = Stream {
900
81
                source: origin.clone(),
901
81
                target: target.clone(),
902
81
                config,
903
81
                deposit: initial_deposit,
904
81
                last_time_updated: now,
905
81
                request_nonce: 0,
906
81
                pending_request: None,
907
81
                opening_deposit,
908
81
            };
909
81

            
910
81
            // Insert stream in storage.
911
81
            Streams::<T>::insert(stream_id, stream);
912
81
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
913
81
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
914
81

            
915
81
            // Emit event.
916
81
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
917
81

            
918
81
            Ok(stream_id)
919
87
        }
920

            
921
        /// Get the stream payment current status, telling how much payment is
922
        /// pending, how much deposit will be left and whenever the stream is stalled.
923
        /// The stream is considered stalled if no funds are left or if the provided
924
        /// time is past a mandatory request deadline. If the provided `now` is `None`
925
        /// then the current time will be fetched. Being able to provide a custom `now`
926
        /// allows to check the status in the future. It is invalid to provide a `now` that is
927
        /// before `last_time_updated`.
928
55
        pub fn stream_payment_status(
929
55
            stream_id: T::StreamId,
930
55
            now: Option<T::Balance>,
931
55
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
932
55
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
933
43
            let now = match now {
934
                Some(v) => v,
935
43
                None => T::TimeProvider::now(&stream.config.time_unit)
936
43
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
937
            };
938

            
939
43
            let last_time_updated = stream.last_time_updated;
940
43

            
941
43
            ensure!(
942
43
                now >= last_time_updated,
943
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
944
            );
945

            
946
43
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
947
55
        }
948

            
949
130
        fn stream_payment_status_by_ref(
950
130
            stream: &StreamOf<T>,
951
130
            last_time_updated: T::Balance,
952
130
            mut now: T::Balance,
953
130
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
954
130
            let mut stalled_by_deadline = false;
955

            
956
            // Take into account mandatory change request deadline. Note that
957
            // while it'll perform payment up to deadline,
958
            // `stream.last_time_updated` is still the "real now" to avoid
959
            // retroactive payment in case the deadline changes.
960
            if let Some(ChangeRequest {
961
34
                kind: ChangeKind::Mandatory { deadline },
962
                ..
963
44
            }) = &stream.pending_request
964
            {
965
34
                now = min(now, *deadline);
966
34

            
967
34
                if now == *deadline {
968
15
                    stalled_by_deadline = true;
969
21
                }
970
96
            }
971

            
972
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
973
130
            if stream.deposit.is_zero() {
974
                return Ok(StreamPaymentStatus {
975
                    payment: 0u32.into(),
976
                    deposit_left: stream.deposit,
977
                    stalled: true,
978
                });
979
130
            }
980

            
981
            // Dont perform payment if now is before or equal to `last_time_updated`.
982
            // It can be before due to the deadline adjustment.
983
130
            let Some(delta) = now.checked_sub(&last_time_updated) else {
984
2
                return Ok(StreamPaymentStatus {
985
2
                    payment: 0u32.into(),
986
2
                    deposit_left: stream.deposit,
987
2
                    stalled: true,
988
2
                });
989
            };
990

            
991
            // We compute the amount due to the target according to the rate, which may be
992
            // lowered if the stream deposit is lower.
993
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
994
            // considering it an error can make a stream un-updatable if too much time has passed
995
            // without updates.
996
128
            let mut payment = delta.saturating_mul(stream.config.rate);
997

            
998
            // We compute the new amount of locked funds. If it underflows it
999
            // means that there is more to pay that what is left, in which case
            // we pay all that is left.
128
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
126
                Some(v) if v.is_zero() => (v, true),
123
                Some(v) => (v, stalled_by_deadline),
                None => {
2
                    payment = stream.deposit;
2
                    (Zero::zero(), true)
                }
            };
128
            Ok(StreamPaymentStatus {
128
                payment,
128
                deposit_left,
128
                stalled,
128
            })
130
        }
        /// Behavior:
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
        /// last time the stream was updated. When updating the stream, **at most**
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
        /// account. If this amount is greater than the left deposit, the stream is considered
        /// drained **but not closed**. The source can come back later and refill the stream,
        /// however there will be no retroactive payment for the time spent as drained.
        /// If the stream payment is used to rent a service, the target should pause the service
        /// while the stream is drained, and resume it once it is refilled.
87
        fn perform_stream_payment(
87
            stream_id: T::StreamId,
87
            stream: &mut StreamOf<T>,
87
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
87
            let now = T::TimeProvider::now(&stream.config.time_unit)
87
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
            // We want to update `stream.last_time_updated` to `now` as soon
            // as possible to avoid forgetting to do it. We copy the old value
            // for payment computation.
87
            let last_time_updated = stream.last_time_updated;
87
            stream.last_time_updated = now;
            let StreamPaymentStatus {
87
                payment,
87
                deposit_left,
87
                stalled,
87
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
87
            if payment.is_zero() {
37
                return Ok(0u32.into());
50
            }
50

            
50
            // Transfer from the source to target.
50
            T::Assets::transfer_deposit(
50
                &stream.config.asset_id,
50
                &stream.source,
50
                &stream.target,
50
                payment,
50
            )?;
            // Update stream info.
50
            stream.deposit = deposit_left;
50

            
50
            // Emit event.
50
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
50
                stream_id,
50
                source: stream.source.clone(),
50
                target: stream.target.clone(),
50
                amount: payment,
50
                stalled,
50
            });
50

            
50
            Ok(payment)
87
        }
31
        fn apply_deposit_change(
31
            stream: &mut StreamOf<T>,
31
            change: DepositChange<T::Balance>,
31
        ) -> DispatchResultWithPostInfo {
31
            match change {
8
                DepositChange::Absolute(amount) => {
8
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
1
                        T::Assets::increase_deposit(
1
                            &stream.config.asset_id,
1
                            &stream.source,
1
                            increase,
1
                        )?;
7
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
7
                        if amount < stream.config.soft_minimum_deposit {
2
                            return Err(
2
                                Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into()
2
                            );
5
                        }
5

            
5
                        T::Assets::decrease_deposit(
5
                            &stream.config.asset_id,
5
                            &stream.source,
5
                            decrease,
5
                        )?;
                    }
5
                    stream.deposit = amount;
                }
17
                DepositChange::Increase(increase) => {
17
                    stream.deposit = stream
17
                        .deposit
17
                        .checked_add(&increase)
17
                        .ok_or(ArithmeticError::Overflow)?;
15
                    T::Assets::increase_deposit(&stream.config.asset_id, &stream.source, increase)?;
                }
6
                DepositChange::Decrease(decrease) => {
6
                    let new_deposit = stream
6
                        .deposit
6
                        .checked_sub(&decrease)
6
                        .ok_or(ArithmeticError::Underflow)?;
4
                    if new_deposit < stream.config.soft_minimum_deposit {
2
                        return Err(Error::<T>::CantDecreaseDepositUnderSoftDepositMinimum.into());
2
                    }
2

            
2
                    stream.deposit = new_deposit;
2
                    T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, decrease)?;
                }
            }
22
            Ok(().into())
31
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
52
        fn maybe_immediate_change(
52
            stream_id: T::StreamId,
52
            stream: &mut StreamOf<T>,
52
            new_config: &StreamConfigOf<T>,
52
            deposit_change: Option<DepositChange<T::Balance>>,
52
            requester: Party,
52
        ) -> Result<bool, DispatchErrorWithPostInfo> {
52
            if new_config.time_unit != stream.config.time_unit
35
                || new_config.asset_id != stream.config.asset_id
            {
23
                return Ok(false);
29
            }
29

            
29
            if requester == Party::Source && new_config.rate < stream.config.rate {
14
                return Ok(false);
15
            }
15

            
15
            if requester == Party::Target && new_config.rate > stream.config.rate {
4
                return Ok(false);
11
            }
11

            
11
            // Perform pending payment before changing config.
11
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
11
            if let Some(change) = deposit_change {
9
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
7
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
7
                stream_id,
7
                old_config: stream.config.clone(),
7
                new_config: new_config.clone(),
7
                deposit_change,
7
            });
7

            
7
            // Update storage.
7
            stream.config = new_config.clone();
7
            Streams::<T>::insert(stream_id, stream);
7

            
7
            Ok(true)
52
        }
    }
}