1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
pub mod weights;
30
pub use weights::WeightInfo;
31

            
32
use {
33
    core::cmp::min,
34
    frame_support::{
35
        dispatch::DispatchErrorWithPostInfo,
36
        pallet,
37
        pallet_prelude::*,
38
        storage::types::{StorageDoubleMap, StorageMap},
39
        traits::{
40
            fungible::{Inspect, MutateHold},
41
            tokens::{Balance, Precision},
42
        },
43
        Blake2_128Concat,
44
    },
45
    frame_system::pallet_prelude::*,
46
    parity_scale_codec::{FullCodec, MaxEncodedLen},
47
    scale_info::TypeInfo,
48
    serde::{Deserialize, Serialize},
49
    sp_runtime::{
50
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
51
        ArithmeticError,
52
    },
53
    sp_std::{fmt::Debug, marker::PhantomData},
54
};
55

            
56
pub use pallet::*;
57

            
58
/// Type able to provide the current time for given unit.
59
/// For each unit the returned number should monotonically increase and not
60
/// overflow.
61
pub trait TimeProvider<Unit, Number> {
62
    fn now(unit: &Unit) -> Option<Number>;
63

            
64
    /// Benchmarks: should return the time unit which has the worst performance calling
65
    /// `TimeProvider::now(unit)` with.
66
    #[cfg(feature = "runtime-benchmarks")]
67
    fn bench_worst_case_time_unit() -> Unit;
68

            
69
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
70
    #[cfg(feature = "runtime-benchmarks")]
71
    fn bench_set_now(instant: Number);
72
}
73

            
74
/// Interactions the pallet needs with assets.
75
pub trait Assets<AccountId, AssetId, Balance> {
76
    /// Transfer assets deposited by an account to another account.
77
    /// Those assets should not be considered deposited in the target account.
78
    fn transfer_deposit(
79
        asset_id: &AssetId,
80
        from: &AccountId,
81
        to: &AccountId,
82
        amount: Balance,
83
    ) -> DispatchResult;
84

            
85
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
86
    /// enough of that asset. Funds should be safe and not slashable.
87
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
88
        -> DispatchResult;
89

            
90
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
91
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
92
        -> DispatchResult;
93

            
94
    /// Return the deposit for given asset and account.
95
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
96

            
97
    /// Benchmarks: should return the asset id which has the worst performance when interacting
98
    /// with it.
99
    #[cfg(feature = "runtime-benchmarks")]
100
    fn bench_worst_case_asset_id() -> AssetId;
101

            
102
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
103
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
104
    /// from one asset to another. If there is only one asset id it is fine to return it in both
105
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
106
    #[cfg(feature = "runtime-benchmarks")]
107
    fn bench_worst_case_asset_id2() -> AssetId;
108

            
109
    /// Benchmarks: should set the balance.
110
    #[cfg(feature = "runtime-benchmarks")]
111
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
112
}
113

            
114
6786
#[pallet]
115
pub mod pallet {
116
    use super::*;
117

            
118
    /// Pooled Staking pallet.
119
27971
    #[pallet::pallet]
120
    pub struct Pallet<T>(PhantomData<T>);
121

            
122
    #[pallet::config]
123
    pub trait Config: frame_system::Config {
124
        /// Overarching event type
125
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
126

            
127
        /// Type used to represent stream ids. Should be large enough to not overflow.
128
        type StreamId: AtLeast32BitUnsigned
129
            + Default
130
            + Debug
131
            + Copy
132
            + Clone
133
            + FullCodec
134
            + TypeInfo
135
            + MaxEncodedLen;
136

            
137
        /// The balance type, which is also the type representing time (as this
138
        /// pallet will do math with both time and balances to compute how
139
        /// much should be paid).
140
        type Balance: Balance;
141

            
142
        /// Type representing an asset id, a identifier allowing distinguishing assets.
143
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
144

            
145
        /// Provide interaction with assets.
146
        type Assets: Assets<Self::AccountId, Self::AssetId, Self::Balance>;
147

            
148
        /// Currency for the opening balance hold for the storage used by the Stream.
149
        /// NOT to be confused with Assets.
150
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
151
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
152

            
153
        type RuntimeHoldReason: From<HoldReason>;
154

            
155
        #[pallet::constant]
156
        type OpenStreamHoldAmount: Get<Self::Balance>;
157

            
158
        /// Represents which units of time can be used. Designed to be an enum
159
        /// with a variant for each kind of time source/scale supported.
160
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
161

            
162
        /// Provide the current time in given unit.
163
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
164

            
165
        type WeightInfo: weights::WeightInfo;
166
    }
167

            
168
    type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
169
    type AssetIdOf<T> = <T as Config>::AssetId;
170

            
171
    pub type RequestNonce = u32;
172

            
173
    /// A stream payment from source to target.
174
    /// Stores the last time the stream was updated, which allows to compute
175
    /// elapsed time and perform payment.
176
    #[derive(
177
        RuntimeDebug,
178
        PartialEq,
179
        Eq,
180
        Encode,
181
        Decode,
182
        Clone,
183
3104
        TypeInfo,
184
        Serialize,
185
        Deserialize,
186
        MaxEncodedLen,
187
    )]
