1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>
16

            
17
#![doc = include_str!("../README.md")]
18
#![cfg_attr(not(feature = "std"), no_std)]
19

            
20
#[cfg(test)]
21
mod mock;
22

            
23
#[cfg(test)]
24
mod tests;
25

            
26
#[cfg(feature = "runtime-benchmarks")]
27
mod benchmarking;
28

            
29
pub mod weights;
30
pub use weights::WeightInfo;
31

            
32
use {
33
    core::cmp::min,
34
    frame_support::{
35
        dispatch::DispatchErrorWithPostInfo,
36
        pallet,
37
        pallet_prelude::*,
38
        storage::types::{StorageDoubleMap, StorageMap},
39
        traits::{
40
            fungible::{Inspect, MutateHold},
41
            tokens::{Balance, Precision},
42
        },
43
        Blake2_128Concat,
44
    },
45
    frame_system::pallet_prelude::*,
46
    parity_scale_codec::{FullCodec, MaxEncodedLen},
47
    scale_info::TypeInfo,
48
    serde::{Deserialize, Serialize},
49
    sp_runtime::{
50
        traits::{AtLeast32BitUnsigned, CheckedAdd, CheckedSub, One, Saturating, Zero},
51
        ArithmeticError,
52
    },
53
    sp_std::{fmt::Debug, marker::PhantomData},
54
};
55

            
56
pub use pallet::*;
57

            
58
/// Type able to provide the current time for given unit.
59
/// For each unit the returned number should monotonically increase and not
60
/// overflow.
61
pub trait TimeProvider<Unit, Number> {
62
    fn now(unit: &Unit) -> Option<Number>;
63

            
64
    /// Benchmarks: should return the time unit which has the worst performance calling
65
    /// `TimeProvider::now(unit)` with.
66
    #[cfg(feature = "runtime-benchmarks")]
67
    fn bench_worst_case_time_unit() -> Unit;
68

            
69
    /// Benchmarks: sets the "now" time for time unit returned by `bench_worst_case_time_unit`.
70
    #[cfg(feature = "runtime-benchmarks")]
71
    fn bench_set_now(instant: Number);
72
}
73

            
74
/// Interactions the pallet needs with assets.
75
pub trait Assets<AccountId, AssetId, Balance> {
76
    /// Transfer assets deposited by an account to another account.
77
    /// Those assets should not be considered deposited in the target account.
78
    fn transfer_deposit(
79
        asset_id: &AssetId,
80
        from: &AccountId,
81
        to: &AccountId,
82
        amount: Balance,
83
    ) -> DispatchResult;
84

            
85
    /// Increase the deposit for an account and asset id. Should fail if account doesn't have
86
    /// enough of that asset. Funds should be safe and not slashable.
87
    fn increase_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
88
        -> DispatchResult;
89

            
90
    /// Decrease the deposit for an account and asset id. Should fail on underflow.
91
    fn decrease_deposit(asset_id: &AssetId, account: &AccountId, amount: Balance)
92
        -> DispatchResult;
93

            
94
    /// Return the deposit for given asset and account.
95
    fn get_deposit(asset_id: &AssetId, account: &AccountId) -> Balance;
96

            
97
    /// Benchmarks: should return the asset id which has the worst performance when interacting
98
    /// with it.
99
    #[cfg(feature = "runtime-benchmarks")]
100
    fn bench_worst_case_asset_id() -> AssetId;
101

            
102
    /// Benchmarks: should return the another asset id which has the worst performance when interacting
103
    /// with it afther `bench_worst_case_asset_id`. This is to benchmark the worst case when changing config
104
    /// from one asset to another. If there is only one asset id it is fine to return it in both
105
    /// `bench_worst_case_asset_id` and `bench_worst_case_asset_id2`.
106
    #[cfg(feature = "runtime-benchmarks")]
107
    fn bench_worst_case_asset_id2() -> AssetId;
108

            
109
    /// Benchmarks: should set the balance.
110
    #[cfg(feature = "runtime-benchmarks")]
111
    fn bench_set_balance(asset_id: &AssetId, account: &AccountId, amount: Balance);
112
}
113

            
114
10104
#[pallet]
115
pub mod pallet {
116
    use super::*;
117

            
118
    /// Pooled Staking pallet.
119
41955
    #[pallet::pallet]
120
    pub struct Pallet<T>(PhantomData<T>);
121

            
122
    #[pallet::config]
123
    pub trait Config: frame_system::Config {
124
        /// Overarching event type
125
        type RuntimeEvent: From<Event<Self>> + IsType<<Self as frame_system::Config>::RuntimeEvent>;
126

            
127
        /// Type used to represent stream ids. Should be large enough to not overflow.
128
        type StreamId: AtLeast32BitUnsigned
129
            + Default
130
            + Debug
131
            + Copy
132
            + Clone
133
            + FullCodec
134
            + TypeInfo
135
            + MaxEncodedLen;
136

            
137
        /// The balance type, which is also the type representing time (as this
138
        /// pallet will do math with both time and balances to compute how
139
        /// much should be paid).
140
        type Balance: Balance;
141

            
142
        /// Type representing an asset id, a identifier allowing distinguishing assets.
143
        type AssetId: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + PartialEq + Eq;
144

            
145
        /// Provide interaction with assets.
146
        type Assets: Assets<Self::AccountId, Self::AssetId, Self::Balance>;
147

            
148
        /// Currency for the opening balance hold for the storage used by the Stream.
149
        /// NOT to be confused with Assets.
150
        type Currency: Inspect<Self::AccountId, Balance = Self::Balance>
151
            + MutateHold<Self::AccountId, Reason = Self::RuntimeHoldReason>;
152

            
153
        type RuntimeHoldReason: From<HoldReason>;
154

            
155
        #[pallet::constant]
156
        type OpenStreamHoldAmount: Get<Self::Balance>;
157

            
158
        /// Represents which units of time can be used. Designed to be an enum
159
        /// with a variant for each kind of time source/scale supported.
160
        type TimeUnit: Debug + Clone + FullCodec + TypeInfo + MaxEncodedLen + Eq;
161

            
162
        /// Provide the current time in given unit.
163
        type TimeProvider: TimeProvider<Self::TimeUnit, Self::Balance>;
164

            
165
        type WeightInfo: weights::WeightInfo;
166
    }
167

            
168
    type AccountIdOf<T> = <T as frame_system::Config>::AccountId;
169
    type AssetIdOf<T> = <T as Config>::AssetId;
170

            
171
    pub type RequestNonce = u32;
172

            
173
    /// A stream payment from source to target.
174
    /// Stores the last time the stream was updated, which allows to compute
175
    /// elapsed time and perform payment.
176
    #[derive(
177
        RuntimeDebug,
178
        PartialEq,
179
        Eq,
180
        Encode,
181
        Decode,
182
        Clone,
183
4656
        TypeInfo,
184
        Serialize,
185
        Deserialize,
186
        MaxEncodedLen,
187
    )]
