1
// Copyright (C) Moondance Labs Ltd.
2
// This file is part of Tanssi.
3

            
4
// Tanssi is free software: you can redistribute it and/or modify
5
// it under the terms of the GNU General Public License as published by
6
// the Free Software Foundation, either version 3 of the License, or
7
// (at your option) any later version.
8

            
9
// Tanssi is distributed in the hope that it will be useful,
10
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12
// GNU General Public License for more details.
13

            
14
// You should have received a copy of the GNU General Public License
15
// along with Tanssi.  If not, see <http://www.gnu.org/licenses/>.
16

            
17
//! A collection of node-specific RPC methods.
18
//! Substrate provides the `sc-rpc` crate, which defines the core RPC layer
19
//! used by Substrate nodes. This file extends those RPC definitions with
20
//! capabilities that are specific to this project's runtime configuration.
21

            
22
#![warn(missing_docs)]
23

            
24
pub use sc_rpc::SubscriptionTaskExecutor;
25

            
26
use {
27
    container_chain_template_frontier_runtime::{opaque::Block, AccountId, Hash, Index},
28
    core::marker::PhantomData,
29
    cumulus_client_parachain_inherent::ParachainInherentData,
30
    cumulus_primitives_core::{ParaId, PersistedValidationData},
31
    cumulus_test_relay_sproof_builder::RelayStateSproofBuilder,
32
    fc_mapping_sync::{kv::MappingSyncWorker, SyncStrategy},
33
    fc_rpc::{
34
        EthApiServer, EthFilterApiServer, EthPubSubApiServer, EthTask, TxPool, TxPoolApiServer,
35
    },
36
    fc_storage::StorageOverride,
37
    fp_rpc::{EthereumRuntimeRPCApi, NoTransactionConverter},
38
    frame_support::CloneNoBound,
39
    futures::StreamExt,
40
    jsonrpsee::RpcModule,
41
    manual_xcm_rpc::{ManualXcm, ManualXcmApiServer},
42
    sc_client_api::{
43
        backend::{Backend, StateBackend},
44
        client::BlockchainEvents,
45
        AuxStore, BlockOf, StorageProvider,
46
    },
47
    sc_consensus_manual_seal::rpc::{EngineCommand, ManualSeal, ManualSealApiServer},
48
    sc_network_sync::SyncingService,
49
    sc_service::TaskManager,
50
    sc_transaction_pool_api::TransactionPool,
51
    sp_api::{CallApiAt, ProvideRuntimeApi},
52
    sp_block_builder::BlockBuilder,
53
    sp_blockchain::{
54
        Backend as BlockchainBackend, Error as BlockChainError, HeaderBackend, HeaderMetadata,
55
    },
56
    sp_consensus_aura::SlotDuration,
57
    sp_core::H256,
58
    sp_runtime::traits::{BlakeTwo256, Block as BlockT, Header as HeaderT},
59
    std::{
60
        collections::BTreeMap,
61
        sync::{Arc, Mutex},
62
        time::Duration,
63
    },
64
    tc_service_container_chain_spawner::service::{
65
        ContainerChainClient, MinimalContainerRuntimeApi,
66
    },
67
};
68

            
69
pub struct DefaultEthConfig<C, BE>(std::marker::PhantomData<(C, BE)>);
70

            
71
impl<C, BE> fc_rpc::EthConfig<Block, C> for DefaultEthConfig<C, BE>
72
where
73
    C: StorageProvider<Block, BE> + Sync + Send + 'static,
74
    BE: Backend<Block> + 'static,