188
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
189
        /// Payer, source of the stream.
190
        pub source: AccountId,
191
        /// Payee, target of the stream.
192
        pub target: AccountId,
193
        /// Steam config (time unit, asset id, rate)
194
        pub config: StreamConfig<Unit, AssetId, Balance>,
195
        /// How much is deposited to fund this stream.
196
        pub deposit: Balance,
197
        /// Last time the stream was updated in `config.time_unit`.
198
        pub last_time_updated: Balance,
199
        /// Nonce for requests. This prevents a request to make a first request
200
        /// then change it to another request to frontrun the other party
201
        /// accepting.
202
        pub request_nonce: RequestNonce,
203
        /// A pending change request if any.
204
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
205
        /// One-time opening deposit. Will be released on close.
206
        pub opening_deposit: Balance,
207
    }
208

            
209
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
210
77
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
211
77
            match account {
212
77
                a if a == self.source => Some(Party::Source),
213
30
                a if a == self.target => Some(Party::Target),
214
3
                _ => None,
215
            }
216
77
        }
217
    }
218

            
219
    /// Stream configuration.
220
    #[derive(
221
        RuntimeDebug,
222
        PartialEq,
223
        Eq,
224
        Encode,
225
        Decode,
226
        Copy,
227
        Clone,
228
1164
        TypeInfo,
229
        Serialize,
230
        Deserialize,
231
        MaxEncodedLen,
232
    )]
233
    pub struct StreamConfig<Unit, AssetId, Balance> {
234
        /// Unit in which time is measured using a `TimeProvider`.
235
        pub time_unit: Unit,
236
        /// Asset used for payment.
237
        pub asset_id: AssetId,
238
        /// Amount of asset / unit.
239
        pub rate: Balance,
240
    }
241

            
242
    /// Origin of a change request.
243
    #[derive(
244
        RuntimeDebug,
245
        PartialEq,
246
        Eq,
247
        Encode,
248
        Decode,
249
        Copy,
250
        Clone,
251
776
        TypeInfo,
252
        Serialize,
253
        Deserialize,
254
        MaxEncodedLen,
255
    )]
256
    pub enum Party {
257
42
        Source,
258
23
        Target,
259
    }
260

            
261
    impl Party {
262
4
        pub fn inverse(self) -> Self {
263
4
            match self {
264
2
                Party::Source => Party::Target,
265
2
                Party::Target => Party::Source,
266
            }
267
4
        }
268
    }
269

            
270
    /// Kind of change requested.
271
    #[derive(
272
        RuntimeDebug,
273
        PartialEq,
274
        Eq,
275
        Encode,
276
        Decode,
277
        Copy,
278
        Clone,
279
776
        TypeInfo,
280
        Serialize,
281
        Deserialize,
282
        MaxEncodedLen,
283
    )]
284
    pub enum ChangeKind<Time> {
285
23
        /// The requested change is a suggestion, and the other party doesn't
286
        /// need to accept it.
287
        Suggestion,
288
46
        /// The requested change is mandatory, and the other party must either
289
        /// accept the change or close the stream. Reaching the deadline will
290
        /// close the stream too.
291
        Mandatory { deadline: Time },
292
    }
293

            
294
    /// Describe how the deposit should change.
295
    #[derive(
296
        RuntimeDebug,
297
        PartialEq,
298
        Eq,
299
        Encode,
300
        Decode,
301
        Copy,
302
        Clone,
303
1552
        TypeInfo,
304
        Serialize,
305
        Deserialize,
306
        MaxEncodedLen,
307
    )]
308
    pub enum DepositChange<Balance> {
309
37
        /// Increase deposit by given amount.
310
        Increase(Balance),
311
1
        /// Decrease deposit by given amount.
312
        Decrease(Balance),
313
1
        /// Set deposit to given amount.
314
        Absolute(Balance),
315
    }
316

            
317
    /// A request to change a stream config.