188
    pub struct Stream<AccountId, Unit, AssetId, Balance> {
189
        /// Payer, source of the stream.
190
        pub source: AccountId,
191
        /// Payee, target of the stream.
192
        pub target: AccountId,
193
        /// Steam config (time unit, asset id, rate)
194
        pub config: StreamConfig<Unit, AssetId, Balance>,
195
        /// How much is deposited to fund this stream.
196
        pub deposit: Balance,
197
        /// Last time the stream was updated in `config.time_unit`.
198
        pub last_time_updated: Balance,
199
        /// Nonce for requests. This prevents a request to make a first request
200
        /// then change it to another request to frontrun the other party
201
        /// accepting.
202
        pub request_nonce: RequestNonce,
203
        /// A pending change request if any.
204
        pub pending_request: Option<ChangeRequest<Unit, AssetId, Balance>>,
205
        /// One-time opening deposit. Will be released on close.
206
        pub opening_deposit: Balance,
207
    }
208

            
209
    impl<AccountId: PartialEq, Unit, AssetId, Balance> Stream<AccountId, Unit, AssetId, Balance> {
210
85
        pub fn account_to_party(&self, account: AccountId) -> Option<Party> {
211
85
            match account {
212
85
                a if a == self.source => Some(Party::Source),
213
34
                a if a == self.target => Some(Party::Target),
214
3
                _ => None,
215
            }
216
85
        }
217
    }
218

            
219
    /// Stream configuration.
220
    #[derive(
221
        RuntimeDebug,
222
        PartialEq,
223
        Eq,
224
        Encode,
225
        Decode,
226
        Copy,
227
        Clone,
228
1746
        TypeInfo,
229
        Serialize,
230
        Deserialize,
231
        MaxEncodedLen,
232
    )]
233
    pub struct StreamConfig<Unit, AssetId, Balance> {
234
        /// Unit in which time is measured using a `TimeProvider`.
235
        pub time_unit: Unit,
236
        /// Asset used for payment.
237
        pub asset_id: AssetId,
238
        /// Amount of asset / unit.
239
        pub rate: Balance,
240
    }
241

            
242
    /// Origin of a change request.
243
    #[derive(
244
        RuntimeDebug,
245
        PartialEq,
246
        Eq,
247
        Encode,
248
        Decode,
249
        Copy,
250
        Clone,
251
1164
        TypeInfo,
252
        Serialize,
253
        Deserialize,
254
        MaxEncodedLen,
255
    )]
256
    pub enum Party {
257
52
        Source,
258
23
        Target,
259
    }
260

            
261
    impl Party {
262
4
        pub fn inverse(self) -> Self {
263
4
            match self {
264
2
                Party::Source => Party::Target,
265
2
                Party::Target => Party::Source,
266
            }
267
4
        }
268
    }
269

            
270
    /// Kind of change requested.
271
    #[derive(
272
        RuntimeDebug,
273
        PartialEq,
274
        Eq,
275
        Encode,
276
        Decode,
277
        Copy,
278
        Clone,
279
1164
        TypeInfo,
280
        Serialize,
281
        Deserialize,
282
        MaxEncodedLen,
283
    )]
284
    pub enum ChangeKind<Time> {
285
23
        /// The requested change is a suggestion, and the other party doesn't
286
        /// need to accept it.
287
        Suggestion,
288
64
        /// The requested change is mandatory, and the other party must either
289
        /// accept the change or close the stream. Reaching the deadline will
290
        /// close the stream too.
291
        Mandatory { deadline: Time },
292
    }
293

            
294
    /// Describe how the deposit should change.
295
    #[derive(
296
        RuntimeDebug,
297
        PartialEq,
298
        Eq,
299
        Encode,
