Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/full-node/src/json_rpc_service.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use crate::{consensus_service, database_thread, network_service, LogCallback, LogLevel};
19
use futures_channel::oneshot;
20
use futures_util::FutureExt;
21
use smol::{
22
    future,
23
    net::{TcpListener, TcpStream},
24
};
25
use smoldot::json_rpc::{methods, service};
26
use std::{
27
    future::Future,
28
    io, mem,
29
    net::SocketAddr,
30
    num::{NonZeroU32, NonZeroUsize},
31
    pin::Pin,
32
    sync::{
33
        atomic::{AtomicU32, Ordering},
34
        Arc,
35
    },
36
    time::Duration,
37
};
38
39
mod chain_head_subscriptions;
40
mod legacy_api_subscriptions;
41
mod requests_handler;
42
mod runtime_caches_service;
43
44
/// Configuration for a [`JsonRpcService`].
45
pub struct Config {
46
    /// Function that can be used to spawn background tasks.
47
    ///
48
    /// The tasks passed as parameter must be executed until they shut down.
49
    pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
50
51
    /// Function called in order to notify of something.
52
    pub log_callback: Arc<dyn LogCallback + Send + Sync>,
53
54
    /// Database to access blocks.
55
    pub database: Arc<database_thread::DatabaseThread>,
56
57
    /// Access to the network, and identifier of the chain from the point of view of the network
58
    /// service.
59
    pub network_service: (
60
        Arc<network_service::NetworkService>,
61
        network_service::ChainId,
62
    ),
63
64
    /// Where to bind the WebSocket server. If `None`, no TCP server is started.
65
    pub bind_address: Option<SocketAddr>,
66
67
    /// Maximum number of requests to process in parallel.
68
    pub max_parallel_requests: u32,
69
70
    /// Maximum number of JSON-RPC clients until new ones are rejected.
71
    pub max_json_rpc_clients: u32,
72
73
    /// Name of the chain, as found in the chain specification.
74
    pub chain_name: String,
75
76
    /// Type of the chain, as found in the chain specification.
77
    pub chain_type: String,
78
79
    /// JSON-encoded properties of the chain, as found in the chain specification.
80
    pub chain_properties_json: String,
81
82
    /// Whether the chain is a live network. Found in the chain specification.
83
    pub chain_is_live: bool,
84
85
    /// Hash of the genesis block.
86
    // TODO: load from database maybe?
87
    pub genesis_block_hash: [u8; 32],
88
89
    /// Consensus service of the chain.
90
    pub consensus_service: Arc<consensus_service::ConsensusService>,
91
}
92
93
/// Running JSON-RPC service.
94
///
95
/// If [`Config::bind_address`] is `Some`, holds a TCP server open for as long as it is alive.
96
///
97
/// In addition to a TCP/IP server, this service also provides a virtual JSON-RPC endpoint that
98
/// can be used through [`JsonRpcService::send_request`] and [`JsonRpcService::next_response`].
99
pub struct JsonRpcService {
100
    /// This events listener is notified when the service is dropped.
101
    service_dropped: event_listener::Event,
102
103
    /// Address the server is listening on. Not necessarily equal to [`Config::bind_address`].
104
    listen_addr: Option<SocketAddr>,
105
106
    /// I/O for the virtual endpoint.
107
    virtual_client_io: service::SerializedRequestsIo,
108
}
109
110
impl Drop for JsonRpcService {
111
21
    fn drop(&mut self) {
112
21
        self.service_dropped.notify(usize::MAX);
113
21
    }
_RNvXNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB2_14JsonRpcServiceNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
Line
Count
Source
111
21
    fn drop(&mut self) {
112
21
        self.service_dropped.notify(usize::MAX);
113
21
    }
Unexecuted instantiation: _RNvXNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB2_14JsonRpcServiceNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
114
}
115
116
impl JsonRpcService {
117
    /// Initializes a new [`JsonRpcService`].
118
21
    pub async fn new(config: Config) -> Result<Self, InitError> {
_RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService3new
Line
Count
Source
118
21
    pub async fn new(config: Config) -> Result<Self, InitError> {
Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService3new
119
21
        let (tcp_listener, listen_addr) = match &config.bind_address {
120
0
            Some(addr) => match TcpListener::bind(addr).await {
121
0
                Ok(listener) => {
122
0
                    let listen_addr = match listener.local_addr() {
123
0
                        Ok(addr) => addr,
124
0
                        Err(error) => {
125
0
                            return Err(InitError::ListenError {
126
0
                                bind_address: *addr,
127
0
                                error,
128
0
                            })
129
                        }
130
                    };
131
132
0
                    (Some(listener), Some(listen_addr))
133
                }
134
0
                Err(error) => {
135
0
                    return Err(InitError::ListenError {
136
0
                        bind_address: *addr,
137
0
                        error,
138
0
                    })
139
                }
140
            },
141
21
            None => (None, None),
142
        };
143
144
21
        let service_dropped = event_listener::Event::new();
145
21
        let on_service_dropped = service_dropped.listen();
146
21
147
21
        let (to_requests_handlers, from_background) = async_channel::bounded(8);
148
21
149
21
        let (virtual_client_main_task, virtual_client_io) =
150
21
            service::client_main_task(service::Config {
151
21
                max_active_subscriptions: u32::MAX,
152
21
                max_pending_requests: NonZeroU32::new(u32::MAX).unwrap(),
153
21
            });
154
21
155
21
        spawn_client_main_task(
156
21
            config.tasks_executor.clone(),
157
21
            config.log_callback.clone(),
158
21
            config.consensus_service.clone(),
159
21
            config.database.clone(),
160
21
            to_requests_handlers.clone(),
161
21
            virtual_client_main_task,
162
21
        );
163
21
164
21
        let runtime_caches_service = Arc::new(runtime_caches_service::RuntimeCachesService::new(
165
21
            runtime_caches_service::Config {
166
21
                tasks_executor: config.tasks_executor.clone(),
167
21
                log_callback: config.log_callback.clone(),
168
21
                database: config.database.clone(),
169
21
                num_cache_entries: NonZeroUsize::new(16).unwrap(), // TODO: configurable?
170
21
            },
171
21
        ));
172
21
173
672
        for _ in 0..config.max_parallel_requests {
174
672
            requests_handler::spawn_requests_handler(requests_handler::Config {
175
672
                tasks_executor: config.tasks_executor.clone(),
176
672
                log_callback: config.log_callback.clone(),
177
672
                database: config.database.clone(),
178
672
                network_service: config.network_service.clone(),
179
672
                receiver: from_background.clone(),
180
672
                chain_name: config.chain_name.clone(),
181
672
                chain_type: config.chain_type.clone(),
182
672
                chain_properties_json: config.chain_properties_json.clone(),
183
672
                chain_is_live: config.chain_is_live,
184
672
                genesis_block_hash: config.genesis_block_hash,
185
672
                consensus_service: config.consensus_service.clone(),
186
672
                runtime_caches_service: runtime_caches_service.clone(),
187
672
            });
188
672
        }
189
190
21
        if let Some(
tcp_listener0
) = tcp_listener {
191
0
            let background = JsonRpcBackground {
192
0
                tcp_listener,
193
0
                on_service_dropped,
194
0
                tasks_executor: config.tasks_executor.clone(),
195
0
                log_callback: config.log_callback,
196
0
                consensus_service: config.consensus_service.clone(),
197
0
                database: config.database.clone(),
198
0
                to_requests_handlers,
199
0
                num_json_rpc_clients: Arc::new(AtomicU32::new(0)),
200
0
                max_json_rpc_clients: config.max_json_rpc_clients,
201
0
            };
202
0
203
0
            (config.tasks_executor)(Box::pin(async move { background.run().await }));
Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00Ba_
Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00Ba_
204
21
        }
205
206
21
        Ok(JsonRpcService {
207
21
            service_dropped,
208
21
            listen_addr,
209
21
            virtual_client_io,
210
21
        })
211
21
    }
_RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
118
2
    pub async fn new(config: Config) -> Result<Self, InitError> {
119
2
        let (tcp_listener, listen_addr) = match &config.bind_address {
120
0
            Some(addr) => match TcpListener::bind(addr).await {
121
0
                Ok(listener) => {
122
0
                    let listen_addr = match listener.local_addr() {
123
0
                        Ok(addr) => addr,
124
0
                        Err(error) => {
125
0
                            return Err(InitError::ListenError {
126
0
                                bind_address: *addr,
127
0
                                error,
128
0
                            })
129
                        }
130
                    };
131
132
0
                    (Some(listener), Some(listen_addr))
133
                }
134
0
                Err(error) => {
135
0
                    return Err(InitError::ListenError {
136
0
                        bind_address: *addr,
137
0
                        error,
138
0
                    })
139
                }
140
            },
141
2
            None => (None, None),
142
        };
143
144
2
        let service_dropped = event_listener::Event::new();
145
2
        let on_service_dropped = service_dropped.listen();
146
2
147
2
        let (to_requests_handlers, from_background) = async_channel::bounded(8);
148
2
149
2
        let (virtual_client_main_task, virtual_client_io) =
150
2
            service::client_main_task(service::Config {
151
2
                max_active_subscriptions: u32::MAX,
152
2
                max_pending_requests: NonZeroU32::new(u32::MAX).unwrap(),
153
2
            });
154
2
155
2
        spawn_client_main_task(
156
2
            config.tasks_executor.clone(),
157
2
            config.log_callback.clone(),
158
2
            config.consensus_service.clone(),
159
2
            config.database.clone(),
160
2
            to_requests_handlers.clone(),
161
2
            virtual_client_main_task,
162
2
        );
163
2
164
2
        let runtime_caches_service = Arc::new(runtime_caches_service::RuntimeCachesService::new(
165
2
            runtime_caches_service::Config {
166
2
                tasks_executor: config.tasks_executor.clone(),
167
2
                log_callback: config.log_callback.clone(),
168
2
                database: config.database.clone(),
169
2
                num_cache_entries: NonZeroUsize::new(16).unwrap(), // TODO: configurable?
170
2
            },
171
2
        ));
172
2
173
64
        for _ in 0..config.max_parallel_requests {
174
64
            requests_handler::spawn_requests_handler(requests_handler::Config {
175
64
                tasks_executor: config.tasks_executor.clone(),
176
64
                log_callback: config.log_callback.clone(),
177
64
                database: config.database.clone(),
178
64
                network_service: config.network_service.clone(),
179
64
                receiver: from_background.clone(),
180
64
                chain_name: config.chain_name.clone(),
181
64
                chain_type: config.chain_type.clone(),
182
64
                chain_properties_json: config.chain_properties_json.clone(),
183
64
                chain_is_live: config.chain_is_live,
184
64
                genesis_block_hash: config.genesis_block_hash,
185
64
                consensus_service: config.consensus_service.clone(),
186
64
                runtime_caches_service: runtime_caches_service.clone(),
187
64
            });
188
64
        }
189
190
2
        if let Some(
tcp_listener0
) = tcp_listener {
191
0
            let background = JsonRpcBackground {
192
0
                tcp_listener,
193
0
                on_service_dropped,
194
0
                tasks_executor: config.tasks_executor.clone(),
195
0
                log_callback: config.log_callback,
196
0
                consensus_service: config.consensus_service.clone(),
197
0
                database: config.database.clone(),
198
0
                to_requests_handlers,
199
0
                num_json_rpc_clients: Arc::new(AtomicU32::new(0)),
200
0
                max_json_rpc_clients: config.max_json_rpc_clients,
201
0
            };
202
0
203
0
            (config.tasks_executor)(Box::pin(async move { background.run().await }));
204
2
        }
205
206
2
        Ok(JsonRpcService {
207
2
            service_dropped,
208
2
            listen_addr,
209
2
            virtual_client_io,
210
2
        })
211
2
    }
Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0B8_
Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0CscDgN54JpMGG_6author
_RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
118
19
    pub async fn new(config: Config) -> Result<Self, InitError> {
119
19
        let (tcp_listener, listen_addr) = match &config.bind_address {
120
0
            Some(addr) => match TcpListener::bind(addr).await {
121
0
                Ok(listener) => {
122
0
                    let listen_addr = match listener.local_addr() {
123
0
                        Ok(addr) => addr,
124
0
                        Err(error) => {
125
0
                            return Err(InitError::ListenError {
126
0
                                bind_address: *addr,
127
0
                                error,
128
0
                            })
129
                        }
130
                    };
131
132
0
                    (Some(listener), Some(listen_addr))
133
                }
134
0
                Err(error) => {
135
0
                    return Err(InitError::ListenError {
136
0
                        bind_address: *addr,
137
0
                        error,
138
0
                    })
139
                }
140
            },
141
19
            None => (None, None),
142
        };
143
144
19
        let service_dropped = event_listener::Event::new();
145
19
        let on_service_dropped = service_dropped.listen();
146
19
147
19
        let (to_requests_handlers, from_background) = async_channel::bounded(8);
148
19
149
19
        let (virtual_client_main_task, virtual_client_io) =
150
19
            service::client_main_task(service::Config {
151
19
                max_active_subscriptions: u32::MAX,
152
19
                max_pending_requests: NonZeroU32::new(u32::MAX).unwrap(),
153
19
            });
154
19
155
19
        spawn_client_main_task(
156
19
            config.tasks_executor.clone(),
157
19
            config.log_callback.clone(),
158
19
            config.consensus_service.clone(),
159
19
            config.database.clone(),
160
19
            to_requests_handlers.clone(),
161
19
            virtual_client_main_task,
162
19
        );
163
19
164
19
        let runtime_caches_service = Arc::new(runtime_caches_service::RuntimeCachesService::new(
165
19
            runtime_caches_service::Config {
166
19
                tasks_executor: config.tasks_executor.clone(),
167
19
                log_callback: config.log_callback.clone(),
168
19
                database: config.database.clone(),
169
19
                num_cache_entries: NonZeroUsize::new(16).unwrap(), // TODO: configurable?
170
19
            },
171
19
        ));
172
19
173
608
        for _ in 0..config.max_parallel_requests {
174
608
            requests_handler::spawn_requests_handler(requests_handler::Config {
175
608
                tasks_executor: config.tasks_executor.clone(),
176
608
                log_callback: config.log_callback.clone(),
177
608
                database: config.database.clone(),
178
608
                network_service: config.network_service.clone(),
179
608
                receiver: from_background.clone(),
180
608
                chain_name: config.chain_name.clone(),
181
608
                chain_type: config.chain_type.clone(),
182
608
                chain_properties_json: config.chain_properties_json.clone(),
183
608
                chain_is_live: config.chain_is_live,
184
608
                genesis_block_hash: config.genesis_block_hash,
185
608
                consensus_service: config.consensus_service.clone(),
186
608
                runtime_caches_service: runtime_caches_service.clone(),
187
608
            });
188
608
        }
189
190
19
        if let Some(
tcp_listener0
) = tcp_listener {
191
0
            let background = JsonRpcBackground {
192
0
                tcp_listener,
193
0
                on_service_dropped,
194
0
                tasks_executor: config.tasks_executor.clone(),
195
0
                log_callback: config.log_callback,
196
0
                consensus_service: config.consensus_service.clone(),
197
0
                database: config.database.clone(),
198
0
                to_requests_handlers,
199
0
                num_json_rpc_clients: Arc::new(AtomicU32::new(0)),
200
0
                max_json_rpc_clients: config.max_json_rpc_clients,
201
0
            };
202
0
203
0
            (config.tasks_executor)(Box::pin(async move { background.run().await }));
204
19
        }
205
206
19
        Ok(JsonRpcService {
207
19
            service_dropped,
208
19
            listen_addr,
209
19
            virtual_client_io,
210
19
        })
211
19
    }
Unexecuted instantiation: _RNCNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0B8_
212
213
    /// Returns the address the server is listening on.
214
    ///
215
    /// Returns `None` if and only if [`Config::bind_address`] was `None`. However, if `Some`,
216
    /// the address is not necessarily equal to the one in [`Config::bind_address`].
217
0
    pub fn listen_addr(&self) -> Option<SocketAddr> {
218
0
        self.listen_addr
219
0
    }
Unexecuted instantiation: _RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService11listen_addr
Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService11listen_addr
220
221
    /// Adds a JSON-RPC request to the queue of requests of the virtual endpoint.
222
    ///
223
    /// The virtual endpoint doesn't have any limit.
224
25
    pub fn send_request(&self, request: String) {
225
25
        match self.virtual_client_io.try_send_request(request) {
226
25
            Ok(()) => (),
227
0
            Err(err) => match err.cause {
228
                service::TrySendRequestErrorCause::TooManyPendingRequests
229
0
                | service::TrySendRequestErrorCause::ClientMainTaskDestroyed => unreachable!(),
230
            },
231
        }
232
25
    }
_RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService12send_request
Line
Count
Source
224
25
    pub fn send_request(&self, request: String) {
225
25
        match self.virtual_client_io.try_send_request(request) {
226
25
            Ok(()) => (),
227
0
            Err(err) => match err.cause {
228
                service::TrySendRequestErrorCause::TooManyPendingRequests
229
0
                | service::TrySendRequestErrorCause::ClientMainTaskDestroyed => unreachable!(),
230
            },
231
        }
232
25
    }
Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService12send_request
233
234
    /// Returns the new JSON-RPC response or notification for requests sent using
235
    /// [`JsonRpcService::send_request`].
236
    ///
237
    /// If this function is called multiple times simultaneously, only one invocation will receive
238
    /// each response. Which one is unspecified.
239
25
    pub async fn next_response(&self) -> String {
_RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService13next_response
Line
Count
Source
239
25
    pub async fn next_response(&self) -> String {
Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService13next_response
240
25
        match self.virtual_client_io.wait_next_response().
await14
{
241
25
            Ok(r) => r,
242
0
            Err(service::WaitNextResponseError::ClientMainTaskDestroyed) => unreachable!(),
243
        }
244
25
    }
_RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
239
2
    pub async fn next_response(&self) -> String {
240
2
        match self.virtual_client_io.wait_next_response().
await1
{
241
2
            Ok(r) => r,
242
0
            Err(service::WaitNextResponseError::ClientMainTaskDestroyed) => unreachable!(),
243
        }
244
2
    }
Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0B8_
Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0CscDgN54JpMGG_6author
_RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
239
23
    pub async fn next_response(&self) -> String {
240
23
        match self.virtual_client_io.wait_next_response().
await13
{
241
23
            Ok(r) => r,
242
0
            Err(service::WaitNextResponseError::ClientMainTaskDestroyed) => unreachable!(),
243
        }
244
23
    }
Unexecuted instantiation: _RNCNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0B8_
245
}
246
247
/// Error potentially returned by [`JsonRpcService::new`].
248
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXs2_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB5_9InitErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs2_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB5_9InitErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
249
pub enum InitError {
250
    /// Failed to listen on the server address.
251
    #[display(fmt = "Failed to listen on TCP address {bind_address}: {error}")]
252
    ListenError {
253
        /// Address that was attempted.
254
        bind_address: SocketAddr,
255
        /// Error returned by the operating system.
256
        error: io::Error,
257
    },
258
}
259
260
struct JsonRpcBackground {
261
    /// TCP listener for new incoming connections.
262
    tcp_listener: TcpListener,
263
264
    /// Event notified when the frontend is dropped.
265
    on_service_dropped: event_listener::EventListener,
266
267
    /// See [`Config::tasks_executor`].
268
    tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
269
270
    /// See [`Config::log_callback`].
271
    log_callback: Arc<dyn LogCallback + Send + Sync>,
272
273
    /// Database to access blocks.
274
    database: Arc<database_thread::DatabaseThread>,
275
276
    /// Consensus service of the chain.
277
    consensus_service: Arc<consensus_service::ConsensusService>,
278
279
    /// Channel used to send requests to the tasks that process said requests.
280
    to_requests_handlers: async_channel::Sender<requests_handler::Message>,
281
282
    /// Number of clients currently alive.
283
    num_json_rpc_clients: Arc<AtomicU32>,
284
285
    /// See [`Config::max_json_rpc_clients`].
286
    max_json_rpc_clients: u32,
287
}
288
289
impl JsonRpcBackground {
290
0
    async fn run(mut self) {
Unexecuted instantiation: _RNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB5_17JsonRpcBackground3run
Unexecuted instantiation: _RNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB5_17JsonRpcBackground3run
291
        loop {
292
0
            let Some(accept_result) = future::or(
293
0
                async {
294
0
                    (&mut self.on_service_dropped).await;
295
0
                    None
296
0
                },
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00Bb_
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00Bb_
297
0
                async { Some(self.tcp_listener.accept().await) },
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0Bb_
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0Bb_
298
0
            )
299
0
            .await
300
            else {
301
0
                return;
302
            };
303
304
0
            let (tcp_socket, address) = match accept_result {
305
0
                Ok(v) => v,
306
0
                Err(error) => {
307
0
                    // Failing to accept an incoming TCP connection generally happens due to
308
0
                    // the limit of file descriptors being reached.
309
0
                    // Sleep a little bit and try again.
310
0
                    self.log_callback.log(
311
0
                        LogLevel::Warn,
312
0
                        format!("json-rpc-tcp-listener-error; error={error}"),
313
0
                    );
314
0
                    smol::Timer::after(Duration::from_millis(50)).await;
315
0
                    continue;
316
                }
317
            };
318
319
            // New incoming TCP connection.
320
321
            // Try to increase `num_json_rpc_clients`. Fails if the maximum is reached.
322
0
            if self
323
0
                .num_json_rpc_clients
324
0
                .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| {
325
0
                    if old_value < self.max_json_rpc_clients {
326
                        // Considering that `old_value < max`, and `max` fits in a `u32` by
327
                        // definition, then `old_value + 1` also always fits in a `u32`. QED.
328
                        // There's no risk of overflow.
329
0
                        Some(old_value + 1)
330
                    } else {
331
0
                        None
332
                    }
333
0
                })
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0Bb_
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0Bb_
334
0
                .is_err()
335
            {
336
                // Reject the socket without sending back anything. Sending back a status
337
                // code would require allocating resources for that socket, which we
338
                // specifically don't want to do.
339
0
                self.log_callback.log(
340
0
                    LogLevel::Debug,
341
0
                    format!("json-rpc-incoming-connection-rejected; address={}", address),
342
0
                );
343
0
                smol::Timer::after(Duration::from_millis(50)).await;
344
0
                continue;
345
0
            }
346
0
347
0
            // Spawn two tasks: one for the socket I/O, and one to process requests.
348
0
            self.log_callback.log(
349
0
                LogLevel::Debug,
350
0
                format!("json-rpc-incoming-connection; address={}", address),
351
0
            );
352
0
            let (client_main_task, io) = service::client_main_task(service::Config {
353
0
                max_active_subscriptions: 128,
354
0
                max_pending_requests: NonZeroU32::new(64).unwrap(),
355
0
            });
356
0
            spawn_client_io_task(
357
0
                &self.tasks_executor,
358
0
                self.log_callback.clone(),
359
0
                tcp_socket,
360
0
                address,
361
0
                io,
362
0
                self.num_json_rpc_clients.clone(),
363
0
            );
364
0
            spawn_client_main_task(
365
0
                self.tasks_executor.clone(),
366
0
                self.log_callback.clone(),
367
0
                self.consensus_service.clone(),
368
0
                self.database.clone(),
369
0
                self.to_requests_handlers.clone(),
370
0
                client_main_task,
371
0
            );
372
        }
373
0
    }
Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0B9_
Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0B9_
374
}
375
376
0
fn spawn_client_io_task(
377
0
    tasks_executor: &Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
378
0
    log_callback: Arc<dyn LogCallback + Send + Sync>,
379
0
    tcp_socket: TcpStream,
380
0
    socket_address: SocketAddr,
381
0
    io: service::SerializedRequestsIo,
382
0
    num_json_rpc_clients: Arc<AtomicU32>,
383
0
) {
384
0
    let run_future = async move {
385
        // Perform the WebSocket handshake.
386
0
        let (mut ws_sender, mut ws_receiver) = {
387
0
            let mut ws_server = soketto::handshake::Server::new(tcp_socket);
388
389
            // TODO: enabling the `deflate` extension leads to "flate stream corrupted" errors
390
            //let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server);
391
            //ws_server.add_extension(Box::new(deflate));
392
393
0
            let key = match ws_server.receive_request().await {
394
0
                Ok(req) => req.key(),
395
0
                Err(error) => {
396
0
                    log_callback.log(
397
0
                        LogLevel::Debug,
398
0
                        format!(
399
0
                            "json-rpc-connection-error; address={socket_address}, error={error}"
400
0
                        ),
401
0
                    );
402
0
                    return;
403
                }
404
            };
405
406
0
            let accept = soketto::handshake::server::Response::Accept {
407
0
                key,
408
0
                protocol: None,
409
0
            };
410
0
411
0
            match ws_server.send_response(&accept).await {
412
0
                Ok(()) => {}
413
0
                Err(error) => {
414
0
                    log_callback.log(
415
0
                        LogLevel::Debug,
416
0
                        format!(
417
0
                            "json-rpc-connection-error; address={socket_address}, error={error}"
418
0
                        ),
419
0
                    );
420
0
                    return;
421
                }
422
            }
423
424
0
            ws_server.into_builder().finish()
425
0
        };
426
0
427
0
        // Create a future responsible for pulling responses and sending them back.
428
0
        let sending_future = async {
429
0
            let mut must_flush_asap = false;
430
431
            loop {
432
                // If `must_flush_asap`, we simply peek for the next response but without awaiting.
433
                // If `!must_flush_asap`, we wait for as long as necessary.
434
0
                let maybe_response = if must_flush_asap {
435
0
                    io.wait_next_response().now_or_never()
436
                } else {
437
0
                    Some(io.wait_next_response().await)
438
                };
439
440
0
                match maybe_response {
441
                    None => {
442
0
                        if let Err(err) = ws_sender.flush().await {
443
0
                            break Err(err.to_string());
444
0
                        }
445
0
                        must_flush_asap = false;
446
                    }
447
0
                    Some(Ok(response)) => {
448
0
                        log_callback.log(
449
0
                            LogLevel::Debug,
450
0
                            format!(
451
0
                                "json-rpc-response; address={}; response={}",
452
0
                                socket_address,
453
0
                                crate::util::truncated_str(
454
0
                                    response.chars().filter(|c| !c.is_control()),
Unexecuted instantiation: _RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task000B9_
Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task000B9_
455
0
                                    128
456
0
                                )
457
0
                            ),
458
0
                        );
459
460
0
                        if let Err(err) = ws_sender.send_text_owned(response).await {
461
0
                            break Err(err.to_string());
462
0
                        }
463
0
                        must_flush_asap = true;
464
                    }
465
                    Some(Err(service::WaitNextResponseError::ClientMainTaskDestroyed)) => {
466
                        // The client main task never closes by itself but only as a consequence
467
                        // to the I/O task closing.
468
0
                        unreachable!()
469
                    }
470
                };
471
            }
472
0
        };
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task00B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task00B7_
473
474
        // Create a future responsible for pulling messages from the socket and sending them to
475
        // the main task.
476
0
        let receiving_future = async {
477
0
            let mut message = Vec::new();
478
0
            loop {
479
0
                message.clear();
480
0
481
0
                match ws_receiver.receive_data(&mut message).await {
482
                    Ok(soketto::Data::Binary(_)) => {
483
0
                        break Err("Unexpected binary frame".to_string());
484
                    }
485
0
                    Ok(soketto::Data::Text(_)) => {} // Handled below.
486
0
                    Err(soketto::connection::Error::Closed) => break Ok(()),
487
0
                    Err(err) => {
488
0
                        break Err(err.to_string());
489
                    }
490
                }
491
492
0
                let request = match String::from_utf8(mem::take(&mut message)) {
493
0
                    Ok(r) => r,
494
0
                    Err(error) => {
495
0
                        break Err(format!("Non-UTF8 text frame: {error}"));
496
                    }
497
                };
498
499
0
                log_callback.log(
500
0
                    LogLevel::Debug,
501
0
                    format!(
502
0
                        "json-rpc-request; address={}; request={}",
503
0
                        socket_address,
504
0
                        crate::util::truncated_str(
505
0
                            request.chars().filter(|c| !c.is_control()),
Unexecuted instantiation: _RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_00B9_
Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_00B9_
506
0
                            128
507
0
                        )
508
0
                    ),
509
0
                );
510
0
511
0
                match io.send_request(request).await {
512
0
                    Ok(()) => {}
513
                    Err(service::SendRequestError {
514
                        cause: service::SendRequestErrorCause::ClientMainTaskDestroyed,
515
                        ..
516
                    }) => {
517
                        // The client main task never closes by itself but only as a
518
                        // consequence to the I/O task closing.
519
0
                        unreachable!()
520
                    }
521
                }
522
            }
523
0
        };
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_0B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_0B7_
524
525
        // Run these two futures until completion.
526
0
        match future::or(sending_future, receiving_future).await {
527
0
            Ok(()) => {
528
0
                log_callback.log(
529
0
                    LogLevel::Debug,
530
0
                    format!("json-rpc-connection-closed; address={socket_address}"),
531
0
                );
532
0
            }
533
0
            Err(error) => {
534
0
                log_callback.log(
535
0
                    LogLevel::Debug,
536
0
                    format!("json-rpc-connection-error; address={socket_address}, error={error}"),
537
0
                );
538
0
            }
539
        }
540
0
    };
Unexecuted instantiation: _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task0B5_
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task0B5_
541
542
0
    tasks_executor(Box::pin(async move {
543
0
        run_future.await;
544
0
        num_json_rpc_clients.fetch_sub(1, Ordering::Release);
545
0
    }))
Unexecuted instantiation: _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_tasks_0B5_
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_tasks_0B5_
546
0
}
Unexecuted instantiation: _RNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task
Unexecuted instantiation: _RNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task
547
548
21
fn spawn_client_main_task(
549
21
    tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
550
21
    log_callback: Arc<dyn LogCallback + Send + Sync>,
551
21
    consensus_service: Arc<consensus_service::ConsensusService>,
552
21
    database: Arc<database_thread::DatabaseThread>,
553
21
    to_requests_handlers: async_channel::Sender<requests_handler::Message>,
554
21
    mut client_main_task: service::ClientMainTask,
555
21
) {
556
21
    let tasks_executor2 = tasks_executor.clone();
557
21
    tasks_executor2(Box::pin(async move {
558
21
        let mut chain_head_follow_subscriptions: hashbrown::HashMap<
559
21
            String,
560
21
            async_channel::Sender<chain_head_subscriptions::Message>,
561
21
            _,
562
21
        > = hashbrown::HashMap::with_capacity_and_hasher(2, fnv::FnvBuildHasher::default());
563
564
        loop {
565
65
            match 
client_main_task.run_until_event()44
.await {
566
                service::Event::HandleRequest {
567
23
                    task,
568
23
                    request_process,
569
23
                } => {
570
23
                    client_main_task = task;
571
23
572
23
                    match request_process.request() {
573
                        methods::MethodCall::chainHead_v1_header {
574
0
                            follow_subscription,
575
                            ..
576
                        } => {
577
0
                            if let Some(follow_subscription) =
578
0
                                chain_head_follow_subscriptions.get_mut(&*follow_subscription)
579
                            {
580
0
                                let _ = follow_subscription
581
0
                                    .send(chain_head_subscriptions::Message::Header {
582
0
                                        request: request_process,
583
0
                                    })
584
0
                                    .await;
585
                                // TODO racy; doesn't handle situation where follow subscription stops
586
0
                            } else {
587
0
                                request_process
588
0
                                    .respond(methods::Response::chainHead_v1_header(None));
589
0
                            }
590
                        }
591
                        methods::MethodCall::chainHead_v1_unpin {
592
0
                            follow_subscription,
593
0
                            hash_or_hashes,
594
                        } => {
595
0
                            if let Some(follow_subscription) =
596
0
                                chain_head_follow_subscriptions.get_mut(&*follow_subscription)
597
                            {
598
0
                                let block_hashes = match hash_or_hashes {
599
0
                                    methods::HashHexStringSingleOrArray::Array(list) => {
600
0
                                        list.into_iter().map(|h| h.0).collect::<Vec<_>>()
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22spawn_client_main_task00B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22spawn_client_main_task00B7_
601
                                    }
602
0
                                    methods::HashHexStringSingleOrArray::Single(hash) => {
603
0
                                        vec![hash.0]
604
                                    }
605
                                };
606
607
0
                                let (outcome, outcome_rx) = oneshot::channel();
608
0
                                let _ = follow_subscription
609
0
                                    .send(chain_head_subscriptions::Message::Unpin {
610
0
                                        block_hashes,
611
0
                                        outcome,
612
0
                                    })
613
0
                                    .await;
614
615
0
                                match outcome_rx.await {
616
0
                                    Err(_) => {
617
0
                                        request_process
618
0
                                            .respond(methods::Response::chainHead_v1_unpin(()));
619
0
                                    }
620
0
                                    Ok(Ok(())) => {
621
0
                                        request_process
622
0
                                            .respond(methods::Response::chainHead_v1_unpin(()));
623
0
                                    }
624
0
                                    Ok(Err(())) => {
625
0
                                        request_process.fail(service::ErrorResponse::InvalidParams);
626
0
                                    }
627
                                }
628
0
                            }
629
                        }
630
                        _ => {
631
23
                            to_requests_handlers
632
23
                                .send(requests_handler::Message::Request(request_process))
633
0
                                .await
634
23
                                .unwrap();
635
                        }
636
                    }
637
                }
638
                service::Event::HandleSubscriptionStart {
639
0
                    task,
640
0
                    subscription_start,
641
0
                } => {
642
0
                    client_main_task = task;
643
0
644
0
                    match subscription_start.request() {
645
                        // TODO: enforce limit to number of subscriptions
646
0
                        methods::MethodCall::chainHead_v1_follow { with_runtime } => {
647
0
                            let (tx, rx) = async_channel::bounded(16);
648
0
                            let subscription_id =
649
0
                                chain_head_subscriptions::spawn_chain_head_subscription_task(
650
0
                                    chain_head_subscriptions::Config {
651
0
                                        tasks_executor: tasks_executor.clone(),
652
0
                                        log_callback: log_callback.clone(),
653
0
                                        receiver: rx,
654
0
                                        chain_head_follow_subscription: subscription_start,
655
0
                                        with_runtime,
656
0
                                        consensus_service: consensus_service.clone(),
657
0
                                        database: database.clone(),
658
0
                                    },
659
0
                                )
660
0
                                .await;
661
0
                            chain_head_follow_subscriptions.insert(subscription_id, tx);
662
                        }
663
                        _ => {
664
0
                            to_requests_handlers
665
0
                                .send(requests_handler::Message::SubscriptionStart(
666
0
                                    subscription_start,
667
0
                                ))
668
0
                                .await
669
0
                                .unwrap();
670
                        }
671
                    }
672
                }
673
                service::Event::SubscriptionDestroyed {
674
0
                    task,
675
0
                    subscription_id,
676
0
                    ..
677
0
                } => {
678
0
                    let _ = chain_head_follow_subscriptions.remove(&subscription_id);
679
0
                    client_main_task = task;
680
0
                }
681
                service::Event::SerializedRequestsIoClosed => {
682
                    // JSON-RPC client has disconnected.
683
21
                    return;
684
21
                }
685
21
            }
686
21
        }
687
21
    }));
_RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22spawn_client_main_task0B5_
Line
Count
Source
557
21
    tasks_executor2(Box::pin(async move {
558
21
        let mut chain_head_follow_subscriptions: hashbrown::HashMap<
559
21
            String,
560
21
            async_channel::Sender<chain_head_subscriptions::Message>,
561
21
            _,
562
21
        > = hashbrown::HashMap::with_capacity_and_hasher(2, fnv::FnvBuildHasher::default());
563
564
        loop {
565
65
            match 
client_main_task.run_until_event()44
.await {
566
                service::Event::HandleRequest {
567
23
                    task,
568
23
                    request_process,
569
23
                } => {
570
23
                    client_main_task = task;
571
23
572
23
                    match request_process.request() {
573
                        methods::MethodCall::chainHead_v1_header {
574
0
                            follow_subscription,
575
                            ..
576
                        } => {
577
0
                            if let Some(follow_subscription) =
578
0
                                chain_head_follow_subscriptions.get_mut(&*follow_subscription)
579
                            {
580
0
                                let _ = follow_subscription
581
0
                                    .send(chain_head_subscriptions::Message::Header {
582
0
                                        request: request_process,
583
0
                                    })
584
0
                                    .await;
585
                                // TODO racy; doesn't handle situation where follow subscription stops
586
0
                            } else {
587
0
                                request_process
588
0
                                    .respond(methods::Response::chainHead_v1_header(None));
589
0
                            }
590
                        }
591
                        methods::MethodCall::chainHead_v1_unpin {
592
0
                            follow_subscription,
593
0
                            hash_or_hashes,
594
                        } => {
595
0
                            if let Some(follow_subscription) =
596
0
                                chain_head_follow_subscriptions.get_mut(&*follow_subscription)
597
                            {
598
0
                                let block_hashes = match hash_or_hashes {
599
0
                                    methods::HashHexStringSingleOrArray::Array(list) => {
600
0
                                        list.into_iter().map(|h| h.0).collect::<Vec<_>>()
601
                                    }
602
0
                                    methods::HashHexStringSingleOrArray::Single(hash) => {
603
0
                                        vec![hash.0]
604
                                    }
605
                                };
606
607
0
                                let (outcome, outcome_rx) = oneshot::channel();
608
0
                                let _ = follow_subscription
609
0
                                    .send(chain_head_subscriptions::Message::Unpin {
610
0
                                        block_hashes,
611
0
                                        outcome,
612
0
                                    })
613
0
                                    .await;
614
615
0
                                match outcome_rx.await {
616
0
                                    Err(_) => {
617
0
                                        request_process
618
0
                                            .respond(methods::Response::chainHead_v1_unpin(()));
619
0
                                    }
620
0
                                    Ok(Ok(())) => {
621
0
                                        request_process
622
0
                                            .respond(methods::Response::chainHead_v1_unpin(()));
623
0
                                    }
624
0
                                    Ok(Err(())) => {
625
0
                                        request_process.fail(service::ErrorResponse::InvalidParams);
626
0
                                    }
627
                                }
628
0
                            }
629
                        }
630
                        _ => {
631
23
                            to_requests_handlers
632
23
                                .send(requests_handler::Message::Request(request_process))
633
0
                                .await
634
23
                                .unwrap();
635
                        }
636
                    }
637
                }
638
                service::Event::HandleSubscriptionStart {
639
0
                    task,
640
0
                    subscription_start,
641
0
                } => {
642
0
                    client_main_task = task;
643
0
644
0
                    match subscription_start.request() {
645
                        // TODO: enforce limit to number of subscriptions
646
0
                        methods::MethodCall::chainHead_v1_follow { with_runtime } => {
647
0
                            let (tx, rx) = async_channel::bounded(16);
648
0
                            let subscription_id =
649
0
                                chain_head_subscriptions::spawn_chain_head_subscription_task(
650
0
                                    chain_head_subscriptions::Config {
651
0
                                        tasks_executor: tasks_executor.clone(),
652
0
                                        log_callback: log_callback.clone(),
653
0
                                        receiver: rx,
654
0
                                        chain_head_follow_subscription: subscription_start,
655
0
                                        with_runtime,
656
0
                                        consensus_service: consensus_service.clone(),
657
0
                                        database: database.clone(),
658
0
                                    },
659
0
                                )
660
0
                                .await;
661
0
                            chain_head_follow_subscriptions.insert(subscription_id, tx);
662
                        }
663
                        _ => {
664
0
                            to_requests_handlers
665
0
                                .send(requests_handler::Message::SubscriptionStart(
666
0
                                    subscription_start,
667
0
                                ))
668
0
                                .await
669
0
                                .unwrap();
670
                        }
671
                    }
672
                }
673
                service::Event::SubscriptionDestroyed {
674
0
                    task,
675
0
                    subscription_id,
676
0
                    ..
677
0
                } => {
678
0
                    let _ = chain_head_follow_subscriptions.remove(&subscription_id);
679
0
                    client_main_task = task;
680
0
                }
681
                service::Event::SerializedRequestsIoClosed => {
682
                    // JSON-RPC client has disconnected.
683
21
                    return;
684
21
                }
685
21
            }
686
21
        }
687
21
    }));
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22spawn_client_main_task0B5_
688
21
}
_RNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22spawn_client_main_task
Line
Count
Source
548
21
fn spawn_client_main_task(
549
21
    tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
550
21
    log_callback: Arc<dyn LogCallback + Send + Sync>,
551
21
    consensus_service: Arc<consensus_service::ConsensusService>,
552
21
    database: Arc<database_thread::DatabaseThread>,
553
21
    to_requests_handlers: async_channel::Sender<requests_handler::Message>,
554
21
    mut client_main_task: service::ClientMainTask,
555
21
) {
556
21
    let tasks_executor2 = tasks_executor.clone();
557
21
    tasks_executor2(Box::pin(async move {
558
        let mut chain_head_follow_subscriptions: hashbrown::HashMap<
559
            String,
560
            async_channel::Sender<chain_head_subscriptions::Message>,
561
            _,
562
        > = hashbrown::HashMap::with_capacity_and_hasher(2, fnv::FnvBuildHasher::default());
563
564
        loop {
565
            match client_main_task.run_until_event().await {
566
                service::Event::HandleRequest {
567
                    task,
568
                    request_process,
569
                } => {
570
                    client_main_task = task;
571
572
                    match request_process.request() {
573
                        methods::MethodCall::chainHead_v1_header {
574
                            follow_subscription,
575
                            ..
576
                        } => {
577
                            if let Some(follow_subscription) =
578
                                chain_head_follow_subscriptions.get_mut(&*follow_subscription)
579
                            {
580
                                let _ = follow_subscription
581
                                    .send(chain_head_subscriptions::Message::Header {
582
                                        request: request_process,
583
                                    })
584
                                    .await;
585
                                // TODO racy; doesn't handle situation where follow subscription stops
586
                            } else {
587
                                request_process
588
                                    .respond(methods::Response::chainHead_v1_header(None));
589
                            }
590
                        }
591
                        methods::MethodCall::chainHead_v1_unpin {
592
                            follow_subscription,
593
                            hash_or_hashes,
594
                        } => {
595
                            if let Some(follow_subscription) =
596
                                chain_head_follow_subscriptions.get_mut(&*follow_subscription)
597
                            {
598
                                let block_hashes = match hash_or_hashes {
599
                                    methods::HashHexStringSingleOrArray::Array(list) => {
600
                                        list.into_iter().map(|h| h.0).collect::<Vec<_>>()
601
                                    }
602
                                    methods::HashHexStringSingleOrArray::Single(hash) => {
603
                                        vec![hash.0]
604
                                    }
605
                                };
606
607
                                let (outcome, outcome_rx) = oneshot::channel();
608
                                let _ = follow_subscription
609
                                    .send(chain_head_subscriptions::Message::Unpin {
610
                                        block_hashes,
611
                                        outcome,
612
                                    })
613
                                    .await;
614
615
                                match outcome_rx.await {
616
                                    Err(_) => {
617
                                        request_process
618
                                            .respond(methods::Response::chainHead_v1_unpin(()));
619
                                    }
620
                                    Ok(Ok(())) => {
621
                                        request_process
622
                                            .respond(methods::Response::chainHead_v1_unpin(()));
623
                                    }
624
                                    Ok(Err(())) => {
625
                                        request_process.fail(service::ErrorResponse::InvalidParams);
626
                                    }
627
                                }
628
                            }
629
                        }
630
                        _ => {
631
                            to_requests_handlers
632
                                .send(requests_handler::Message::Request(request_process))
633
                                .await
634
                                .unwrap();
635
                        }
636
                    }
637
                }
638
                service::Event::HandleSubscriptionStart {
639
                    task,
640
                    subscription_start,
641
                } => {
642
                    client_main_task = task;
643
644
                    match subscription_start.request() {
645
                        // TODO: enforce limit to number of subscriptions
646
                        methods::MethodCall::chainHead_v1_follow { with_runtime } => {
647
                            let (tx, rx) = async_channel::bounded(16);
648
                            let subscription_id =
649
                                chain_head_subscriptions::spawn_chain_head_subscription_task(
650
                                    chain_head_subscriptions::Config {
651
                                        tasks_executor: tasks_executor.clone(),
652
                                        log_callback: log_callback.clone(),
653
                                        receiver: rx,
654
                                        chain_head_follow_subscription: subscription_start,
655
                                        with_runtime,
656
                                        consensus_service: consensus_service.clone(),
657
                                        database: database.clone(),
658
                                    },
659
                                )
660
                                .await;
661
                            chain_head_follow_subscriptions.insert(subscription_id, tx);
662
                        }
663
                        _ => {
664
                            to_requests_handlers
665
                                .send(requests_handler::Message::SubscriptionStart(
666
                                    subscription_start,
667
                                ))
668
                                .await
669
                                .unwrap();
670
                        }
671
                    }
672
                }
673
                service::Event::SubscriptionDestroyed {
674
                    task,
675
                    subscription_id,
676
                    ..
677
                } => {
678
                    let _ = chain_head_follow_subscriptions.remove(&subscription_id);
679
                    client_main_task = task;
680
                }
681
                service::Event::SerializedRequestsIoClosed => {
682
                    // JSON-RPC client has disconnected.
683
                    return;
684
                }
685
            }
686
        }
687
21
    }));
688
21
}
Unexecuted instantiation: _RNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22spawn_client_main_task