75
{
76
    type EstimateGasAdapter = ();
77
    type RuntimeStorageOverride =
78
        fc_rpc::frontier_backend_client::SystemAccountId20StorageOverride<Block, C, BE>;
79
}
80

            
81
mod eth;
82
pub use eth::*;
83
mod finality;
84

            
85
/// Full client dependencies.
86
pub struct FullDeps<C, P, BE> {
87
    /// The client instance to use.
88
    pub client: Arc<C>,
89
    /// Transaction pool instance.
90
    pub pool: Arc<P>,
91
    /// Graph pool instance.
92
    pub graph: Arc<P>,
93
    /// Network service
94
    pub network: Arc<dyn sc_network::service::traits::NetworkService>,
95
    /// Chain syncing service
96
    pub sync: Arc<SyncingService<Block>>,
97
    /// EthFilterApi pool.
98
    pub filter_pool: Option<FilterPool>,
99
    /// Frontier Backend.
100
    // TODO: log indexer?
101
    pub frontier_backend: Arc<dyn fc_api::Backend<Block>>,
102
    /// Backend.
103
    #[allow(dead_code)] // not used but keep nice type inference
104
    pub backend: Arc<BE>,
105
    /// Maximum number of logs in a query.
106
    pub max_past_logs: u32,
107
    /// Maximum block range in a query.
108
    pub max_block_range: u32,
109
    /// Maximum fee history cache size.
110
    pub fee_history_limit: u64,
111
    /// Fee history cache.
112
    pub fee_history_cache: FeeHistoryCache,
113
    /// Ethereum data access overrides.
114
    pub overrides: Arc<dyn StorageOverride<Block>>,
115
    /// Cache for Ethereum block data.
116
    pub block_data_cache: Arc<EthBlockDataCacheTask<Block>>,
117
    /// The Node authority flag
118
    pub is_authority: bool,
119
    /// Manual seal command sink
120
    pub command_sink: Option<futures::channel::mpsc::Sender<EngineCommand<Hash>>>,
121
    /// Channels for manual xcm messages (downward, hrmp)
122
    pub xcm_senders: Option<(flume::Sender<Vec<u8>>, flume::Sender<(ParaId, Vec<u8>)>)>,
123
}
124

            
125
/// Instantiate all Full RPC extensions.
126
320
pub fn create_full<C, P, BE>(
127
320
    deps: FullDeps<C, P, BE>,
128
320
    subscription_task_executor: SubscriptionTaskExecutor,
129
320
    pubsub_notification_sinks: Arc<
130
320
        fc_mapping_sync::EthereumBlockNotificationSinks<
131
320
            fc_mapping_sync::EthereumBlockNotification<Block>,
132
320
        >,
133
320
    >,
134
320
) -> Result<RpcModule<()>, Box<dyn std::error::Error + Send + Sync>>
135
320
where
136
320
    BE: Backend<Block> + 'static,
137
320
    BE::State: StateBackend<BlakeTwo256>,
138
320
    BE::Blockchain: BlockchainBackend<Block>,
139
320
    C: ProvideRuntimeApi<Block> + StorageProvider<Block, BE> + AuxStore,
140
320
    C: BlockchainEvents<Block>,
141
320
    C: HeaderBackend<Block> + HeaderMetadata<Block, Error = BlockChainError> + 'static,
142
320
    C: CallApiAt<Block>,
143
320
    C: Send + Sync + 'static,
144
320
    C::Api: RuntimeApiCollection,
145
320
    P: TransactionPool<Block = Block, Hash = <Block as BlockT>::Hash> + 'static,