300
        Decode,
301
        Copy,
302
        Clone,
303
2328
        TypeInfo,
304
        Serialize,
305
        Deserialize,
306
        MaxEncodedLen,
307
    )]
308
    pub enum DepositChange<Balance> {
309
55
        /// Increase deposit by given amount.
310
        Increase(Balance),
311
1
        /// Decrease deposit by given amount.
312
        Decrease(Balance),
313
1
        /// Set deposit to given amount.
314
        Absolute(Balance),
315
    }
316

            
317
    /// A request to change a stream config.
318
    #[derive(
319
        RuntimeDebug,
320
        PartialEq,
321
        Eq,
322
        Encode,
323
        Decode,
324
        Clone,
325
2328
        TypeInfo,
326
        Serialize,
327
        Deserialize,
328
        MaxEncodedLen,
329
    )]
330
    pub struct ChangeRequest<Unit, AssetId, Balance> {
331
        pub requester: Party,
332
        pub kind: ChangeKind<Balance>,
333
        pub new_config: StreamConfig<Unit, AssetId, Balance>,
334
        pub deposit_change: Option<DepositChange<Balance>>,
335
    }
336

            
337
    pub type StreamOf<T> =
338
        Stream<AccountIdOf<T>, <T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
339

            
340
    pub type StreamConfigOf<T> =
341
        StreamConfig<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
342

            
343
    pub type ChangeRequestOf<T> =
344
        ChangeRequest<<T as Config>::TimeUnit, AssetIdOf<T>, <T as Config>::Balance>;
345

            
346
    #[derive(Debug, Copy, Clone, PartialEq, Eq)]
347
    pub struct StreamPaymentStatus<Balance> {
348
        pub payment: Balance,
349
        pub deposit_left: Balance,
350
        /// Whenever the stream is stalled, which can occur either when no funds are left or
351
        /// if the time is past a mandatory request deadline.
352
        pub stalled: bool,
353
    }
354

            
355
    /// Store the next available stream id.
356
280
    #[pallet::storage]
357
    pub type NextStreamId<T: Config> = StorageValue<Value = T::StreamId, QueryKind = ValueQuery>;
358

            
359
    /// Store each stream indexed by an Id.
360
389
    #[pallet::storage]
361
    pub type Streams<T: Config> = StorageMap<
362
        Hasher = Blake2_128Concat,
363
        Key = T::StreamId,
364
        Value = StreamOf<T>,
365
        QueryKind = OptionQuery,
366
    >;
367

            
368
    /// Lookup for all streams with given source.
369
    /// To avoid maintaining a growing list of stream ids, they are stored in
370
    /// the form of an entry (AccountId, StreamId). If such entry exists then
371
    /// this AccountId is a source in StreamId. One can iterate over all storage
372
    /// keys starting with the AccountId to find all StreamIds.
373
88
    #[pallet::storage]
374
    pub type LookupStreamsWithSource<T: Config> = StorageDoubleMap<
375
        Key1 = AccountIdOf<T>,
376
        Hasher1 = Blake2_128Concat,
377
        Key2 = T::StreamId,
378
        Hasher2 = Blake2_128Concat,
379
        Value = (),
380
        QueryKind = OptionQuery,
381
    >;
382

            
383
    /// Lookup for all streams with given target.
384
    /// To avoid maintaining a growing list of stream ids, they are stored in
385
    /// the form of an entry (AccountId, StreamId). If such entry exists then
386
    /// this AccountId is a target in StreamId. One can iterate over all storage
387
    /// keys starting with the AccountId to find all StreamIds.
388
88
    #[pallet::storage]
389
    pub type LookupStreamsWithTarget<T: Config> = StorageDoubleMap<
390
        Key1 = AccountIdOf<T>,
391
        Hasher1 = Blake2_128Concat,
392
        Key2 = T::StreamId,
393
        Hasher2 = Blake2_128Concat,
394
        Value = (),
395
        QueryKind = OptionQuery,
396
    >;
397

            
398
124
    #[pallet::error]
399
    #[derive(Clone, PartialEq, Eq)]
400
    pub enum Error<T> {
401
        UnknownStreamId,
402
        StreamIdOverflow,
403
        UnauthorizedOrigin,
404
        CantBeBothSourceAndTarget,
405
        CantFetchCurrentTime,
406
        SourceCantDecreaseRate,
407
        TargetCantIncreaseRate,
408
        CantOverrideMandatoryChange,
409
        NoPendingRequest,
410
        CantAcceptOwnRequest,
411
        CanOnlyCancelOwnRequest,
412
        WrongRequestNonce,
413
        ChangingAssetRequiresAbsoluteDepositChange,
414
        TargetCantChangeDeposit,
415
        ImmediateDepositChangeRequiresSameAssetId,
416
        DeadlineCantBeInPast,
417
        CantFetchStatusBeforeLastTimeUpdated,
418
    }
419

            
420
    #[pallet::event]
421
195
    #[pallet::generate_deposit(pub(super) fn deposit_event)]