318
    #[derive(
319
        RuntimeDebug,
320
        PartialEq,
321
        Eq,
322
        Encode,
323
        Decode,
324
        Clone,
325
1552
        TypeInfo,
326
        Serialize,
327
        Deserialize,
328
        MaxEncodedLen,
329
    )]
330
    pub struct ChangeRequest<Unit, AssetId, Balance> {
331
        pub requester: Party,
332
        pub kind: ChangeKind<Balance>,
333
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
334
        pub deposit_change: Option<DepositChange<Balance>>,
335
    }
336

            
337
    pub type StreamOf<T> =
338
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
339

            
340
    pub type StreamConfigOf<T> =
341
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
342

            
343
    pub type ChangeRequestOf<T> =
344
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
345

            
346
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
347
    pub struct StreamPaymentStatus<Balance> {
348
        pub payment: Balance,
349
        pub deposit_left: Balance,
350
        /// Whenever the stream is stalled, which can occur either when no funds are left or
351
        /// if the time is past a mandatory request deadline.
352
        pub stalled: bool,
353
    }
354

            
355
    /// Store the next available stream id.
356
264
    #[pallet::storage]
357
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
358

            
359
    /// Store each stream indexed by an Id.
360
335
    #[pallet::storage]
361
    pub type Streams<T: Config> = StorageMap<
362
        Hasher = Blake2_128Concat,
363
        Key = T::StreamId,
364
        Value = StreamOf<T>,
365
        QueryKind = OptionQuery,
366
    >;
367

            
368
    /// Lookup for all streams with given source.
369
    /// To avoid maintaining a growing list of stream ids, they are stored in
370
    /// the form of an entry (AccountId, StreamId). If such entry exists then
371
    /// this AccountId is a source in StreamId. One can iterate over all storage
372
    /// keys starting with the AccountId to find all StreamIds.
373
80
    #[pallet::storage]
374
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
375
        Key1 = AccountIdOf<T>,
376
        Hasher1 = Blake2_128Concat,
377
        Key2 = T::StreamId,
378
        Hasher2 = Blake2_128Concat,
379
        Value = (),
380
        QueryKind = OptionQuery,
381
    >;
382

            
383
    /// Lookup for all streams with given target.
384
    /// To avoid maintaining a growing list of stream ids, they are stored in
385
    /// the form of an entry (AccountId, StreamId). If such entry exists then
386
    /// this AccountId is a target in StreamId. One can iterate over all storage
387
    /// keys starting with the AccountId to find all StreamIds.
388
80
    #[pallet::storage]
389
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
390
        Key1 = AccountIdOf<T>,
391
        Hasher1 = Blake2_128Concat,
392
        Key2 = T::StreamId,
393
        Hasher2 = Blake2_128Concat,
394
        Value = (),
395
        QueryKind = OptionQuery,
396
    >;
397

            
398
124
    #[pallet::error]
399
    #[derive(Clone, PartialEq, Eq)]
400
    pub enum Error<T> {
401
        UnknownStreamId,
402
        StreamIdOverflow,
403
        UnauthorizedOrigin,
404
        CantBeBothSourceAndTarget,
405
        CantFetchCurrentTime,
406
        SourceCantDecreaseRate,
407
        TargetCantIncreaseRate,
408
        CantOverrideMandatoryChange,
409
        NoPendingRequest,
410
        CantAcceptOwnRequest,
411
        CanOnlyCancelOwnRequest,
412
        WrongRequestNonce,
413
        ChangingAssetRequiresAbsoluteDepositChange,
414
        TargetCantChangeDeposit,
415
        ImmediateDepositChangeRequiresSameAssetId,
416
        DeadlineCantBeInPast,
417
        CantFetchStatusBeforeLastTimeUpdated,
418
    }
419

            
420
    #[pallet::event]
421
167
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
422
    pub enum Event<T: Config> {
423
43
        StreamOpened {
424
            stream_id: T::StreamId,
425
        },
426
4
        StreamClosed {
427
            stream_id: T::StreamId,
428
            refunded: T::Balance,
429
        },
430
9
        StreamPayment {
431
            stream_id: T::StreamId,
432
            source: AccountIdOf<T>,
433
            target: AccountIdOf<T>,
434
            amount: T::Balance,
435
            stalled: bool,
436
        },
437
12
        StreamConfigChangeRequested {
438
            stream_id: T::StreamId,
439
            request_nonce: RequestNonce,
440
            requester: Party,
441
            old_config: StreamConfigOf<T>,
442
            new_config: StreamConfigOf<T>,
443
        },
444
5
        StreamConfigChanged {
445
            stream_id: T::StreamId,
446
            old_config: StreamConfigOf<T>,
447
            new_config: StreamConfigOf<T>,
448
            deposit_change: Option<DepositChange<T::Balance>>,
449
        },
450
    }