146
{
147
    use {
148
        fc_rpc::{Eth, EthFilter, EthPubSub, Net, NetApiServer, Web3, Web3ApiServer},
149
        finality::{FrontierFinality, FrontierFinalityApiServer},
150
        substrate_frame_rpc_system::{System, SystemApiServer},
151
    };
152

            
153
320
    let mut io = RpcModule::new(());
154
    let FullDeps {
155
320
        client,
156
320
        pool,
157
320
        graph,
158
320
        network,
159
320
        sync,
160
320
        filter_pool,
161
320
        frontier_backend,
162
        backend: _,
163
320
        max_past_logs,
164
320
        max_block_range,
165
320
        fee_history_limit,
166
320
        fee_history_cache,
167
320
        overrides,
168
320
        block_data_cache,
169
320
        is_authority,
170
320
        command_sink,
171
320
        xcm_senders,
172
320
    } = deps;
173

            
174
320
    io.merge(System::new(Arc::clone(&client), Arc::clone(&pool)).into_rpc())?;
175

            
176
    // TODO: are we supporting signing?
177
320
    let signers = Vec::new();
178

            
179
320
    let convert_transaction: Option<NoTransactionConverter> = None;
180
320
    let authorities = vec![tc_consensus::get_aura_id_from_seed("alice")];
181
320
    let authorities_for_cdp = authorities.clone();
182

            
183
320
    let pending_create_inherent_data_providers = move |_, ()| {
184
4
        let authorities_for_cidp = authorities.clone();
185

            
186
4
        async move {
187
4
            let mocked_authorities_noting =
188
4
                ccp_authorities_noting_inherent::MockAuthoritiesNotingInherentDataProvider {
189
4
                    current_para_block: 1000,
190
4
                    relay_offset: 1000,
191
4
                    relay_blocks_per_para_block: 2,
192
4
                    orchestrator_para_id: 1000u32.into(),
193
4
                    container_para_id: 2000u32.into(),
194
4
                    authorities: authorities_for_cidp,
195
4
                };
196

            
197
4
            let timestamp = sp_timestamp::InherentDataProvider::from_system_time();
198
            // Create a dummy parachain inherent data provider which is required to pass
199
            // the checks by the para chain system. We use dummy values because in the 'pending context'
200
            // neither do we have access to the real values nor do we need them.
201
4
            let (relay_parent_storage_root, relay_chain_state) = RelayStateSproofBuilder {
202
4
                additional_key_values: mocked_authorities_noting.get_key_values(),
203
4
                ..Default::default()
204
4
            }
205
4
            .into_state_root_and_proof();
206
4
            let vfp = PersistedValidationData {
207
4
                // This is a hack to make `cumulus_pallet_parachain_system::RelayNumberStrictlyIncreases`
208
4
                // happy. Relay parent number can't be bigger than u32::MAX.
209
4
                relay_parent_number: u32::MAX,
210
4
                relay_parent_storage_root,
211
4
                ..Default::default()
212
4
            };
213
4
            let parachain_inherent_data = ParachainInherentData {
214
4
                validation_data: vfp,
215
4
                relay_chain_state,
216
4
                downward_messages: Default::default(),
217
4
                horizontal_messages: Default::default(),
218
4
                relay_parent_descendants: Default::default(),
219
4
                collator_peer_id: Default::default(),
220
4
            };
221
4
            Ok((
222
4
                timestamp,
223
4
                parachain_inherent_data,
224
4
                mocked_authorities_noting,
225
4
            ))
226
4
        }
227
4
    };
228

            
229
320
    let pending_consensus_data_provider_frontier: Option<
230
320
        Box<(dyn fc_rpc::pending::ConsensusDataProvider<_>)>,
231
320
    > = Some(Box::new(
232
320
        tc_consensus::ContainerManualSealAuraConsensusDataProvider::new(
233
320
            SlotDuration::from_millis(container_chain_template_frontier_runtime::SLOT_DURATION),
234
320
            authorities_for_cdp,
235
320
        ),
236
320
    ));
237

            
238
320
    io.merge(
239
320
        Eth::<_, _, _, _, _, _, DefaultEthConfig<C, BE>>::new(
240
320
            Arc::clone(&client),
241
320
            Arc::clone(&pool),
242
320
            Arc::clone(&graph),
243
320
            convert_transaction,
244
320
            Arc::clone(&sync),
245
320
            signers,
246
320
            Arc::clone(&overrides),
247
320
            Arc::clone(&frontier_backend),
248
320
            is_authority,
249
320
            Arc::clone(&block_data_cache),
250
320
            fee_history_cache,
251
320
            fee_history_limit,
252
320
            10,
253
320
            None,
254
320
            pending_create_inherent_data_providers,
255
320
            pending_consensus_data_provider_frontier,
256
320
        )
257
320
        .into_rpc(),
258
320
    )?;
259

            
260
320
    let tx_pool: TxPool<Block, _, _> = TxPool::new(client.clone(), graph.clone());
261
320
    if let Some(filter_pool) = filter_pool {
262
320
        io.merge(
263
320
            EthFilter::new(
264
320
                client.clone(),
265
320
                frontier_backend.clone(),
266
320
                graph,
267
320
                filter_pool,
268
320
                500_usize, // max stored filters
269
320
                max_past_logs,
270
320
                max_block_range,
271
320
                block_data_cache,
272
320
            )
273
320
            .into_rpc(),
274
320
        )?;
275
    }
276

            
277
320
    io.merge(
278
320
        Net::new(
279
320
            Arc::clone(&client),
280
320
            network,
281
320
            // Whether to format the `peer_count` response as Hex (default) or not.
282
320
            true,
283
320
        )
284
320
        .into_rpc(),
285
320
    )?;
286

            
287
320
    if let Some(command_sink) = command_sink {
288
320
        io.merge(
289
320
            // We provide the rpc handler with the sending end of the channel to allow the rpc
290
320
            // send EngineCommands to the background block authorship task.
291
320
            ManualSeal::new(command_sink).into_rpc(),
292
320
        )?;
293
    };
294

            
295
320
    io.merge(Web3::new(Arc::clone(&client)).into_rpc())?;
296
320
    io.merge(
297
320
        EthPubSub::new(
298
320
            pool,
299
320
            Arc::clone(&client),
300
320
            sync,
301
320
            subscription_task_executor,
302
320
            overrides,
303
320
            pubsub_notification_sinks,
304
320
        )
305
320
        .into_rpc(),
306
320
    )?;
307
320
    io.merge(tx_pool.into_rpc())?;
308

            
309
320
    if let Some((downward_message_channel, hrmp_message_channel)) = xcm_senders {
310
320
        io.merge(
311
320
            ManualXcm {
312
320
                downward_message_channel,
313
320
                hrmp_message_channel,
314
320
            }
315
320
            .into_rpc(),
316
320
        )?;
317
    }
318

            
319
320
    io.merge(FrontierFinality::new(client.clone(), frontier_backend.clone()).into_rpc())?;
320

            
321
320
    Ok(io)
322
320
}
323

            
324
pub struct SpawnTasksParams<'a, B: BlockT, C, BE> {
325
    pub task_manager: &'a TaskManager,