422
    pub enum Event<T: Config> {
423
43
        StreamOpened {
424
            stream_id: T::StreamId,
425
        },
426
4
        StreamClosed {
427
            stream_id: T::StreamId,
428
            refunded: T::Balance,
429
        },
430
9
        StreamPayment {
431
            stream_id: T::StreamId,
432
            source: AccountIdOf<T>,
433
            target: AccountIdOf<T>,
434
            amount: T::Balance,
435
            stalled: bool,
436
        },
437
12
        StreamConfigChangeRequested {
438
            stream_id: T::StreamId,
439
            request_nonce: RequestNonce,
440
            requester: Party,
441
            old_config: StreamConfigOf<T>,
442
            new_config: StreamConfigOf<T>,
443
        },
444
5
        StreamConfigChanged {
445
            stream_id: T::StreamId,
446
            old_config: StreamConfigOf<T>,
447
            new_config: StreamConfigOf<T>,
448
            deposit_change: Option<DepositChange<T::Balance>>,
449
        },
450
    }
451

            
452
    /// Freeze reason to use if needed.
453
    #[pallet::composite_enum]
454
    pub enum FreezeReason {
455
        StreamPayment,
456
    }
457

            
458
    /// Hold reason to use if needed.
459
    #[pallet::composite_enum]
460
    pub enum HoldReason {
461
211
        StreamPayment,
462
443
        StreamOpened,
463
    }
464

            
465
420
    #[pallet::call]
466
    impl<T: Config> Pallet<T> {
467
        /// Create a payment stream from the origin to the target with provided config
468
        /// and initial deposit (in the asset defined in the config).
469
        #[pallet::call_index(0)]
470
        #[pallet::weight(T::WeightInfo::open_stream())]
471
        pub fn open_stream(
472
            origin: OriginFor<T>,
473
            target: AccountIdOf<T>,
474
            config: StreamConfigOf<T>,
475
            initial_deposit: T::Balance,
476
71
        ) -> DispatchResultWithPostInfo {
477
71
            let origin = ensure_signed(origin)?;
478

            
479
71
            let _stream_id = Self::open_stream_returns_id(origin, target, config, initial_deposit)?;
480

            
481
66
            Ok(().into())
482
        }
483

            
484
        /// Close a given stream in which the origin is involved. It performs the pending payment
485
        /// before closing the stream.
486
        #[pallet::call_index(1)]
487
        #[pallet::weight(T::WeightInfo::close_stream())]
488
        pub fn close_stream(
489
            origin: OriginFor<T>,
490
            stream_id: T::StreamId,
491
19
        ) -> DispatchResultWithPostInfo {
492
19
            let origin = ensure_signed(origin)?;
493
19
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
494

            
495
            // Only source or target can close a stream.
496
18
            ensure!(
497
18
                origin == stream.source || origin == stream.target,
498
1
                Error::<T>::UnauthorizedOrigin
499
            );
500

            
501
            // Update stream before closing it to ensure fair payment.
502
17
            Self::perform_stream_payment(stream_id, &mut stream)?;
503

            
504
            // Unfreeze funds left in the stream.
505
17
            T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, stream.deposit)?;
506

            
507
            // Release opening deposit
508
17
            if stream.opening_deposit > 0u32.into() {
509
17
                T::Currency::release(
510
17
                    &HoldReason::StreamOpened.into(),
511
17
                    &stream.source,
512
17
                    stream.opening_deposit,
513
17
                    Precision::Exact,
514
17
                )?;
515
            }
516

            
517
            // Remove stream from storage.
518
17
            Streams::<T>::remove(stream_id);
519
17
            LookupStreamsWithSource::<T>::remove(stream.source, stream_id);
520
17
            LookupStreamsWithTarget::<T>::remove(stream.target, stream_id);
521
17

            
522
17
            // Emit event.
523
17
            Pallet::<T>::deposit_event(Event::<T>::StreamClosed {
524
17
                stream_id,
525
17
                refunded: stream.deposit.saturating_add(stream.opening_deposit),
526
17
            });
527
17

            
528
17
            Ok(().into())
529
        }
530

            
531
        /// Perform the pending payment of a stream. Anyone can call this.
532
        #[pallet::call_index(2)]
533
        #[pallet::weight(T::WeightInfo::perform_payment())]
534
        pub fn perform_payment(
535
            origin: OriginFor<T>,
536
            stream_id: T::StreamId,
537
25
        ) -> DispatchResultWithPostInfo {
538
25
            // No problem with anyone updating any stream.
539
25
            let _ = ensure_signed(origin)?;
540

            
541
25
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
542
24
            Self::perform_stream_payment(stream_id, &mut stream)?;
543
24
            Streams::<T>::insert(stream_id, stream);
544
24

            
545
24
            Ok(().into())
546
        }
547

            
548
        /// Requests a change to a stream config or deposit.
549
        ///
550
        /// If the new config don't change the time unit and asset id, the change will be applied
551
        /// immediately if it is at the desadvantage of the caller. Otherwise, the request is stored
552
        /// in the stream and will have to be approved by the other party.
553
        ///
554
        /// This call accepts a deposit change, which can only be provided by the source of the
555
        /// stream. An absolute change is required when changing asset id, as the current deposit
556
        /// will be released and a new deposit is required in the new asset.
557
        #[pallet::call_index(3)]
558
        #[pallet::weight(
559
            T::WeightInfo::request_change_immediate()
560
            .max(T::WeightInfo::request_change_delayed())
561
        )]