451

            
452
    /// Freeze reason to use if needed.
453
    #[pallet::composite_enum]
454
    pub enum FreezeReason {
455
        StreamPayment,
456
    }
457

            
458
    /// Hold reason to use if needed.
459
    #[pallet::composite_enum]
460
    pub enum HoldReason {
461
167
        StreamPayment,
462
379
        StreamOpened,
463
    }
464

            
465
280
    #[pallet::call]
466
    impl<T: Config> Pallet<T> {
467
        /// Create a payment stream from the origin to the target with provided config
468
        /// and initial deposit (in the asset defined in the config).
469
        #[pallet::call_index(0)]
470
        #[pallet::weight(T::WeightInfo::open_stream())]
471
        pub fn open_stream(
472
            origin: OriginFor<T>,
473
            target: AccountIdOf<T>,
474
            config: StreamConfigOf<T>,
475
            initial_deposit: T::Balance,
476
67
        ) -> DispatchResultWithPostInfo {
477
67
            let origin = ensure_signed(origin)?;
478
67
            let opening_deposit = T::OpenStreamHoldAmount::get();
479

            
480
67
            let _stream_id = Self::open_stream_returns_id(
481
67
                origin,
482
67
                target,
483
67
                config,
484
67
                initial_deposit,
485
67
                opening_deposit,
486
67
            )?;
487

            
488
62
            Ok(().into())
489
        }
490

            
491
        /// Close a given stream in which the origin is involved. It performs the pending payment
492
        /// before closing the stream.
493
        #[pallet::call_index(1)]
494
        #[pallet::weight(T::WeightInfo::close_stream())]
495
        pub fn close_stream(
496
            origin: OriginFor<T>,
497
            stream_id: T::StreamId,
498
15
        ) -> DispatchResultWithPostInfo {
499
15
            let origin = ensure_signed(origin)?;
500
15
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
501

            
502
            // Only source or target can close a stream.
503
14
            ensure!(
504
14
                origin == stream.source || origin == stream.target,
505
1
                Error::<T>::UnauthorizedOrigin
506
            );
507

            
508
            // Update stream before closing it to ensure fair payment.
509
13
            Self::perform_stream_payment(stream_id, &mut stream)?;
510

            
511
            // Unfreeze funds left in the stream.
512
13
            T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, stream.deposit)?;
513

            
514
            // Release opening deposit
515
13
            if stream.opening_deposit > 0u32.into() {
516
13
                T::Currency::release(
517
13
                    &HoldReason::StreamOpened.into(),
518
13
                    &stream.source,
519
13
                    stream.opening_deposit,
520
13
                    Precision::Exact,
521
13
                )?;
522
            }
523

            
524
            // Remove stream from storage.
525
13
            Streams::<T>::remove(stream_id);
526
13
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
527
13
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
528
13

            
529
13
            // Emit event.
530
13
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
531
13
                stream_id,
532
13
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
533
13
            });
534
13

            
535
13
            Ok(().into())
536
        }
537

            
538
        /// Perform the pending payment of a stream. Anyone can call this.
539
        #[pallet::call_index(2)]
540
        #[pallet::weight(T::WeightInfo::perform_payment())]
541
        pub fn perform_payment(
542
            origin: OriginFor<T>,
543
            stream_id: T::StreamId,
544
21
        ) -> DispatchResultWithPostInfo {
545
21
            // No problem with anyone updating any stream.
546
21
            let _ = ensure_signed(origin)?;
547

            
548
21
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
549
20
            Self::perform_stream_payment(stream_id, &mut stream)?;
550
20
            Streams::<T>::insert(stream_id, stream);
551
20

            
552
20
            Ok(().into())
553
        }
554

            
555
        /// Requests a change to a stream config or deposit.
556
        ///
557
        /// If the new config don't change the time unit and asset id, the change will be applied
558
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
559
        /// in the stream and will have to be approved by the other party.
560
        ///
561
        /// This call accepts a deposit change, which can only be provided by the source of the
562
        /// stream. An absolute change is required when changing asset id, as the current deposit
563
        /// will be released and a new deposit is required in the new asset.
564
        #[pallet::call_index(3)]
565
        #[pallet::weight(
566
            T::WeightInfo::request_change_immediate()
567
            .max(T::WeightInfo::request_change_delayed())