326
    pub client: Arc<C>,
327
    pub substrate_backend: Arc<BE>,
328
    pub frontier_backend: Arc<fc_db::Backend<B, C>>,
329
    pub filter_pool: Option<FilterPool>,
330
    pub overrides: Arc<dyn StorageOverride<B>>,
331
    pub fee_history_limit: u64,
332
    pub fee_history_cache: FeeHistoryCache,
333
    /// Chain syncing service
334
    pub sync_service: Arc<SyncingService<B>>,
335
    /// Chain syncing service
336
    pub pubsub_notification_sinks: Arc<
337
        fc_mapping_sync::EthereumBlockNotificationSinks<
338
            fc_mapping_sync::EthereumBlockNotification<B>,
339
        >,
340
    >,
341
}
342

            
343
/// Spawn the tasks that are required to run Moonbeam.
344
160
pub fn spawn_essential_tasks<B, C, BE>(params: SpawnTasksParams<B, C, BE>)
345
160
where
346
160
    C: ProvideRuntimeApi<B> + BlockOf,
347
160
    C: HeaderBackend<B> + HeaderMetadata<B, Error = BlockChainError> + 'static,
348
160
    C: BlockchainEvents<B> + StorageProvider<B, BE>,
349
160
    C: Send + Sync + 'static,
350
160
    C::Api: EthereumRuntimeRPCApi<B>,
351
160
    C::Api: BlockBuilder<B>,
352
160
    B: BlockT<Hash = H256> + Send + Sync + 'static,
353
160
    B::Header: HeaderT<Number = u32>,
354
160
    BE: Backend<B> + 'static,
355
160
    BE::State: StateBackend<BlakeTwo256>,