562
        pub fn request_change(
563
            origin: OriginFor<T>,
564
            stream_id: T::StreamId,
565
            kind: ChangeKind<T::Balance>,
566
            new_config: StreamConfigOf<T>,
567
            deposit_change: Option<DepositChange<T::Balance>>,
568
57
        ) -> DispatchResultWithPostInfo {
569
57
            let origin = ensure_signed(origin)?;
570
57
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
571

            
572
56
            let requester = stream
573
56
                .account_to_party(origin)
574
56
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
575

            
576
55
            ensure!(
577
55
                requester == Party::Source || deposit_change.is_none(),
578
1
                Error::<T>::TargetCantChangeDeposit
579
            );
580

            
581
54
            if stream.config == new_config && deposit_change.is_none() {
582
1
                return Ok(().into());
583
53
            }
584

            
585
53
            if let ChangeKind::Mandatory { deadline } = kind {
586
18
                let now = T::TimeProvider::now(&stream.config.time_unit)
587
18
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
588

            
589
18
                ensure!(deadline >= now, Error::<T>::DeadlineCantBeInPast);
590
35
            }
591

            
592
            // If asset id and time unit are the same, we allow to make the change
593
            // immediatly if the origin is at a disadvantage.
594
            // We allow this even if there is already a pending request.
595
52
            if Self::maybe_immediate_change(
596
52
                stream_id,
597
52
                &mut stream,
598
52
                &new_config,
599
52
                deposit_change,
600
52
                requester,
601
52
            )? {
602
5
                return Ok(().into());
603
45
            }
604
45

            
605
45
            // If the source is requesting a change of asset, they must provide an absolute change.
606
45
            if requester == Party::Source
607
35
                && new_config.asset_id != stream.config.asset_id
608
3
                && !matches!(deposit_change, Some(DepositChange::Absolute(_)))
609
            {
610
3
                Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?;
611
42
            }
612

            
613
            // If there is already a mandatory change request, only the origin
614
            // of this request can change it.
615
            if let Some(ChangeRequest {
616
                kind: ChangeKind::Mandatory { .. },
617
3
                requester: pending_requester,
618
                ..
619
5
            }) = &stream.pending_request
620
            {
621
3
                ensure!(
622
3
                    &requester == pending_requester,
623
2
                    Error::<T>::CantOverrideMandatoryChange
624
                );
625
39
            }
626

            
627
40
            stream.request_nonce = stream.request_nonce.wrapping_add(1);
628
40
            stream.pending_request = Some(ChangeRequest {
629
40
                requester,
630
40
                kind,
631
40
                new_config: new_config.clone(),
632
40
                deposit_change,
633
40
            });
634
40

            
635
40
            // Emit event.
636
40
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChangeRequested {
637
40
                stream_id,
638
40
                request_nonce: stream.request_nonce,
639
40
                requester,
640
40
                old_config: stream.config.clone(),
641
40
                new_config,
642
40
            });
643
40

            
644
40
            // Update storage.
645
40
            Streams::<T>::insert(stream_id, stream);
646
40

            
647
40
            Ok(().into())
648
        }
649

            
650
        /// Accepts a change requested before by the other party. Takes a nonce to prevent
651
        /// frontrunning attacks. If the target made a request, the source is able to change their
652
        /// deposit.
653
        #[pallet::call_index(4)]
654
        #[pallet::weight(T::WeightInfo::accept_requested_change())]
655
        pub fn accept_requested_change(
656
            origin: OriginFor<T>,
657
            stream_id: T::StreamId,
658
            request_nonce: RequestNonce,
659
            deposit_change: Option<DepositChange<T::Balance>>,
660
26
        ) -> DispatchResultWithPostInfo {
661
26
            let origin = ensure_signed(origin)?;
662
26
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
663

            
664
25
            let accepter = stream
665
25
                .account_to_party(origin)
666
25
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
667

            
668
24
            let Some(request) = stream.pending_request.clone() else {
669
                return Err(Error::<T>::NoPendingRequest.into());
670
            };
671

            
672
24
            ensure!(
673
24
                request_nonce == stream.request_nonce,
674
1
                Error::<T>::WrongRequestNonce
675
            );
676
23
            ensure!(
677
23
                accepter != request.requester,
678
1
                Error::<T>::CantAcceptOwnRequest
679
            );
680

            
681
22
            ensure!(
682
22
                accepter == Party::Source || deposit_change.is_none(),
683
1
                Error::<T>::TargetCantChangeDeposit
684
            );
685

            
686
            // Perform pending payment before changing config.
687
21
            Self::perform_stream_payment(stream_id, &mut stream)?;
688

            
689
            // Apply change.
690
21
            let deposit_change = deposit_change.or(request.deposit_change);
691
21
            match (
692
21
                stream.config.asset_id == request.new_config.asset_id,
693
21
                deposit_change,
694
            ) {
695
                // Same asset and a change, we apply it like in `change_deposit` call.
696
13
                (true, Some(change)) => {
697
13
                    Self::apply_deposit_change(&mut stream, change)?;
698
                }
699
                // Same asset and no change, no problem.
700
4
                (true, None) => (),
701
                // Change in asset with absolute new amount
702
1
                (false, Some(DepositChange::Absolute(amount))) => {
703
1
                    // Release deposit in old asset.
704
1
                    T::Assets::decrease_deposit(
705
1
                        &stream.config.asset_id,
706
1
                        &stream.source,
707
1
                        stream.deposit,
708
1
                    )?;
709

            
710
                    // Make deposit in new asset.
711
1
                    T::Assets::increase_deposit(
712
1
                        &request.new_config.asset_id,
713
1
                        &stream.source,
714
1
                        amount,
715
1
                    )?;
716
1
                    stream.deposit = amount;
717
                }
718
                // It doesn't make sense to change asset while not providing an absolute new
719
                // amount.
720
3
                (false, _) => Err(Error::<T>::ChangingAssetRequiresAbsoluteDepositChange)?,
721
            }
722

            
723
            // If time unit changes we need to update `last_time_updated` to be in the
724
            // new unit.
725
18
            if stream.config.time_unit != request.new_config.time_unit {
726
2
                stream.last_time_updated = T::TimeProvider::now(&request.new_config.time_unit)
727
2
                    .ok_or(Error::<T>::CantFetchCurrentTime)?;
728
16
            }
729

            
730
            // Event
731
18
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
732
18
                stream_id,
733
18
                old_config: stream.config,
734
18
                new_config: request.new_config.clone(),
735
18
                deposit_change,
736
18
            });