568
        )]
569
        pub fn request_change(
570
            origin: OriginFor<T>,
571
            stream_id: T::StreamId,
572
            kind: ChangeKind<T::Balance>,
573
            new_config: StreamConfigOf<T>,
574
            deposit_change: Option<DepositChange<T::Balance>>,
575
53
        ) -> DispatchResultWithPostInfo {
576
53
            let origin = ensure_signed(origin)?;
577
53
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
578

            
579
52
            let requester = stream
580
52
                .account_to_party(origin)
581
52
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
582

            
583
51
            ensure!(
584
51
                requester == Party::Source || deposit_change.is_none(),
585
1
                Error::<T>::TargetCantChangeDeposit
586
            );
587

            
588
50
            if stream.config == new_config && deposit_change.is_none() {
589
1
                return Ok(().into());
590
49
            }
591

            
592
49
            if let ChangeKind::Mandatory { deadline } = kind {
593
14
                let now = T::TimeProvider::now(&stream.config.time_unit)
594
14
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
595

            
596
14
                ensure!(deadline >= now, Error::<T>::DeadlineCantBeInPast);
597
35
            }
598

            
599
            // If asset id and time unit are the same, we allow to make the change
600
            // immediatly if the origin is at a disadvantage.
601
            // We allow this even if there is already a pending request.
602
48
            if Self::maybe_immediate_change(
603
48
                stream_id,
604
48
                &mut stream,
605
48
                &new_config,
606
48
                deposit_change,
607
48
                requester,
608
48
            )? {
609
5
                return Ok(().into());
610
41
            }
611
41

            
612
41
            // If the source is requesting a change of asset, they must provide an absolute change.
613
41
            if requester == Party::Source
614
31
                && new_config.asset_id != stream.config.asset_id
615
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
616
            {
617
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
618
38
            }
619

            
620
            // If there is already a mandatory change request, only the origin
621
            // of this request can change it.
622
            if let Some(ChangeRequest {
623
                kind: ChangeKind::Mandatory { .. },
624
3
                requester: pending_requester,
625
                ..
626
5
            }) = &stream.pending_request
627
            {
628
3
                ensure!(
629
3
                    &requester == pending_requester,
630
2
                    Error::<T>::CantOverrideMandatoryChange
631
                );
632
35
            }
633

            
634
36
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
635
36
            stream.pending_request = Some(ChangeRequest {
636
36
                requester,
637
36
                kind,
638
36
                new_config: new_config.clone(),
639
36
                deposit_change,
640
36
            });
641
36

            
642
36
            // Emit event.
643
36
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
644
36
                stream_id,
645
36
                request_nonce: stream.request_nonce,
646
36
                requester,
647
36
                old_config: stream.config.clone(),
648
36
                new_config,
649
36
            });
650
36

            
651
36
            // Update storage.
652
36
            Streams::<T>::insert(stream_id, stream);
653
36

            
654
36
            Ok(().into())
655
        }
656

            
657
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
658
        /// frontrunning attacks. If the target made a request, the source is able to change their
659
        /// deposit.
660
        #[pallet::call_index(4)]