356
{
357
    // Frontier offchain DB task. Essential.
358
    // Maps emulated ethereum data to substrate native data.
359
160
    match &*params.frontier_backend {
360
160
        fc_db::Backend::KeyValue(b) => {
361
160
            params.task_manager.spawn_essential_handle().spawn(
362
                "frontier-mapping-sync-worker",
363
160
                Some("frontier"),
364
160
                MappingSyncWorker::new(
365
160
                    params.client.import_notification_stream(),
366
160
                    Duration::new(6, 0),
367
160
                    params.client.clone(),
368
160
                    params.substrate_backend.clone(),
369
160
                    params.overrides.clone(),
370
160
                    b.clone(),
371
                    3,
372
                    0,
373
160
                    SyncStrategy::Parachain,
374
160
                    params.sync_service.clone(),
375
160
                    params.pubsub_notification_sinks.clone(),
376
                )
377
6536
                .for_each(|()| futures::future::ready(())),
378
            );
379
        }
380
        fc_db::Backend::Sql(b) => {
381
            params.task_manager.spawn_essential_handle().spawn_blocking(
382
                "frontier-mapping-sync-worker",
383
                Some("frontier"),
384
                fc_mapping_sync::sql::SyncWorker::run(
385
                    params.client.clone(),
386
                    params.substrate_backend.clone(),
387
                    b.clone(),
388
                    params.client.import_notification_stream(),
389
                    fc_mapping_sync::sql::SyncWorkerConfig {
390
                        read_notification_timeout: Duration::from_secs(10),
391
                        check_indexed_blocks_interval: Duration::from_secs(60),
392
                    },
393
                    fc_mapping_sync::SyncStrategy::Parachain,
394
                    params.sync_service.clone(),
395
                    params.pubsub_notification_sinks.clone(),
396
                ),
397
            );
398
        }
399
    }
400

            
401
    // Frontier `EthFilterApi` maintenance.
402
    // Manages the pool of user-created Filters.
403
160
    if let Some(filter_pool) = params.filter_pool {
404
        // Each filter is allowed to stay in the pool for 100 blocks.
405
        // TODO: Re-visit this assumption with parathreads, as they
406
        // might have a block every good amount of time, and can be abused
407
        // likely we will need to implement a time-based filter
408
        const FILTER_RETAIN_THRESHOLD: u64 = 100;
409
160
        params.task_manager.spawn_essential_handle().spawn(
410
            "frontier-filter-pool",
411
160
            Some("frontier"),
412
160
            EthTask::filter_pool_task(
413
160
                Arc::clone(&params.client),
414
160
                filter_pool,
415
                FILTER_RETAIN_THRESHOLD,
416
            ),
417
        );
418
    }
419

            
420
    // Spawn Frontier FeeHistory cache maintenance task.
421
160
    params.task_manager.spawn_essential_handle().spawn(
422
        "frontier-fee-history",
423
160
        Some("frontier"),
424
160
        EthTask::fee_history_task(
425
160
            Arc::clone(&params.client),
426
160
            Arc::clone(&params.overrides),
427
160
            params.fee_history_cache,
428
160
            params.fee_history_limit,
429
        ),
430
    );
431
160
}
432

            
433
/// A set of APIs that polkadot-like runtimes must implement.
434
///
435
/// This trait has no methods or associated type. It is a concise marker for all the trait bounds
436
/// that it contains.
437
pub trait RuntimeApiCollection:
438
    sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>
439
    + sp_api::ApiExt<Block>
440
    + sp_block_builder::BlockBuilder<Block>
441
    + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Index>
442
    + sp_api::Metadata<Block>
443
    + sp_offchain::OffchainWorkerApi<Block>
444
    + sp_session::SessionKeys<Block>
445
    + fp_rpc::ConvertTransactionRuntimeApi<Block>
446
    + fp_rpc::EthereumRuntimeRPCApi<Block>
447
    + cumulus_primitives_core::CollectCollationInfo<Block>
448
{
449
}
450

            
451
impl<Api> RuntimeApiCollection for Api where
452
    Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>
453
        + sp_api::ApiExt<Block>
454
        + sp_block_builder::BlockBuilder<Block>
455
        + substrate_frame_rpc_system::AccountNonceApi<Block, AccountId, Index>
456
        + sp_api::Metadata<Block>
457
        + sp_offchain::OffchainWorkerApi<Block>
458
        + sp_session::SessionKeys<Block>
459
        + fp_rpc::ConvertTransactionRuntimeApi<Block>
460
        + fp_rpc::EthereumRuntimeRPCApi<Block>
461
        + cumulus_primitives_core::CollectCollationInfo<Block>