737
18

            
738
18
            // Update config in storage.
739
18
            stream.config = request.new_config;
740
18
            stream.pending_request = None;
741
18
            Streams::<T>::insert(stream_id, stream);
742
18

            
743
18
            Ok(().into())
744
        }
745

            
746
        #[pallet::call_index(5)]
747
        #[pallet::weight(T::WeightInfo::cancel_change_request())]
748
        pub fn cancel_change_request(
749
            origin: OriginFor<T>,
750
            stream_id: T::StreamId,
751
5
        ) -> DispatchResultWithPostInfo {
752
5
            let origin = ensure_signed(origin)?;
753
5
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
754

            
755
4
            let accepter = stream
756
4
                .account_to_party(origin)
757
4
                .ok_or(Error::<T>::UnauthorizedOrigin)?;
758

            
759
3
            let Some(request) = stream.pending_request.take() else {
760
1
                return Err(Error::<T>::NoPendingRequest.into());
761
            };
762

            
763
2
            ensure!(
764
2
                accepter == request.requester,
765
2
                Error::<T>::CanOnlyCancelOwnRequest
766
            );
767

            
768
            // Update storage.
769
            // Pending request is removed by calling `.take()`.
770
            Streams::<T>::insert(stream_id, stream);
771

            
772
            Ok(().into())
773
        }
774

            
775
        /// Allows immediately changing the deposit for a stream, which is simpler than
776
        /// calling `request_change` with the proper parameters.
777
        /// The call takes an asset id to ensure it has not changed (by an accepted request) before
778
        /// the call is included in a block, in which case the unit is no longer the same and quantities
779
        /// will not have the same scale/value.
780
        #[pallet::call_index(6)]
781
        #[pallet::weight(T::WeightInfo::immediately_change_deposit())]
782
        pub fn immediately_change_deposit(
783
            origin: OriginFor<T>,
784
            stream_id: T::StreamId,
785
            asset_id: T::AssetId,
786
            change: DepositChange<T::Balance>,
787
7
        ) -> DispatchResultWithPostInfo {
788
7
            let origin = ensure_signed(origin)?;
789
7
            let mut stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
790

            
791
6
            ensure!(stream.source == origin, Error::<T>::UnauthorizedOrigin);
792
4
            ensure!(
793
4
                stream.config.asset_id == asset_id,
794
                Error::<T>::ImmediateDepositChangeRequiresSameAssetId
795
            );
796

            
797
            // Perform pending payment before changing deposit.
798
4
            Self::perform_stream_payment(stream_id, &mut stream)?;
799

            
800
            // Apply change.
801
4
            Self::apply_deposit_change(&mut stream, change)?;
802

            
803
            // Event
804
1
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
805
1
                stream_id,
806
1
                old_config: stream.config.clone(),
807
1
                new_config: stream.config.clone(),
808
1
                deposit_change: Some(change),
809
1
            });
810
1

            
811
1
            // Update stream in storage.
812
1
            Streams::<T>::insert(stream_id, stream);
813
1

            
814
1
            Ok(().into())
815
        }
816
    }