661
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
662
        pub fn accept_requested_change(
663
            origin: OriginFor<T>,
664
            stream_id: T::StreamId,
665
            request_nonce: RequestNonce,
666
            deposit_change: Option<DepositChange<T::Balance>>,
667
22
        ) -> DispatchResultWithPostInfo {
668
22
            let origin = ensure_signed(origin)?;
669
22
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
670

            
671
21
            let accepter = stream
672
21
                .account_to_party(origin)
673
21
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
674

            
675
20
            let Some(request) = stream.pending_request.clone() else {
676
                return Err(Error::<T>::NoPendingRequest.into());
677
            };
678

            
679
20
            ensure!(
680
20
                request_nonce == stream.request_nonce,
681
1
                Error::<T>::WrongRequestNonce
682
            );
683
19
            ensure!(
684
19
                accepter != request.requester,
685
1
                Error::<T>::CantAcceptOwnRequest
686
            );
687

            
688
18
            ensure!(
689
18
                accepter == Party::Source || deposit_change.is_none(),
690
1
                Error::<T>::TargetCantChangeDeposit
691
            );
692

            
693
            // Perform pending payment before changing config.
694
17
            Self::perform_stream_payment(stream_id, &mut stream)?;
695

            
696
            // Apply change.
697
17
            let deposit_change = deposit_change.or(request.deposit_change);
698
17
            match (
699
17
                stream.config.asset_id == request.new_config.asset_id,
700
17
                deposit_change,
701
            ) {
702
                // Same asset and a change, we apply it like in `change_deposit` call.
703
9
                (true, Some(change)) => {
704
9
                    Self::apply_deposit_change(&mut stream, change)?;
705
                }
706
                // Same asset and no change, no problem.
707
4
                (true, None) => (),
708
                // Change in asset with absolute new amount
709
1
                (false, Some(DepositChange::Absolute(amount))) => {
710
1
                    // Release deposit in old asset.
711
1
                    T::Assets::decrease_deposit(
712
1
                        &stream.config.asset_id,
713
1
                        &stream.source,
714
1
                        stream.deposit,
715
1
                    )?;
716

            
717
                    // Make deposit in new asset.
718
1
                    T::Assets::increase_deposit(
719
1
                        &request.new_config.asset_id,
720
1
                        &stream.source,
721
1
                        amount,
722
1
                    )?;
723
1
                    stream.deposit = amount;
724
                }
725
                // It doesn't make sense to change asset while not providing an absolute new
726
                // amount.
727
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
728
            }
729

            
730
            // If time unit changes we need to update `last_time_updated` to be in the
731
            // new unit.
732
14
            if stream.config.time_unit != request.new_config.time_unit {
733
2
                stream.last_time_updated = T::TimeProvider::now(&request.new_config.time_unit)
734
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
735
12
            }
736

            
737
            // Event
738
14
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
739
14
                stream_id,
740
14
                old_config: stream.config,
741
14
                new_config: request.new_config.clone(),
742
14
                deposit_change,
743
14
            });
744
14

            
745
14
            // Update config in storage.
746
14
            stream.config = request.new_config;
747
14
            stream.pending_request = None;
748
14
            Streams::<T>::insert(stream_id, stream);
749
14

            
750
14
            Ok(().into())
751
        }
752

            
753
        #[pallet::call_index(5)]
754
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
755
        pub fn cancel_change_request(
756
            origin: OriginFor<T>,
757
            stream_id: T::StreamId,
758
5
        ) -> DispatchResultWithPostInfo {
759
5
            let origin = ensure_signed(origin)?;
760
5
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
761

            
762
4
            let accepter = stream
763
4
                .account_to_party(origin)
764
4
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
765

            
766
3
            let Some(request) = stream.pending_request.take() else {
767
1
                return Err(Error::<T>::NoPendingRequest.into());
768
            };
769

            
770
2
            ensure!(
771
2
                accepter == request.requester,
772
2
                Error::<T>::CanOnlyCancelOwnRequest
773
            );
774

            
775
            // Update storage.
776
            // Pending request is removed by calling `.take()`.
777
            Streams::<T>::insert(stream_id, stream);
778

            
779
            Ok(().into())
780
        }
781

            
782
        /// Allows immediately changing the deposit for a stream, which is simpler than
783
        /// calling `request_change` with the proper parameters.
784
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
785
        /// the call is included in a block, in which case the unit is no longer the same and quantities
786
        /// will not have the same scale/value.
787
        #[pallet::call_index(6)]
788
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
789
        pub fn immediately_change_deposit(
790
            origin: OriginFor<T>,
791
            stream_id: T::StreamId,
792
            asset_id: T::AssetId,
793
            change: DepositChange<T::Balance>,
794
7
        ) -> DispatchResultWithPostInfo {
795
7
            let origin = ensure_signed(origin)?;
796
7
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
797

            
798
6
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
799
4
            ensure!(
800
4
                stream.config.asset_id == asset_id,
801
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
802
            );
803

            
804
            // Perform pending payment before changing deposit.
805
4
            Self::perform_stream_payment(stream_id, &mut stream)?;
806

            
807
            // Apply change.
808
4
            Self::apply_deposit_change(&mut stream, change)?;
809

            
810
            // Event
811
1
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
812
1
                stream_id,
813
1
                old_config: stream.config.clone(),
814
1
                new_config: stream.config.clone(),
815
1
                deposit_change: Some(change),
816
1
            });
817
1

            
818
1
            // Update stream in storage.
819
1
            Streams::<T>::insert(stream_id, stream);
820
1

            
821
1
            Ok(().into())
822
        }
823
    }