462
{
463
}
464

            
465
tp_traits::alias!(
466
    pub trait FrontierRpcRuntimeApi:
467
        MinimalContainerRuntimeApi +
468
        sp_api::ConstructRuntimeApi<
469
            Block,
470
            ContainerChainClient<Self>,
471
            RuntimeApi:
472
                RuntimeApiCollection
473
        >
474
);
475

            
476
#[derive(CloneNoBound)]
477
pub struct GenerateFrontierRpcBuilder<RuntimeApi> {
478
    rpc_config: crate::cli::RpcConfig,
479
    phantom: PhantomData<RuntimeApi>,
480
}
481

            
482
impl<RuntimeApi> GenerateFrontierRpcBuilder<RuntimeApi> {
483
    pub fn new(rpc_config: crate::cli::RpcConfig) -> Self {
484
        Self {
485
            rpc_config,
486
            phantom: PhantomData,
487
        }
488
    }
489
}
490

            
491
const _: () = {
492
    use tc_service_container_chain_spawner::rpc::generate_rpc_builder::*;
493

            
494
    impl<RuntimeApi: FrontierRpcRuntimeApi> GenerateRpcBuilder<RuntimeApi>
495
        for GenerateFrontierRpcBuilder<RuntimeApi>
496
    {
497
        fn generate(
498
            &self,
499
            GenerateRpcBuilderParams {
500
                backend,
501
                client,
502
                network,
503
                container_chain_config,
504
                prometheus_registry,
505
                sync_service,
506
                task_manager,
507
                transaction_pool,
508
                ..
509
            }: GenerateRpcBuilderParams<RuntimeApi>,
510
        ) -> Result<CompleteRpcBuilder, ServiceError> {
511
            let max_past_logs = self.rpc_config.max_past_logs;
512
            let max_block_range = self.rpc_config.max_block_range;
513

            
514
            // Frontier specific stuff
515
            let filter_pool: Option<FilterPool> = Some(Arc::new(Mutex::new(BTreeMap::new())));
516
            let fee_history_cache: FeeHistoryCache = Arc::new(Mutex::new(BTreeMap::new()));
517
            let frontier_backend = Arc::new(fc_db::Backend::KeyValue(
518
                crate::service::open_frontier_backend(client.clone(), container_chain_config)?
519
                    .into(),
520
            ));
521
            let overrides = Arc::new(fc_rpc::StorageOverrideHandler::new(client.clone()));
522
            let fee_history_limit = self.rpc_config.fee_history_limit;
523

            
524
            let pubsub_notification_sinks: fc_mapping_sync::EthereumBlockNotificationSinks<
525
                fc_mapping_sync::EthereumBlockNotification<Block>,
526
            > = Default::default();
527
            let pubsub_notification_sinks = Arc::new(pubsub_notification_sinks);
528

            
529
            spawn_essential_tasks(SpawnTasksParams {
530
                task_manager,
531
                client: client.clone(),
532
                substrate_backend: backend.clone(),
533
                frontier_backend: frontier_backend.clone(),
534
                filter_pool: filter_pool.clone(),
535
                overrides: overrides.clone(),
536
                fee_history_limit,
537
                fee_history_cache: fee_history_cache.clone(),
538
                sync_service: sync_service.clone(),
539
                pubsub_notification_sinks: pubsub_notification_sinks.clone(),
540
            });
541

            
542
            let block_data_cache = Arc::new(fc_rpc::EthBlockDataCacheTask::new(
543
                task_manager.spawn_handle(),
544
                overrides.clone(),
545
                self.rpc_config.eth_log_block_cache,
546
                self.rpc_config.eth_statuses_cache,
547
                prometheus_registry.clone(),
548
            ));
549

            
550
            Ok(Box::new(move |subscription_task_executor| {
551
                let deps = crate::rpc::FullDeps {
552
                    backend: backend.clone(),
553
                    client: client.clone(),
554
                    filter_pool: filter_pool.clone(),
555
                    frontier_backend: match &*frontier_backend {
556
                        fc_db::Backend::KeyValue(b) => b.clone(),
557
                        fc_db::Backend::Sql(b) => b.clone(),
558
                    },
559
                    graph: transaction_pool.clone(),
560
                    pool: transaction_pool.clone(),
561
                    max_past_logs,
562
                    max_block_range,
563
                    fee_history_limit,
564
                    fee_history_cache: fee_history_cache.clone(),
565
                    network: Arc::new(network.clone()),
566
                    sync: sync_service.clone(),
567
                    block_data_cache: block_data_cache.clone(),
568
                    overrides: overrides.clone(),
569
                    is_authority: false,
570
                    command_sink: None,
571
                    xcm_senders: None,
572
                };
573
                crate::rpc::create_full(
574
                    deps,
575
                    subscription_task_executor,
576
                    pubsub_notification_sinks.clone(),
577
                )
578
                .map_err(Into::into)
579
            }))
580
        }
581
    }
582
};