817

            
818
    impl<T: Config> Pallet<T> {
819
        /// Try to open a stream and returns its id.
820
        /// Prefers calling this function from other pallets instead of `open_stream` as the
821
        /// latter can't return the id.
822
71
        pub fn open_stream_returns_id(
823
71
            origin: AccountIdOf<T>,
824
71
            target: AccountIdOf<T>,
825
71
            config: StreamConfigOf<T>,
826
71
            initial_deposit: T::Balance,
827
71
        ) -> Result<T::StreamId, DispatchErrorWithPostInfo> {
828
71
            ensure!(origin != target, Error::<T>::CantBeBothSourceAndTarget);
829

            
830
            // Generate a new stream id.
831
70
            let stream_id = NextStreamId::<T>::get();
832
70
            let next_stream_id = stream_id
833
70
                .checked_add(&One::one())
834
70
                .ok_or(Error::<T>::StreamIdOverflow)?;
835
69
            NextStreamId::<T>::set(next_stream_id);
836
69

            
837
69
            // Hold opening deposit for the storage used by Stream
838
69
            let opening_deposit = T::OpenStreamHoldAmount::get();
839
69
            if opening_deposit > 0u32.into() {
840
69
                T::Currency::hold(&HoldReason::StreamOpened.into(), &origin, opening_deposit)?;
841
            }
842

            
843
            // Freeze initial deposit.
844
69
            T::Assets::increase_deposit(&config.asset_id, &origin, initial_deposit)?;
845

            
846
            // Create stream data.
847
66
            let now =
848
67
                T::TimeProvider::now(&config.time_unit).ok_or(Error::<T>::CantFetchCurrentTime)?;
849
66
            let stream = Stream {
850
66
                source: origin.clone(),
851
66
                target: target.clone(),
852
66
                config,
853
66
                deposit: initial_deposit,
854
66
                last_time_updated: now,
855
66
                request_nonce: 0,
856
66
                pending_request: None,
857
66
                opening_deposit,
858
66
            };
859
66

            
860
66
            // Insert stream in storage.
861
66
            Streams::<T>::insert(stream_id, stream);
862
66
            LookupStreamsWithSource::<T>::insert(origin, stream_id, ());
863
66
            LookupStreamsWithTarget::<T>::insert(target, stream_id, ());
864
66

            
865
66
            // Emit event.
866
66
            Pallet::<T>::deposit_event(Event::<T>::StreamOpened { stream_id });
867
66

            
868
66
            Ok(stream_id)
869
71
        }
870

            
871
        /// Get the stream payment current status, telling how much payment is
872
        /// pending, how much deposit will be left and whenever the stream is stalled.
873
        /// The stream is considered stalled if no funds are left or if the provided
874
        /// time is past a mandatory request deadline. If the provided `now` is `None`
875
        /// then the current time will be fetched. Being able to provide a custom `now`
876
        /// allows to check the status in the future. It is invalid to provide a `now` that is
877
        /// before `last_time_updated`.
878
54
        pub fn stream_payment_status(
879
54
            stream_id: T::StreamId,
880
54
            now: Option<T::Balance>,
881
54
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
882
54
            let stream = Streams::<T>::get(stream_id).ok_or(Error::<T>::UnknownStreamId)?;
883
42
            let now = match now {
884
                Some(v) => v,
885
42
                None => T::TimeProvider::now(&stream.config.time_unit)
886
42
                    .ok_or(Error::<T>::CantFetchCurrentTime)?,
887
            };
888

            
889
42
            let last_time_updated = stream.last_time_updated;
890
42

            
891
42
            ensure!(
892
42
                now >= last_time_updated,
893
                Error::<T>::CantFetchStatusBeforeLastTimeUpdated
894
            );
895

            
896
42
            Self::stream_payment_status_by_ref(&stream, last_time_updated, now)
897
54
        }
898

            
899
115
        fn stream_payment_status_by_ref(
900
115
            stream: &StreamOf<T>,
901
115
            last_time_updated: T::Balance,
902
115
            mut now: T::Balance,
903
115
        ) -> Result<StreamPaymentStatus<T::Balance>, Error<T>> {
904
115
            let mut stalled_by_deadline = false;
905

            
906
            // Take into account mandatory change request deadline. Note that
907
            // while it'll perform payment up to deadline,
908
            // `stream.last_time_updated` is still the "real now" to avoid
909
            // retroactive payment in case the deadline changes.
910
            if let Some(ChangeRequest {
911
34
                kind: ChangeKind::Mandatory { deadline },
912
                ..
913
46
            }) = &stream.pending_request
914
            {
915
34
                now = min(now, *deadline);
916
34

            
917
34
                if now == *deadline {
918
15
                    stalled_by_deadline = true;
919
21
                }
920
81
            }
921

            
922
            // If deposit is zero the stream is fully drained and there is nothing to transfer.
923
115
            if stream.deposit.is_zero() {
924
                return Ok(StreamPaymentStatus {
925
                    payment: 0u32.into(),
926
                    deposit_left: stream.deposit,
927
                    stalled: true,
928
                });
929
115
            }
930

            
931
            // Dont perform payment if now is before or equal to `last_time_updated`.
932
            // It can be before due to the deadline adjustment.
933
115
            let Some(delta) = now.checked_sub(&last_time_updated) else {
934
2
                return Ok(StreamPaymentStatus {
935
2
                    payment: 0u32.into(),
936
2
                    deposit_left: stream.deposit,
937
2
                    stalled: true,
938
2
                });
939
            };
940

            
941
            // We compute the amount due to the target according to the rate, which may be
942
            // lowered if the stream deposit is lower.
943
            // Saturating is fine as it'll be clamped to the source deposit. It is also safer as
944
            // considering it an error can make a stream un-updatable if too much time has passed
945
            // without updates.
946
113
            let mut payment = delta.saturating_mul(stream.config.rate);
947

            
948
            // We compute the new amount of locked funds. If it underflows it
949
            // means that there is more to pay that what is left, in which case
950
            // we pay all that is left.
951
113
            let (deposit_left, stalled) = match stream.deposit.checked_sub(&payment) {
952
111
                Some(v) if v.is_zero() => (v, true),
953
110
                Some(v) => (v, stalled_by_deadline),
954
                None => {
955
2
                    payment = stream.deposit;
956
2
                    (Zero::zero(), true)
957
                }
958
            };
959

            
960
113
            Ok(StreamPaymentStatus {
961
113
                payment,
962
113
                deposit_left,
963
113
                stalled,
964
113
            })
965
115
        }
966

            
967
        /// Behavior:
968
        /// A stream payment consist of a locked deposit, a rate per unit of time and the
969
        /// last time the stream was updated. When updating the stream, **at most**
970
        /// `elapsed_time * rate` is unlocked from the source account and transfered to the target
971
        /// account. If this amount is greater than the left deposit, the stream is considered
972
        /// drained **but not closed**. The source can come back later and refill the stream,
973
        /// however there will be no retroactive payment for the time spent as drained.
974
        /// If the stream payment is used to rent a service, the target should pause the service
975
        /// while the stream is drained, and resume it once it is refilled.
976
73
        fn perform_stream_payment(
977
73
            stream_id: T::StreamId,
978
73
            stream: &mut StreamOf<T>,
979
73
        ) -> Result<T::Balance, DispatchErrorWithPostInfo> {
980
73
            let now = T::TimeProvider::now(&stream.config.time_unit)
981
73
                .ok_or(Error::<T>::CantFetchCurrentTime)?;
982

            
983
            // We want to update `stream.last_time_updated` to `now` as soon
984
            // as possible to avoid forgetting to do it. We copy the old value
985
            // for payment computation.
986
73
            let last_time_updated = stream.last_time_updated;
987
73
            stream.last_time_updated = now;
988

            
989
            let StreamPaymentStatus {
990
73
                payment,
991
73
                deposit_left,
992
73
                stalled,
993
73
            } = Self::stream_payment_status_by_ref(stream, last_time_updated, now)?;
994

            
995
73
            if payment.is_zero() {
996
25
                return Ok(0u32.into());
997
48
            }
998
48

            
999
48
            // Transfer from the source to target.
48
            T::Assets::transfer_deposit(
48
                &stream.config.asset_id,
48
                &stream.source,
48
                &stream.target,
48
                payment,
48
            )?;
            // Update stream info.
48
            stream.deposit = deposit_left;
48

            
48
            // Emit event.
48
            Pallet::<T>::deposit_event(Event::<T>::StreamPayment {
48
                stream_id,
48
                source: stream.source.clone(),
48
                target: stream.target.clone(),
48
                amount: payment,
48
                stalled,
48
            });
48

            
48
            Ok(payment)
73
        }
22
        fn apply_deposit_change(
22
            stream: &mut StreamOf<T>,
22
            change: DepositChange<T::Balance>,
22
        ) -> DispatchResultWithPostInfo {
22
            match change {
3
                DepositChange::Absolute(amount) => {
3
                    if let Some(increase) = amount.checked_sub(&stream.deposit) {
1
                        T::Assets::increase_deposit(
1
                            &stream.config.asset_id,
1
                            &stream.source,
1
                            increase,
1
                        )?;
2
                    } else if let Some(decrease) = stream.deposit.checked_sub(&amount) {
2
                        T::Assets::decrease_deposit(
2
                            &stream.config.asset_id,
2
                            &stream.source,
2
                            decrease,
2
                        )?;
                    }
2
                    stream.deposit = amount;
                }
16
                DepositChange::Increase(increase) => {
16
                    stream.deposit = stream
16
                        .deposit
16
                        .checked_add(&increase)
16
                        .ok_or(ArithmeticError::Overflow)?;
14
                    T::Assets::increase_deposit(&stream.config.asset_id, &stream.source, increase)?;
                }
3
                DepositChange::Decrease(decrease) => {
3
                    stream.deposit = stream
3
                        .deposit
3
                        .checked_sub(&decrease)
3
                        .ok_or(ArithmeticError::Underflow)?;
1
                    T::Assets::decrease_deposit(&stream.config.asset_id, &stream.source, decrease)?;
                }
            }
17
            Ok(().into())
22
        }
        /// Tries to apply a possibly immediate change. Return if that change was immediate and
        /// applied or not.
        ///
        /// If asset id and time unit are the same, we allow to make the change
        /// immediatly if the origin is at a disadvantage.
        /// We allow this even if there is already a pending request.
52
        fn maybe_immediate_change(
52
            stream_id: T::StreamId,
52
            stream: &mut StreamOf<T>,
52
            new_config: &StreamConfigOf<T>,
52
            deposit_change: Option<DepositChange<T::Balance>>,
52
            requester: Party,
52
        ) -> Result<bool, DispatchErrorWithPostInfo> {
52
            if new_config.time_unit != stream.config.time_unit
29
                || new_config.asset_id != stream.config.asset_id
            {
28
                return Ok(false);
24
            }
24

            
24
            if requester == Party::Source && new_config.rate < stream.config.rate {
14
                return Ok(false);
10
            }
10

            
10
            if requester == Party::Target && new_config.rate > stream.config.rate {
3
                return Ok(false);
7
            }
7

            
7
            // Perform pending payment before changing config.
7
            Self::perform_stream_payment(stream_id, stream)?;
            // We apply the requested deposit change.
7
            if let Some(change) = deposit_change {
5
                Self::apply_deposit_change(stream, change)?;
2
            }
            // Emit event.
5
            Pallet::<T>::deposit_event(Event::<T>::StreamConfigChanged {
5
                stream_id,
5
                old_config: stream.config.clone(),
5
                new_config: new_config.clone(),
5
                deposit_change,
5
            });
5

            
5
            // Update storage.
5
            stream.config = new_config.clone();
5
            Streams::<T>::insert(stream_id, stream);
5

            
5
            Ok(true)
52
        }
    }
}