824

            
825
    impl<T: Config> Pallet<T> {
826
        /// Try to open a stream and returns its id.
827
        /// Prefers calling this function from other pallets instead of `open_stream` as the
828
        /// latter can't return the id.
829
67
        pub fn open_stream_returns_id(
830
67
            origin: AccountIdOf<T>,
831
67
            target: AccountIdOf<T>,
832
67
            config: StreamConfigOf<T>,
833
67
            initial_deposit: T::Balance,
834
67
            opening_deposit: T::Balance,
835
67
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
836
67
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
837

            
838
            // Generate a new stream id.
839
66
            let stream_id = NextStreamId::<T>::get();
840
66
            let next_stream_id = stream_id
841
66
                .checked_add(&One::one())
842
66
                .ok_or(Error::<T>::StreamIdOverflow)?;
843
65
            NextStreamId::<T>::set(next_stream_id);
844
65

            
845
65
            // Hold opening deposit for the storage used by Stream
846
65
            if opening_deposit > 0u32.into() {
847
65
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
848
            }
849

            
850
            // Freeze initial deposit.
851
65
            T::Assets::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
852

            
853
            // Create stream data.
854
62
            let now =
855
63
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
856
62
            let stream = Stream {
857
62
                source: origin.clone(),
858
62
                target: target.clone(),
859
62
                config,
860
62
                deposit: initial_deposit,
861
62
                last_time_updated: now,
862
62
                request_nonce: 0,
863
62
                pending_request: None,
864
62
                opening_deposit,
865
62
            };
866
62

            
867
62
            // Insert stream in storage.
868
62
            Streams::<T>::insert(stream_id, stream);
869
62
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
870
62
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
871
62

            
872
62
            // Emit event.
873
62
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
874
62

            
875
62
            Ok(stream_id)
876
67
        }
877

            
878
        /// Get the stream payment current status, telling how much payment is
879
        /// pending, how much deposit will be left and whenever the stream is stalled.
880
        /// The stream is considered stalled if no funds are left or if the provided
881
        /// time is past a mandatory request deadline. If the provided `now` is `None`
882
        /// then the current time will be fetched. Being able to provide a custom `now`
883
        /// allows to check the status in the future. It is invalid to provide a `now` that is
884
        /// before `last_time_updated`.
885
36
        pub fn stream_payment_status(
886
36
            stream_id: T::StreamId,
887
36
            now: Option<T::Balance>,
888
36
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
889
36
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
890
28
            let now = match now {
891
                Some(v) => v,
892
28
                None => T::TimeProvider::now(&stream.config.time_unit)
893
28
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
894
            };
895

            
896
28
            let last_time_updated = stream.last_time_updated;
897
28

            
898
28
            ensure!(
899
28
                now >= last_time_updated,
900
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
901
            );
902

            
903
28
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
904
36
        }
905

            
906
89
        fn stream_payment_status_by_ref(
907
89
            stream: &StreamOf<T>,
908
89
            last_time_updated: T::Balance,
909
89
            mut now: T::Balance,
910
89
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
911
89
            let mut stalled_by_deadline = false;
912

            
913
            // Take into account mandatory change request deadline. Note that
914
            // while it'll perform payment up to deadline,
915
            // `stream.last_time_updated` is still the "real now" to avoid
916
            // retroactive payment in case the deadline changes.
917
            if let Some(ChangeRequest {
918
24
                kind: ChangeKind::Mandatory { deadline },
919
                ..
920
36
            }) = &stream.pending_request
921
            {
922
24
                now = min(now, *deadline);
923
24

            
924
24
                if now == *deadline {
925
11
                    stalled_by_deadline = true;
926
15
                }
927
65
            }
928

            
929
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
930
89
            if stream.deposit.is_zero() {
931
                return Ok(StreamPaymentStatus {
932
                    payment: 0u32.into(),
933
                    deposit_left: stream.deposit,
934
                    stalled: true,
935
                });
936
89
            }
937

            
938
            // Dont perform payment if now is before or equal to `last_time_updated`.
939
            // It can be before due to the deadline adjustment.
940
89
            let Some(delta) = now.checked_sub(&last_time_updated) else {
941
2
                return Ok(StreamPaymentStatus {
942
2
                    payment: 0u32.into(),
943
2
                    deposit_left: stream.deposit,
944
2
                    stalled: true,
945
2
                });
946
            };
947

            
948
            // We compute the amount due to the target according to the rate, which may be
949
            // lowered if the stream deposit is lower.
950
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
951
            // considering it an error can make a stream un-updatable if too much time has passed
952
            // without updates.
953
87
            let mut payment = delta.saturating_mul(stream.config.rate);
954

            
955
            // We compute the new amount of locked funds. If it underflows it
956
            // means that there is more to pay that what is left, in which case
957
            // we pay all that is left.
958
87
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
959
85
                Some(v) if v.is_zero() => (v, true),
960
84
                Some(v) => (v, stalled_by_deadline),
961
                None => {
962
2
                    payment = stream.deposit;
963
2
                    (Zero::zero(), true)
964
                }
965
            };
966

            
967
87
            Ok(StreamPaymentStatus {
968
87
                payment,
969
87
                deposit_left,
970
87
                stalled,
971
87
            })
972
89
        }
973

            
974
        /// Behavior:
975
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
976
        /// last time the stream was updated. When updating the stream, **at most**
977
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
978
        /// account. If this amount is greater than the left deposit, the stream is considered
979
        /// drained **but not closed**. The source can come back later and refill the stream,
980
        /// however there will be no retroactive payment for the time spent as drained.
981
        /// If the stream payment is used to rent a service, the target should pause the service
982
        /// while the stream is drained, and resume it once it is refilled.
983
61
        fn perform_stream_payment(
984
61
            stream_id: T::StreamId,
985
61
            stream: &mut StreamOf<T>,
986
61
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
987
61
            let now = T::TimeProvider::now(&stream.config.time_unit)
988
61
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
989

            
990
            // We want to update `stream.last_time_updated` to `now` as soon
991
            // as possible to avoid forgetting to do it. We copy the old value
992
            // for payment computation.
993
61
            let last_time_updated = stream.last_time_updated;
994
61
            stream.last_time_updated = now;
995

            
996
            let StreamPaymentStatus {
997
61
                payment,
998
61
                deposit_left,
999
61
                stalled,
61
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
61
            if payment.is_zero() {
25
                return Ok(0u32.into());
36
            }
36

            
36
            // Transfer from the source to target.
36
            T::Assets::transfer_deposit(
36
                &stream.config.asset_id,
36
                &stream.source,
36
                &stream.target,
36
                payment,
36
            )?;
            // Update stream info.
36
            stream.deposit = deposit_left;
36

            
36
            // Emit event.
36
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
36
                stream_id,
36
                source: stream.source.clone(),
36
                target: stream.target.clone(),
36
                amount: payment,
36
                stalled,
36
            });
36

            
36
            Ok(payment)
61
        }
18
        fn apply_deposit_change(
18
            stream: &mut StreamOf<T>,
18
            change: DepositChange<T::Balance>,
18
        ) -> DispatchResultWithPostInfo {
18
            match change {
3
                DepositChange::Absolute(amount) => {
3
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
1
                        T::Assets::increase_deposit(
1
                            &stream.config.asset_id,
1
                            &stream.source,
1
                            increase,
1
                        )?;
2
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
2
                        T::Assets::decrease_deposit(
2
                            &stream.config.asset_id,
2
                            &stream.source,
2
                            decrease,
2
                        )?;
                    }
2
                    stream.deposit = amount;
                }
12
                DepositChange::Increase(increase) => {
12
                    stream.deposit = stream
12
                        .deposit
12
                        .checked_add(&increase)
12
                        .ok_or(ArithmeticError::Overflow)?;
10
                    T::Assets::increase_deposit(&stream.config.asset_id, &stream.source, increase)?;
                }
3
                DepositChange::Decrease(decrease) => {
3
                    stream.deposit = stream
3
                        .deposit
3
                        .checked_sub(&decrease)
3
                        .ok_or(ArithmeticError::Underflow)?;
1
                    T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, decrease)?;
                }
            }
13
            Ok(().into())
18
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
48
        fn maybe_immediate_change(
48
            stream_id: T::StreamId,
48
            stream: &mut StreamOf<T>,
48
            new_config: &StreamConfigOf<T>,
48
            deposit_change: Option<DepositChange<T::Balance>>,
48
            requester: Party,
48
        ) -> Result<bool, DispatchErrorWithPostInfo> {
48
            if new_config.time_unit != stream.config.time_unit
25
                || new_config.asset_id != stream.config.asset_id
            {
28
                return Ok(false);
20
            }
20

            
20
            if requester == Party::Source && new_config.rate < stream.config.rate {
10
                return Ok(false);
10
            }
10

            
10
            if requester == Party::Target && new_config.rate > stream.config.rate {
3
                return Ok(false);
7
            }
7

            
7
            // Perform pending payment before changing config.
7
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
7
            if let Some(change) = deposit_change {
5
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
5
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
5
                stream_id,
5
                old_config: stream.config.clone(),
5
                new_config: new_config.clone(),
5
                deposit_change,
5
            });
5

            
5
            // Update storage.
5
            stream.config = new_config.clone();
5
            Streams::<T>::insert(stream_id, stream);
5

            
5
            Ok(true)
48
        }
    }
}