/__w/smoldot/smoldot/repo/full-node/src/json_rpc_service.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use crate::{consensus_service, database_thread, network_service, LogCallback, LogLevel}; |
19 | | use futures_channel::oneshot; |
20 | | use futures_util::FutureExt; |
21 | | use smol::{ |
22 | | future, |
23 | | net::{TcpListener, TcpStream}, |
24 | | }; |
25 | | use smoldot::json_rpc::{methods, service}; |
26 | | use std::{ |
27 | | future::Future, |
28 | | io, mem, |
29 | | net::SocketAddr, |
30 | | num::{NonZeroU32, NonZeroUsize}, |
31 | | pin::Pin, |
32 | | sync::{ |
33 | | atomic::{AtomicU32, Ordering}, |
34 | | Arc, |
35 | | }, |
36 | | time::Duration, |
37 | | }; |
38 | | |
39 | | mod chain_head_subscriptions; |
40 | | mod legacy_api_subscriptions; |
41 | | mod requests_handler; |
42 | | mod runtime_caches_service; |
43 | | |
44 | | /// Configuration for a [`JsonRpcService`]. |
45 | | pub struct Config { |
46 | | /// Function that can be used to spawn background tasks. |
47 | | /// |
48 | | /// The tasks passed as parameter must be executed until they shut down. |
49 | | pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, |
50 | | |
51 | | /// Function called in order to notify of something. |
52 | | pub log_callback: Arc<dyn LogCallback + Send + Sync>, |
53 | | |
54 | | /// Database to access blocks. |
55 | | pub database: Arc<database_thread::DatabaseThread>, |
56 | | |
57 | | /// Access to the network, and identifier of the chain from the point of view of the network |
58 | | /// service. |
59 | | pub network_service: ( |
60 | | Arc<network_service::NetworkService>, |
61 | | network_service::ChainId, |
62 | | ), |
63 | | |
64 | | /// Where to bind the WebSocket server. If `None`, no TCP server is started. |
65 | | pub bind_address: Option<SocketAddr>, |
66 | | |
67 | | /// Maximum number of requests to process in parallel. |
68 | | pub max_parallel_requests: u32, |
69 | | |
70 | | /// Maximum number of JSON-RPC clients until new ones are rejected. |
71 | | pub max_json_rpc_clients: u32, |
72 | | |
73 | | /// Name of the chain, as found in the chain specification. |
74 | | pub chain_name: String, |
75 | | |
76 | | /// Type of the chain, as found in the chain specification. |
77 | | pub chain_type: String, |
78 | | |
79 | | /// JSON-encoded properties of the chain, as found in the chain specification. |
80 | | pub chain_properties_json: String, |
81 | | |
82 | | /// Whether the chain is a live network. Found in the chain specification. |
83 | | pub chain_is_live: bool, |
84 | | |
85 | | /// Hash of the genesis block. |
86 | | // TODO: load from database maybe? |
87 | | pub genesis_block_hash: [u8; 32], |
88 | | |
89 | | /// Consensus service of the chain. |
90 | | pub consensus_service: Arc<consensus_service::ConsensusService>, |
91 | | } |
92 | | |
93 | | /// Running JSON-RPC service. |
94 | | /// |
95 | | /// If [`Config::bind_address`] is `Some`, holds a TCP server open for as long as it is alive. |
96 | | /// |
97 | | /// In addition to a TCP/IP server, this service also provides a virtual JSON-RPC endpoint that |
98 | | /// can be used through [`JsonRpcService::send_request`] and [`JsonRpcService::next_response`]. |
99 | | pub struct JsonRpcService { |
100 | | /// This events listener is notified when the service is dropped. |
101 | | service_dropped: event_listener::Event, |
102 | | |
103 | | /// Address the server is listening on. Not necessarily equal to [`Config::bind_address`]. |
104 | | listen_addr: Option<SocketAddr>, |
105 | | |
106 | | /// I/O for the virtual endpoint. |
107 | | virtual_client_io: service::SerializedRequestsIo, |
108 | | } |
109 | | |
110 | | impl Drop for JsonRpcService { |
111 | 21 | fn drop(&mut self) { |
112 | 21 | self.service_dropped.notify(usize::MAX); |
113 | 21 | } _RNvXNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB2_14JsonRpcServiceNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop Line | Count | Source | 111 | 21 | fn drop(&mut self) { | 112 | 21 | self.service_dropped.notify(usize::MAX); | 113 | 21 | } |
Unexecuted instantiation: _RNvXNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB2_14JsonRpcServiceNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop |
114 | | } |
115 | | |
116 | | impl JsonRpcService { |
117 | | /// Initializes a new [`JsonRpcService`]. |
118 | 21 | pub async fn new(config: Config) -> Result<Self, InitError> { _RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService3new Line | Count | Source | 118 | 21 | pub async fn new(config: Config) -> Result<Self, InitError> { |
Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService3new |
119 | 21 | let (tcp_listener, listen_addr) = match &config.bind_address { |
120 | 0 | Some(addr) => match TcpListener::bind(addr).await { |
121 | 0 | Ok(listener) => { |
122 | 0 | let listen_addr = match listener.local_addr() { |
123 | 0 | Ok(addr) => addr, |
124 | 0 | Err(error) => { |
125 | 0 | return Err(InitError::ListenError { |
126 | 0 | bind_address: *addr, |
127 | 0 | error, |
128 | 0 | }) |
129 | | } |
130 | | }; |
131 | | |
132 | 0 | (Some(listener), Some(listen_addr)) |
133 | | } |
134 | 0 | Err(error) => { |
135 | 0 | return Err(InitError::ListenError { |
136 | 0 | bind_address: *addr, |
137 | 0 | error, |
138 | 0 | }) |
139 | | } |
140 | | }, |
141 | 21 | None => (None, None), |
142 | | }; |
143 | | |
144 | 21 | let service_dropped = event_listener::Event::new(); |
145 | 21 | let on_service_dropped = service_dropped.listen(); |
146 | 21 | |
147 | 21 | let (to_requests_handlers, from_background) = async_channel::bounded(8); |
148 | 21 | |
149 | 21 | let (virtual_client_main_task, virtual_client_io) = |
150 | 21 | service::client_main_task(service::Config { |
151 | 21 | max_active_subscriptions: u32::MAX, |
152 | 21 | max_pending_requests: NonZeroU32::new(u32::MAX).unwrap(), |
153 | 21 | }); |
154 | 21 | |
155 | 21 | spawn_client_main_task( |
156 | 21 | config.tasks_executor.clone(), |
157 | 21 | config.log_callback.clone(), |
158 | 21 | config.consensus_service.clone(), |
159 | 21 | config.database.clone(), |
160 | 21 | to_requests_handlers.clone(), |
161 | 21 | virtual_client_main_task, |
162 | 21 | ); |
163 | 21 | |
164 | 21 | let runtime_caches_service = Arc::new(runtime_caches_service::RuntimeCachesService::new( |
165 | 21 | runtime_caches_service::Config { |
166 | 21 | tasks_executor: config.tasks_executor.clone(), |
167 | 21 | log_callback: config.log_callback.clone(), |
168 | 21 | database: config.database.clone(), |
169 | 21 | num_cache_entries: NonZeroUsize::new(16).unwrap(), // TODO: configurable? |
170 | 21 | }, |
171 | 21 | )); |
172 | 21 | |
173 | 672 | for _ in 0..config.max_parallel_requests { |
174 | 672 | requests_handler::spawn_requests_handler(requests_handler::Config { |
175 | 672 | tasks_executor: config.tasks_executor.clone(), |
176 | 672 | log_callback: config.log_callback.clone(), |
177 | 672 | database: config.database.clone(), |
178 | 672 | network_service: config.network_service.clone(), |
179 | 672 | receiver: from_background.clone(), |
180 | 672 | chain_name: config.chain_name.clone(), |
181 | 672 | chain_type: config.chain_type.clone(), |
182 | 672 | chain_properties_json: config.chain_properties_json.clone(), |
183 | 672 | chain_is_live: config.chain_is_live, |
184 | 672 | genesis_block_hash: config.genesis_block_hash, |
185 | 672 | consensus_service: config.consensus_service.clone(), |
186 | 672 | runtime_caches_service: runtime_caches_service.clone(), |
187 | 672 | }); |
188 | 672 | } |
189 | | |
190 | 21 | if let Some(tcp_listener0 ) = tcp_listener { |
191 | 0 | let background = JsonRpcBackground { |
192 | 0 | tcp_listener, |
193 | 0 | on_service_dropped, |
194 | 0 | tasks_executor: config.tasks_executor.clone(), |
195 | 0 | log_callback: config.log_callback, |
196 | 0 | consensus_service: config.consensus_service.clone(), |
197 | 0 | database: config.database.clone(), |
198 | 0 | to_requests_handlers, |
199 | 0 | num_json_rpc_clients: Arc::new(AtomicU32::new(0)), |
200 | 0 | max_json_rpc_clients: config.max_json_rpc_clients, |
201 | 0 | }; |
202 | 0 |
|
203 | 0 | (config.tasks_executor)(Box::pin(async move { background.run().await })); Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00CsiLzmwikkc22_14json_rpc_basic Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00Ba_ Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00CscDgN54JpMGG_6author Unexecuted instantiation: _RNCNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00CsibGXYHQB8Ea_25json_rpc_general_requests Unexecuted instantiation: _RNCNCNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB8_14JsonRpcService3new00Ba_ |
204 | 21 | } |
205 | | |
206 | 21 | Ok(JsonRpcService { |
207 | 21 | service_dropped, |
208 | 21 | listen_addr, |
209 | 21 | virtual_client_io, |
210 | 21 | }) |
211 | 21 | } _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0CsiLzmwikkc22_14json_rpc_basic Line | Count | Source | 118 | 2 | pub async fn new(config: Config) -> Result<Self, InitError> { | 119 | 2 | let (tcp_listener, listen_addr) = match &config.bind_address { | 120 | 0 | Some(addr) => match TcpListener::bind(addr).await { | 121 | 0 | Ok(listener) => { | 122 | 0 | let listen_addr = match listener.local_addr() { | 123 | 0 | Ok(addr) => addr, | 124 | 0 | Err(error) => { | 125 | 0 | return Err(InitError::ListenError { | 126 | 0 | bind_address: *addr, | 127 | 0 | error, | 128 | 0 | }) | 129 | | } | 130 | | }; | 131 | | | 132 | 0 | (Some(listener), Some(listen_addr)) | 133 | | } | 134 | 0 | Err(error) => { | 135 | 0 | return Err(InitError::ListenError { | 136 | 0 | bind_address: *addr, | 137 | 0 | error, | 138 | 0 | }) | 139 | | } | 140 | | }, | 141 | 2 | None => (None, None), | 142 | | }; | 143 | | | 144 | 2 | let service_dropped = event_listener::Event::new(); | 145 | 2 | let on_service_dropped = service_dropped.listen(); | 146 | 2 | | 147 | 2 | let (to_requests_handlers, from_background) = async_channel::bounded(8); | 148 | 2 | | 149 | 2 | let (virtual_client_main_task, virtual_client_io) = | 150 | 2 | service::client_main_task(service::Config { | 151 | 2 | max_active_subscriptions: u32::MAX, | 152 | 2 | max_pending_requests: NonZeroU32::new(u32::MAX).unwrap(), | 153 | 2 | }); | 154 | 2 | | 155 | 2 | spawn_client_main_task( | 156 | 2 | config.tasks_executor.clone(), | 157 | 2 | config.log_callback.clone(), | 158 | 2 | config.consensus_service.clone(), | 159 | 2 | config.database.clone(), | 160 | 2 | to_requests_handlers.clone(), | 161 | 2 | virtual_client_main_task, | 162 | 2 | ); | 163 | 2 | | 164 | 2 | let runtime_caches_service = Arc::new(runtime_caches_service::RuntimeCachesService::new( | 165 | 2 | runtime_caches_service::Config { | 166 | 2 | tasks_executor: config.tasks_executor.clone(), | 167 | 2 | log_callback: config.log_callback.clone(), | 168 | 2 | database: config.database.clone(), | 169 | 2 | num_cache_entries: NonZeroUsize::new(16).unwrap(), // TODO: configurable? | 170 | 2 | }, | 171 | 2 | )); | 172 | 2 | | 173 | 64 | for _ in 0..config.max_parallel_requests { | 174 | 64 | requests_handler::spawn_requests_handler(requests_handler::Config { | 175 | 64 | tasks_executor: config.tasks_executor.clone(), | 176 | 64 | log_callback: config.log_callback.clone(), | 177 | 64 | database: config.database.clone(), | 178 | 64 | network_service: config.network_service.clone(), | 179 | 64 | receiver: from_background.clone(), | 180 | 64 | chain_name: config.chain_name.clone(), | 181 | 64 | chain_type: config.chain_type.clone(), | 182 | 64 | chain_properties_json: config.chain_properties_json.clone(), | 183 | 64 | chain_is_live: config.chain_is_live, | 184 | 64 | genesis_block_hash: config.genesis_block_hash, | 185 | 64 | consensus_service: config.consensus_service.clone(), | 186 | 64 | runtime_caches_service: runtime_caches_service.clone(), | 187 | 64 | }); | 188 | 64 | } | 189 | | | 190 | 2 | if let Some(tcp_listener0 ) = tcp_listener { | 191 | 0 | let background = JsonRpcBackground { | 192 | 0 | tcp_listener, | 193 | 0 | on_service_dropped, | 194 | 0 | tasks_executor: config.tasks_executor.clone(), | 195 | 0 | log_callback: config.log_callback, | 196 | 0 | consensus_service: config.consensus_service.clone(), | 197 | 0 | database: config.database.clone(), | 198 | 0 | to_requests_handlers, | 199 | 0 | num_json_rpc_clients: Arc::new(AtomicU32::new(0)), | 200 | 0 | max_json_rpc_clients: config.max_json_rpc_clients, | 201 | 0 | }; | 202 | 0 |
| 203 | 0 | (config.tasks_executor)(Box::pin(async move { background.run().await })); | 204 | 2 | } | 205 | | | 206 | 2 | Ok(JsonRpcService { | 207 | 2 | service_dropped, | 208 | 2 | listen_addr, | 209 | 2 | virtual_client_io, | 210 | 2 | }) | 211 | 2 | } |
Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0B8_ Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0CscDgN54JpMGG_6author _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0CsibGXYHQB8Ea_25json_rpc_general_requests Line | Count | Source | 118 | 19 | pub async fn new(config: Config) -> Result<Self, InitError> { | 119 | 19 | let (tcp_listener, listen_addr) = match &config.bind_address { | 120 | 0 | Some(addr) => match TcpListener::bind(addr).await { | 121 | 0 | Ok(listener) => { | 122 | 0 | let listen_addr = match listener.local_addr() { | 123 | 0 | Ok(addr) => addr, | 124 | 0 | Err(error) => { | 125 | 0 | return Err(InitError::ListenError { | 126 | 0 | bind_address: *addr, | 127 | 0 | error, | 128 | 0 | }) | 129 | | } | 130 | | }; | 131 | | | 132 | 0 | (Some(listener), Some(listen_addr)) | 133 | | } | 134 | 0 | Err(error) => { | 135 | 0 | return Err(InitError::ListenError { | 136 | 0 | bind_address: *addr, | 137 | 0 | error, | 138 | 0 | }) | 139 | | } | 140 | | }, | 141 | 19 | None => (None, None), | 142 | | }; | 143 | | | 144 | 19 | let service_dropped = event_listener::Event::new(); | 145 | 19 | let on_service_dropped = service_dropped.listen(); | 146 | 19 | | 147 | 19 | let (to_requests_handlers, from_background) = async_channel::bounded(8); | 148 | 19 | | 149 | 19 | let (virtual_client_main_task, virtual_client_io) = | 150 | 19 | service::client_main_task(service::Config { | 151 | 19 | max_active_subscriptions: u32::MAX, | 152 | 19 | max_pending_requests: NonZeroU32::new(u32::MAX).unwrap(), | 153 | 19 | }); | 154 | 19 | | 155 | 19 | spawn_client_main_task( | 156 | 19 | config.tasks_executor.clone(), | 157 | 19 | config.log_callback.clone(), | 158 | 19 | config.consensus_service.clone(), | 159 | 19 | config.database.clone(), | 160 | 19 | to_requests_handlers.clone(), | 161 | 19 | virtual_client_main_task, | 162 | 19 | ); | 163 | 19 | | 164 | 19 | let runtime_caches_service = Arc::new(runtime_caches_service::RuntimeCachesService::new( | 165 | 19 | runtime_caches_service::Config { | 166 | 19 | tasks_executor: config.tasks_executor.clone(), | 167 | 19 | log_callback: config.log_callback.clone(), | 168 | 19 | database: config.database.clone(), | 169 | 19 | num_cache_entries: NonZeroUsize::new(16).unwrap(), // TODO: configurable? | 170 | 19 | }, | 171 | 19 | )); | 172 | 19 | | 173 | 608 | for _ in 0..config.max_parallel_requests { | 174 | 608 | requests_handler::spawn_requests_handler(requests_handler::Config { | 175 | 608 | tasks_executor: config.tasks_executor.clone(), | 176 | 608 | log_callback: config.log_callback.clone(), | 177 | 608 | database: config.database.clone(), | 178 | 608 | network_service: config.network_service.clone(), | 179 | 608 | receiver: from_background.clone(), | 180 | 608 | chain_name: config.chain_name.clone(), | 181 | 608 | chain_type: config.chain_type.clone(), | 182 | 608 | chain_properties_json: config.chain_properties_json.clone(), | 183 | 608 | chain_is_live: config.chain_is_live, | 184 | 608 | genesis_block_hash: config.genesis_block_hash, | 185 | 608 | consensus_service: config.consensus_service.clone(), | 186 | 608 | runtime_caches_service: runtime_caches_service.clone(), | 187 | 608 | }); | 188 | 608 | } | 189 | | | 190 | 19 | if let Some(tcp_listener0 ) = tcp_listener { | 191 | 0 | let background = JsonRpcBackground { | 192 | 0 | tcp_listener, | 193 | 0 | on_service_dropped, | 194 | 0 | tasks_executor: config.tasks_executor.clone(), | 195 | 0 | log_callback: config.log_callback, | 196 | 0 | consensus_service: config.consensus_service.clone(), | 197 | 0 | database: config.database.clone(), | 198 | 0 | to_requests_handlers, | 199 | 0 | num_json_rpc_clients: Arc::new(AtomicU32::new(0)), | 200 | 0 | max_json_rpc_clients: config.max_json_rpc_clients, | 201 | 0 | }; | 202 | 0 |
| 203 | 0 | (config.tasks_executor)(Box::pin(async move { background.run().await })); | 204 | 19 | } | 205 | | | 206 | 19 | Ok(JsonRpcService { | 207 | 19 | service_dropped, | 208 | 19 | listen_addr, | 209 | 19 | virtual_client_io, | 210 | 19 | }) | 211 | 19 | } |
Unexecuted instantiation: _RNCNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService3new0B8_ |
212 | | |
213 | | /// Returns the address the server is listening on. |
214 | | /// |
215 | | /// Returns `None` if and only if [`Config::bind_address`] was `None`. However, if `Some`, |
216 | | /// the address is not necessarily equal to the one in [`Config::bind_address`]. |
217 | 0 | pub fn listen_addr(&self) -> Option<SocketAddr> { |
218 | 0 | self.listen_addr |
219 | 0 | } Unexecuted instantiation: _RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService11listen_addr Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService11listen_addr |
220 | | |
221 | | /// Adds a JSON-RPC request to the queue of requests of the virtual endpoint. |
222 | | /// |
223 | | /// The virtual endpoint doesn't have any limit. |
224 | 25 | pub fn send_request(&self, request: String) { |
225 | 25 | match self.virtual_client_io.try_send_request(request) { |
226 | 25 | Ok(()) => (), |
227 | 0 | Err(err) => match err.cause { |
228 | | service::TrySendRequestErrorCause::TooManyPendingRequests |
229 | 0 | | service::TrySendRequestErrorCause::ClientMainTaskDestroyed => unreachable!(), |
230 | | }, |
231 | | } |
232 | 25 | } _RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService12send_request Line | Count | Source | 224 | 25 | pub fn send_request(&self, request: String) { | 225 | 25 | match self.virtual_client_io.try_send_request(request) { | 226 | 25 | Ok(()) => (), | 227 | 0 | Err(err) => match err.cause { | 228 | | service::TrySendRequestErrorCause::TooManyPendingRequests | 229 | 0 | | service::TrySendRequestErrorCause::ClientMainTaskDestroyed => unreachable!(), | 230 | | }, | 231 | | } | 232 | 25 | } |
Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService12send_request |
233 | | |
234 | | /// Returns the new JSON-RPC response or notification for requests sent using |
235 | | /// [`JsonRpcService::send_request`]. |
236 | | /// |
237 | | /// If this function is called multiple times simultaneously, only one invocation will receive |
238 | | /// each response. Which one is unspecified. |
239 | 25 | pub async fn next_response(&self) -> String { _RNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService13next_response Line | Count | Source | 239 | 25 | pub async fn next_response(&self) -> String { |
Unexecuted instantiation: _RNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB4_14JsonRpcService13next_response |
240 | 25 | match self.virtual_client_io.wait_next_response().await14 { |
241 | 25 | Ok(r) => r, |
242 | 0 | Err(service::WaitNextResponseError::ClientMainTaskDestroyed) => unreachable!(), |
243 | | } |
244 | 25 | } _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0CsiLzmwikkc22_14json_rpc_basic Line | Count | Source | 239 | 2 | pub async fn next_response(&self) -> String { | 240 | 2 | match self.virtual_client_io.wait_next_response().await1 { | 241 | 2 | Ok(r) => r, | 242 | 0 | Err(service::WaitNextResponseError::ClientMainTaskDestroyed) => unreachable!(), | 243 | | } | 244 | 2 | } |
Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0B8_ Unexecuted instantiation: _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0CscDgN54JpMGG_6author _RNCNvMs_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0CsibGXYHQB8Ea_25json_rpc_general_requests Line | Count | Source | 239 | 23 | pub async fn next_response(&self) -> String { | 240 | 23 | match self.virtual_client_io.wait_next_response().await13 { | 241 | 23 | Ok(r) => r, | 242 | 0 | Err(service::WaitNextResponseError::ClientMainTaskDestroyed) => unreachable!(), | 243 | | } | 244 | 23 | } |
Unexecuted instantiation: _RNCNvMs_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB6_14JsonRpcService13next_response0B8_ |
245 | | } |
246 | | |
247 | | /// Error potentially returned by [`JsonRpcService::new`]. |
248 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXs2_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB5_9InitErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs2_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB5_9InitErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
249 | | pub enum InitError { |
250 | | /// Failed to listen on the server address. |
251 | | #[display(fmt = "Failed to listen on TCP address {bind_address}: {error}")] |
252 | | ListenError { |
253 | | /// Address that was attempted. |
254 | | bind_address: SocketAddr, |
255 | | /// Error returned by the operating system. |
256 | | error: io::Error, |
257 | | }, |
258 | | } |
259 | | |
260 | | struct JsonRpcBackground { |
261 | | /// TCP listener for new incoming connections. |
262 | | tcp_listener: TcpListener, |
263 | | |
264 | | /// Event notified when the frontend is dropped. |
265 | | on_service_dropped: event_listener::EventListener, |
266 | | |
267 | | /// See [`Config::tasks_executor`]. |
268 | | tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, |
269 | | |
270 | | /// See [`Config::log_callback`]. |
271 | | log_callback: Arc<dyn LogCallback + Send + Sync>, |
272 | | |
273 | | /// Database to access blocks. |
274 | | database: Arc<database_thread::DatabaseThread>, |
275 | | |
276 | | /// Consensus service of the chain. |
277 | | consensus_service: Arc<consensus_service::ConsensusService>, |
278 | | |
279 | | /// Channel used to send requests to the tasks that process said requests. |
280 | | to_requests_handlers: async_channel::Sender<requests_handler::Message>, |
281 | | |
282 | | /// Number of clients currently alive. |
283 | | num_json_rpc_clients: Arc<AtomicU32>, |
284 | | |
285 | | /// See [`Config::max_json_rpc_clients`]. |
286 | | max_json_rpc_clients: u32, |
287 | | } |
288 | | |
289 | | impl JsonRpcBackground { |
290 | 0 | async fn run(mut self) { Unexecuted instantiation: _RNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB5_17JsonRpcBackground3run Unexecuted instantiation: _RNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB5_17JsonRpcBackground3run |
291 | | loop { |
292 | 0 | let Some(accept_result) = future::or( |
293 | 0 | async { |
294 | 0 | (&mut self.on_service_dropped).await; |
295 | 0 | None |
296 | 0 | }, Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00CsiLzmwikkc22_14json_rpc_basic Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00Bb_ Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00CscDgN54JpMGG_6author Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00CsibGXYHQB8Ea_25json_rpc_general_requests Unexecuted instantiation: _RNCNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run00Bb_ |
297 | 0 | async { Some(self.tcp_listener.accept().await) }, Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0CsiLzmwikkc22_14json_rpc_basic Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0Bb_ Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0CscDgN54JpMGG_6author Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0CsibGXYHQB8Ea_25json_rpc_general_requests Unexecuted instantiation: _RNCNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s_0Bb_ |
298 | 0 | ) |
299 | 0 | .await |
300 | | else { |
301 | 0 | return; |
302 | | }; |
303 | | |
304 | 0 | let (tcp_socket, address) = match accept_result { |
305 | 0 | Ok(v) => v, |
306 | 0 | Err(error) => { |
307 | 0 | // Failing to accept an incoming TCP connection generally happens due to |
308 | 0 | // the limit of file descriptors being reached. |
309 | 0 | // Sleep a little bit and try again. |
310 | 0 | self.log_callback.log( |
311 | 0 | LogLevel::Warn, |
312 | 0 | format!("json-rpc-tcp-listener-error; error={error}"), |
313 | 0 | ); |
314 | 0 | smol::Timer::after(Duration::from_millis(50)).await; |
315 | 0 | continue; |
316 | | } |
317 | | }; |
318 | | |
319 | | // New incoming TCP connection. |
320 | | |
321 | | // Try to increase `num_json_rpc_clients`. Fails if the maximum is reached. |
322 | 0 | if self |
323 | 0 | .num_json_rpc_clients |
324 | 0 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { |
325 | 0 | if old_value < self.max_json_rpc_clients { |
326 | | // Considering that `old_value < max`, and `max` fits in a `u32` by |
327 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. |
328 | | // There's no risk of overflow. |
329 | 0 | Some(old_value + 1) |
330 | | } else { |
331 | 0 | None |
332 | | } |
333 | 0 | }) Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0CsiLzmwikkc22_14json_rpc_basic Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0Bb_ Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0CscDgN54JpMGG_6author Unexecuted instantiation: _RNCNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0CsibGXYHQB8Ea_25json_rpc_general_requests Unexecuted instantiation: _RNCNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB9_17JsonRpcBackground3run0s0_0Bb_ |
334 | 0 | .is_err() |
335 | | { |
336 | | // Reject the socket without sending back anything. Sending back a status |
337 | | // code would require allocating resources for that socket, which we |
338 | | // specifically don't want to do. |
339 | 0 | self.log_callback.log( |
340 | 0 | LogLevel::Debug, |
341 | 0 | format!("json-rpc-incoming-connection-rejected; address={}", address), |
342 | 0 | ); |
343 | 0 | smol::Timer::after(Duration::from_millis(50)).await; |
344 | 0 | continue; |
345 | 0 | } |
346 | 0 |
|
347 | 0 | // Spawn two tasks: one for the socket I/O, and one to process requests. |
348 | 0 | self.log_callback.log( |
349 | 0 | LogLevel::Debug, |
350 | 0 | format!("json-rpc-incoming-connection; address={}", address), |
351 | 0 | ); |
352 | 0 | let (client_main_task, io) = service::client_main_task(service::Config { |
353 | 0 | max_active_subscriptions: 128, |
354 | 0 | max_pending_requests: NonZeroU32::new(64).unwrap(), |
355 | 0 | }); |
356 | 0 | spawn_client_io_task( |
357 | 0 | &self.tasks_executor, |
358 | 0 | self.log_callback.clone(), |
359 | 0 | tcp_socket, |
360 | 0 | address, |
361 | 0 | io, |
362 | 0 | self.num_json_rpc_clients.clone(), |
363 | 0 | ); |
364 | 0 | spawn_client_main_task( |
365 | 0 | self.tasks_executor.clone(), |
366 | 0 | self.log_callback.clone(), |
367 | 0 | self.consensus_service.clone(), |
368 | 0 | self.database.clone(), |
369 | 0 | self.to_requests_handlers.clone(), |
370 | 0 | client_main_task, |
371 | 0 | ); |
372 | | } |
373 | 0 | } Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0CsiLzmwikkc22_14json_rpc_basic Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0B9_ Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0CscDgN54JpMGG_6author Unexecuted instantiation: _RNCNvMs0_NtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0CsibGXYHQB8Ea_25json_rpc_general_requests Unexecuted instantiation: _RNCNvMs0_NtCshBwayKnNXDT_17smoldot_full_node16json_rpc_serviceNtB7_17JsonRpcBackground3run0B9_ |
374 | | } |
375 | | |
376 | 0 | fn spawn_client_io_task( |
377 | 0 | tasks_executor: &Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, |
378 | 0 | log_callback: Arc<dyn LogCallback + Send + Sync>, |
379 | 0 | tcp_socket: TcpStream, |
380 | 0 | socket_address: SocketAddr, |
381 | 0 | io: service::SerializedRequestsIo, |
382 | 0 | num_json_rpc_clients: Arc<AtomicU32>, |
383 | 0 | ) { |
384 | 0 | let run_future = async move { |
385 | | // Perform the WebSocket handshake. |
386 | 0 | let (mut ws_sender, mut ws_receiver) = { |
387 | 0 | let mut ws_server = soketto::handshake::Server::new(tcp_socket); |
388 | | |
389 | | // TODO: enabling the `deflate` extension leads to "flate stream corrupted" errors |
390 | | //let deflate = soketto::extension::deflate::Deflate::new(soketto::Mode::Server); |
391 | | //ws_server.add_extension(Box::new(deflate)); |
392 | | |
393 | 0 | let key = match ws_server.receive_request().await { |
394 | 0 | Ok(req) => req.key(), |
395 | 0 | Err(error) => { |
396 | 0 | log_callback.log( |
397 | 0 | LogLevel::Debug, |
398 | 0 | format!( |
399 | 0 | "json-rpc-connection-error; address={socket_address}, error={error}" |
400 | 0 | ), |
401 | 0 | ); |
402 | 0 | return; |
403 | | } |
404 | | }; |
405 | | |
406 | 0 | let accept = soketto::handshake::server::Response::Accept { |
407 | 0 | key, |
408 | 0 | protocol: None, |
409 | 0 | }; |
410 | 0 |
|
411 | 0 | match ws_server.send_response(&accept).await { |
412 | 0 | Ok(()) => {} |
413 | 0 | Err(error) => { |
414 | 0 | log_callback.log( |
415 | 0 | LogLevel::Debug, |
416 | 0 | format!( |
417 | 0 | "json-rpc-connection-error; address={socket_address}, error={error}" |
418 | 0 | ), |
419 | 0 | ); |
420 | 0 | return; |
421 | | } |
422 | | } |
423 | | |
424 | 0 | ws_server.into_builder().finish() |
425 | 0 | }; |
426 | 0 |
|
427 | 0 | // Create a future responsible for pulling responses and sending them back. |
428 | 0 | let sending_future = async { |
429 | 0 | let mut must_flush_asap = false; |
430 | | |
431 | | loop { |
432 | | // If `must_flush_asap`, we simply peek for the next response but without awaiting. |
433 | | // If `!must_flush_asap`, we wait for as long as necessary. |
434 | 0 | let maybe_response = if must_flush_asap { |
435 | 0 | io.wait_next_response().now_or_never() |
436 | | } else { |
437 | 0 | Some(io.wait_next_response().await) |
438 | | }; |
439 | | |
440 | 0 | match maybe_response { |
441 | | None => { |
442 | 0 | if let Err(err) = ws_sender.flush().await { |
443 | 0 | break Err(err.to_string()); |
444 | 0 | } |
445 | 0 | must_flush_asap = false; |
446 | | } |
447 | 0 | Some(Ok(response)) => { |
448 | 0 | log_callback.log( |
449 | 0 | LogLevel::Debug, |
450 | 0 | format!( |
451 | 0 | "json-rpc-response; address={}; response={}", |
452 | 0 | socket_address, |
453 | 0 | crate::util::truncated_str( |
454 | 0 | response.chars().filter(|c| !c.is_control()), Unexecuted instantiation: _RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task000B9_ Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task000B9_ |
455 | 0 | 128 |
456 | 0 | ) |
457 | 0 | ), |
458 | 0 | ); |
459 | | |
460 | 0 | if let Err(err) = ws_sender.send_text_owned(response).await { |
461 | 0 | break Err(err.to_string()); |
462 | 0 | } |
463 | 0 | must_flush_asap = true; |
464 | | } |
465 | | Some(Err(service::WaitNextResponseError::ClientMainTaskDestroyed)) => { |
466 | | // The client main task never closes by itself but only as a consequence |
467 | | // to the I/O task closing. |
468 | 0 | unreachable!() |
469 | | } |
470 | | }; |
471 | | } |
472 | 0 | }; Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task00B7_ Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task00B7_ |
473 | | |
474 | | // Create a future responsible for pulling messages from the socket and sending them to |
475 | | // the main task. |
476 | 0 | let receiving_future = async { |
477 | 0 | let mut message = Vec::new(); |
478 | 0 | loop { |
479 | 0 | message.clear(); |
480 | 0 |
|
481 | 0 | match ws_receiver.receive_data(&mut message).await { |
482 | | Ok(soketto::Data::Binary(_)) => { |
483 | 0 | break Err("Unexpected binary frame".to_string()); |
484 | | } |
485 | 0 | Ok(soketto::Data::Text(_)) => {} // Handled below. |
486 | 0 | Err(soketto::connection::Error::Closed) => break Ok(()), |
487 | 0 | Err(err) => { |
488 | 0 | break Err(err.to_string()); |
489 | | } |
490 | | } |
491 | | |
492 | 0 | let request = match String::from_utf8(mem::take(&mut message)) { |
493 | 0 | Ok(r) => r, |
494 | 0 | Err(error) => { |
495 | 0 | break Err(format!("Non-UTF8 text frame: {error}")); |
496 | | } |
497 | | }; |
498 | | |
499 | 0 | log_callback.log( |
500 | 0 | LogLevel::Debug, |
501 | 0 | format!( |
502 | 0 | "json-rpc-request; address={}; request={}", |
503 | 0 | socket_address, |
504 | 0 | crate::util::truncated_str( |
505 | 0 | request.chars().filter(|c| !c.is_control()), Unexecuted instantiation: _RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_00B9_ Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_00B9_ |
506 | 0 | 128 |
507 | 0 | ) |
508 | 0 | ), |
509 | 0 | ); |
510 | 0 |
|
511 | 0 | match io.send_request(request).await { |
512 | 0 | Ok(()) => {} |
513 | | Err(service::SendRequestError { |
514 | | cause: service::SendRequestErrorCause::ClientMainTaskDestroyed, |
515 | | .. |
516 | | }) => { |
517 | | // The client main task never closes by itself but only as a |
518 | | // consequence to the I/O task closing. |
519 | 0 | unreachable!() |
520 | | } |
521 | | } |
522 | | } |
523 | 0 | }; Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_0B7_ Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task0s_0B7_ |
524 | | |
525 | | // Run these two futures until completion. |
526 | 0 | match future::or(sending_future, receiving_future).await { |
527 | 0 | Ok(()) => { |
528 | 0 | log_callback.log( |
529 | 0 | LogLevel::Debug, |
530 | 0 | format!("json-rpc-connection-closed; address={socket_address}"), |
531 | 0 | ); |
532 | 0 | } |
533 | 0 | Err(error) => { |
534 | 0 | log_callback.log( |
535 | 0 | LogLevel::Debug, |
536 | 0 | format!("json-rpc-connection-error; address={socket_address}, error={error}"), |
537 | 0 | ); |
538 | 0 | } |
539 | | } |
540 | 0 | }; Unexecuted instantiation: _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task0B5_ Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task0B5_ |
541 | | |
542 | 0 | tasks_executor(Box::pin(async move { |
543 | 0 | run_future.await; |
544 | 0 | num_json_rpc_clients.fetch_sub(1, Ordering::Release); |
545 | 0 | })) Unexecuted instantiation: _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_tasks_0B5_ Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_tasks_0B5_ |
546 | 0 | } Unexecuted instantiation: _RNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service20spawn_client_io_task Unexecuted instantiation: _RNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service20spawn_client_io_task |
547 | | |
548 | 21 | fn spawn_client_main_task( |
549 | 21 | tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, |
550 | 21 | log_callback: Arc<dyn LogCallback + Send + Sync>, |
551 | 21 | consensus_service: Arc<consensus_service::ConsensusService>, |
552 | 21 | database: Arc<database_thread::DatabaseThread>, |
553 | 21 | to_requests_handlers: async_channel::Sender<requests_handler::Message>, |
554 | 21 | mut client_main_task: service::ClientMainTask, |
555 | 21 | ) { |
556 | 21 | let tasks_executor2 = tasks_executor.clone(); |
557 | 21 | tasks_executor2(Box::pin(async move { |
558 | 21 | let mut chain_head_follow_subscriptions: hashbrown::HashMap< |
559 | 21 | String, |
560 | 21 | async_channel::Sender<chain_head_subscriptions::Message>, |
561 | 21 | _, |
562 | 21 | > = hashbrown::HashMap::with_capacity_and_hasher(2, fnv::FnvBuildHasher::default()); |
563 | | |
564 | | loop { |
565 | 65 | match client_main_task.run_until_event()44 .await { |
566 | | service::Event::HandleRequest { |
567 | 23 | task, |
568 | 23 | request_process, |
569 | 23 | } => { |
570 | 23 | client_main_task = task; |
571 | 23 | |
572 | 23 | match request_process.request() { |
573 | | methods::MethodCall::chainHead_v1_header { |
574 | 0 | follow_subscription, |
575 | | .. |
576 | | } => { |
577 | 0 | if let Some(follow_subscription) = |
578 | 0 | chain_head_follow_subscriptions.get_mut(&*follow_subscription) |
579 | | { |
580 | 0 | let _ = follow_subscription |
581 | 0 | .send(chain_head_subscriptions::Message::Header { |
582 | 0 | request: request_process, |
583 | 0 | }) |
584 | 0 | .await; |
585 | | // TODO racy; doesn't handle situation where follow subscription stops |
586 | 0 | } else { |
587 | 0 | request_process |
588 | 0 | .respond(methods::Response::chainHead_v1_header(None)); |
589 | 0 | } |
590 | | } |
591 | | methods::MethodCall::chainHead_v1_unpin { |
592 | 0 | follow_subscription, |
593 | 0 | hash_or_hashes, |
594 | | } => { |
595 | 0 | if let Some(follow_subscription) = |
596 | 0 | chain_head_follow_subscriptions.get_mut(&*follow_subscription) |
597 | | { |
598 | 0 | let block_hashes = match hash_or_hashes { |
599 | 0 | methods::HashHexStringSingleOrArray::Array(list) => { |
600 | 0 | list.into_iter().map(|h| h.0).collect::<Vec<_>>() Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22spawn_client_main_task00B7_ Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22spawn_client_main_task00B7_ |
601 | | } |
602 | 0 | methods::HashHexStringSingleOrArray::Single(hash) => { |
603 | 0 | vec![hash.0] |
604 | | } |
605 | | }; |
606 | | |
607 | 0 | let (outcome, outcome_rx) = oneshot::channel(); |
608 | 0 | let _ = follow_subscription |
609 | 0 | .send(chain_head_subscriptions::Message::Unpin { |
610 | 0 | block_hashes, |
611 | 0 | outcome, |
612 | 0 | }) |
613 | 0 | .await; |
614 | | |
615 | 0 | match outcome_rx.await { |
616 | 0 | Err(_) => { |
617 | 0 | request_process |
618 | 0 | .respond(methods::Response::chainHead_v1_unpin(())); |
619 | 0 | } |
620 | 0 | Ok(Ok(())) => { |
621 | 0 | request_process |
622 | 0 | .respond(methods::Response::chainHead_v1_unpin(())); |
623 | 0 | } |
624 | 0 | Ok(Err(())) => { |
625 | 0 | request_process.fail(service::ErrorResponse::InvalidParams); |
626 | 0 | } |
627 | | } |
628 | 0 | } |
629 | | } |
630 | | _ => { |
631 | 23 | to_requests_handlers |
632 | 23 | .send(requests_handler::Message::Request(request_process)) |
633 | 0 | .await |
634 | 23 | .unwrap(); |
635 | | } |
636 | | } |
637 | | } |
638 | | service::Event::HandleSubscriptionStart { |
639 | 0 | task, |
640 | 0 | subscription_start, |
641 | 0 | } => { |
642 | 0 | client_main_task = task; |
643 | 0 |
|
644 | 0 | match subscription_start.request() { |
645 | | // TODO: enforce limit to number of subscriptions |
646 | 0 | methods::MethodCall::chainHead_v1_follow { with_runtime } => { |
647 | 0 | let (tx, rx) = async_channel::bounded(16); |
648 | 0 | let subscription_id = |
649 | 0 | chain_head_subscriptions::spawn_chain_head_subscription_task( |
650 | 0 | chain_head_subscriptions::Config { |
651 | 0 | tasks_executor: tasks_executor.clone(), |
652 | 0 | log_callback: log_callback.clone(), |
653 | 0 | receiver: rx, |
654 | 0 | chain_head_follow_subscription: subscription_start, |
655 | 0 | with_runtime, |
656 | 0 | consensus_service: consensus_service.clone(), |
657 | 0 | database: database.clone(), |
658 | 0 | }, |
659 | 0 | ) |
660 | 0 | .await; |
661 | 0 | chain_head_follow_subscriptions.insert(subscription_id, tx); |
662 | | } |
663 | | _ => { |
664 | 0 | to_requests_handlers |
665 | 0 | .send(requests_handler::Message::SubscriptionStart( |
666 | 0 | subscription_start, |
667 | 0 | )) |
668 | 0 | .await |
669 | 0 | .unwrap(); |
670 | | } |
671 | | } |
672 | | } |
673 | | service::Event::SubscriptionDestroyed { |
674 | 0 | task, |
675 | 0 | subscription_id, |
676 | 0 | .. |
677 | 0 | } => { |
678 | 0 | let _ = chain_head_follow_subscriptions.remove(&subscription_id); |
679 | 0 | client_main_task = task; |
680 | 0 | } |
681 | | service::Event::SerializedRequestsIoClosed => { |
682 | | // JSON-RPC client has disconnected. |
683 | 21 | return; |
684 | 21 | } |
685 | 21 | } |
686 | 21 | } |
687 | 21 | })); _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22spawn_client_main_task0B5_ Line | Count | Source | 557 | 21 | tasks_executor2(Box::pin(async move { | 558 | 21 | let mut chain_head_follow_subscriptions: hashbrown::HashMap< | 559 | 21 | String, | 560 | 21 | async_channel::Sender<chain_head_subscriptions::Message>, | 561 | 21 | _, | 562 | 21 | > = hashbrown::HashMap::with_capacity_and_hasher(2, fnv::FnvBuildHasher::default()); | 563 | | | 564 | | loop { | 565 | 65 | match client_main_task.run_until_event()44 .await { | 566 | | service::Event::HandleRequest { | 567 | 23 | task, | 568 | 23 | request_process, | 569 | 23 | } => { | 570 | 23 | client_main_task = task; | 571 | 23 | | 572 | 23 | match request_process.request() { | 573 | | methods::MethodCall::chainHead_v1_header { | 574 | 0 | follow_subscription, | 575 | | .. | 576 | | } => { | 577 | 0 | if let Some(follow_subscription) = | 578 | 0 | chain_head_follow_subscriptions.get_mut(&*follow_subscription) | 579 | | { | 580 | 0 | let _ = follow_subscription | 581 | 0 | .send(chain_head_subscriptions::Message::Header { | 582 | 0 | request: request_process, | 583 | 0 | }) | 584 | 0 | .await; | 585 | | // TODO racy; doesn't handle situation where follow subscription stops | 586 | 0 | } else { | 587 | 0 | request_process | 588 | 0 | .respond(methods::Response::chainHead_v1_header(None)); | 589 | 0 | } | 590 | | } | 591 | | methods::MethodCall::chainHead_v1_unpin { | 592 | 0 | follow_subscription, | 593 | 0 | hash_or_hashes, | 594 | | } => { | 595 | 0 | if let Some(follow_subscription) = | 596 | 0 | chain_head_follow_subscriptions.get_mut(&*follow_subscription) | 597 | | { | 598 | 0 | let block_hashes = match hash_or_hashes { | 599 | 0 | methods::HashHexStringSingleOrArray::Array(list) => { | 600 | 0 | list.into_iter().map(|h| h.0).collect::<Vec<_>>() | 601 | | } | 602 | 0 | methods::HashHexStringSingleOrArray::Single(hash) => { | 603 | 0 | vec![hash.0] | 604 | | } | 605 | | }; | 606 | | | 607 | 0 | let (outcome, outcome_rx) = oneshot::channel(); | 608 | 0 | let _ = follow_subscription | 609 | 0 | .send(chain_head_subscriptions::Message::Unpin { | 610 | 0 | block_hashes, | 611 | 0 | outcome, | 612 | 0 | }) | 613 | 0 | .await; | 614 | | | 615 | 0 | match outcome_rx.await { | 616 | 0 | Err(_) => { | 617 | 0 | request_process | 618 | 0 | .respond(methods::Response::chainHead_v1_unpin(())); | 619 | 0 | } | 620 | 0 | Ok(Ok(())) => { | 621 | 0 | request_process | 622 | 0 | .respond(methods::Response::chainHead_v1_unpin(())); | 623 | 0 | } | 624 | 0 | Ok(Err(())) => { | 625 | 0 | request_process.fail(service::ErrorResponse::InvalidParams); | 626 | 0 | } | 627 | | } | 628 | 0 | } | 629 | | } | 630 | | _ => { | 631 | 23 | to_requests_handlers | 632 | 23 | .send(requests_handler::Message::Request(request_process)) | 633 | 0 | .await | 634 | 23 | .unwrap(); | 635 | | } | 636 | | } | 637 | | } | 638 | | service::Event::HandleSubscriptionStart { | 639 | 0 | task, | 640 | 0 | subscription_start, | 641 | 0 | } => { | 642 | 0 | client_main_task = task; | 643 | 0 |
| 644 | 0 | match subscription_start.request() { | 645 | | // TODO: enforce limit to number of subscriptions | 646 | 0 | methods::MethodCall::chainHead_v1_follow { with_runtime } => { | 647 | 0 | let (tx, rx) = async_channel::bounded(16); | 648 | 0 | let subscription_id = | 649 | 0 | chain_head_subscriptions::spawn_chain_head_subscription_task( | 650 | 0 | chain_head_subscriptions::Config { | 651 | 0 | tasks_executor: tasks_executor.clone(), | 652 | 0 | log_callback: log_callback.clone(), | 653 | 0 | receiver: rx, | 654 | 0 | chain_head_follow_subscription: subscription_start, | 655 | 0 | with_runtime, | 656 | 0 | consensus_service: consensus_service.clone(), | 657 | 0 | database: database.clone(), | 658 | 0 | }, | 659 | 0 | ) | 660 | 0 | .await; | 661 | 0 | chain_head_follow_subscriptions.insert(subscription_id, tx); | 662 | | } | 663 | | _ => { | 664 | 0 | to_requests_handlers | 665 | 0 | .send(requests_handler::Message::SubscriptionStart( | 666 | 0 | subscription_start, | 667 | 0 | )) | 668 | 0 | .await | 669 | 0 | .unwrap(); | 670 | | } | 671 | | } | 672 | | } | 673 | | service::Event::SubscriptionDestroyed { | 674 | 0 | task, | 675 | 0 | subscription_id, | 676 | 0 | .. | 677 | 0 | } => { | 678 | 0 | let _ = chain_head_follow_subscriptions.remove(&subscription_id); | 679 | 0 | client_main_task = task; | 680 | 0 | } | 681 | | service::Event::SerializedRequestsIoClosed => { | 682 | | // JSON-RPC client has disconnected. | 683 | 21 | return; | 684 | 21 | } | 685 | 21 | } | 686 | 21 | } | 687 | 21 | })); |
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22spawn_client_main_task0B5_ |
688 | 21 | } _RNvNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22spawn_client_main_task Line | Count | Source | 548 | 21 | fn spawn_client_main_task( | 549 | 21 | tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, | 550 | 21 | log_callback: Arc<dyn LogCallback + Send + Sync>, | 551 | 21 | consensus_service: Arc<consensus_service::ConsensusService>, | 552 | 21 | database: Arc<database_thread::DatabaseThread>, | 553 | 21 | to_requests_handlers: async_channel::Sender<requests_handler::Message>, | 554 | 21 | mut client_main_task: service::ClientMainTask, | 555 | 21 | ) { | 556 | 21 | let tasks_executor2 = tasks_executor.clone(); | 557 | 21 | tasks_executor2(Box::pin(async move { | 558 | | let mut chain_head_follow_subscriptions: hashbrown::HashMap< | 559 | | String, | 560 | | async_channel::Sender<chain_head_subscriptions::Message>, | 561 | | _, | 562 | | > = hashbrown::HashMap::with_capacity_and_hasher(2, fnv::FnvBuildHasher::default()); | 563 | | | 564 | | loop { | 565 | | match client_main_task.run_until_event().await { | 566 | | service::Event::HandleRequest { | 567 | | task, | 568 | | request_process, | 569 | | } => { | 570 | | client_main_task = task; | 571 | | | 572 | | match request_process.request() { | 573 | | methods::MethodCall::chainHead_v1_header { | 574 | | follow_subscription, | 575 | | .. | 576 | | } => { | 577 | | if let Some(follow_subscription) = | 578 | | chain_head_follow_subscriptions.get_mut(&*follow_subscription) | 579 | | { | 580 | | let _ = follow_subscription | 581 | | .send(chain_head_subscriptions::Message::Header { | 582 | | request: request_process, | 583 | | }) | 584 | | .await; | 585 | | // TODO racy; doesn't handle situation where follow subscription stops | 586 | | } else { | 587 | | request_process | 588 | | .respond(methods::Response::chainHead_v1_header(None)); | 589 | | } | 590 | | } | 591 | | methods::MethodCall::chainHead_v1_unpin { | 592 | | follow_subscription, | 593 | | hash_or_hashes, | 594 | | } => { | 595 | | if let Some(follow_subscription) = | 596 | | chain_head_follow_subscriptions.get_mut(&*follow_subscription) | 597 | | { | 598 | | let block_hashes = match hash_or_hashes { | 599 | | methods::HashHexStringSingleOrArray::Array(list) => { | 600 | | list.into_iter().map(|h| h.0).collect::<Vec<_>>() | 601 | | } | 602 | | methods::HashHexStringSingleOrArray::Single(hash) => { | 603 | | vec![hash.0] | 604 | | } | 605 | | }; | 606 | | | 607 | | let (outcome, outcome_rx) = oneshot::channel(); | 608 | | let _ = follow_subscription | 609 | | .send(chain_head_subscriptions::Message::Unpin { | 610 | | block_hashes, | 611 | | outcome, | 612 | | }) | 613 | | .await; | 614 | | | 615 | | match outcome_rx.await { | 616 | | Err(_) => { | 617 | | request_process | 618 | | .respond(methods::Response::chainHead_v1_unpin(())); | 619 | | } | 620 | | Ok(Ok(())) => { | 621 | | request_process | 622 | | .respond(methods::Response::chainHead_v1_unpin(())); | 623 | | } | 624 | | Ok(Err(())) => { | 625 | | request_process.fail(service::ErrorResponse::InvalidParams); | 626 | | } | 627 | | } | 628 | | } | 629 | | } | 630 | | _ => { | 631 | | to_requests_handlers | 632 | | .send(requests_handler::Message::Request(request_process)) | 633 | | .await | 634 | | .unwrap(); | 635 | | } | 636 | | } | 637 | | } | 638 | | service::Event::HandleSubscriptionStart { | 639 | | task, | 640 | | subscription_start, | 641 | | } => { | 642 | | client_main_task = task; | 643 | | | 644 | | match subscription_start.request() { | 645 | | // TODO: enforce limit to number of subscriptions | 646 | | methods::MethodCall::chainHead_v1_follow { with_runtime } => { | 647 | | let (tx, rx) = async_channel::bounded(16); | 648 | | let subscription_id = | 649 | | chain_head_subscriptions::spawn_chain_head_subscription_task( | 650 | | chain_head_subscriptions::Config { | 651 | | tasks_executor: tasks_executor.clone(), | 652 | | log_callback: log_callback.clone(), | 653 | | receiver: rx, | 654 | | chain_head_follow_subscription: subscription_start, | 655 | | with_runtime, | 656 | | consensus_service: consensus_service.clone(), | 657 | | database: database.clone(), | 658 | | }, | 659 | | ) | 660 | | .await; | 661 | | chain_head_follow_subscriptions.insert(subscription_id, tx); | 662 | | } | 663 | | _ => { | 664 | | to_requests_handlers | 665 | | .send(requests_handler::Message::SubscriptionStart( | 666 | | subscription_start, | 667 | | )) | 668 | | .await | 669 | | .unwrap(); | 670 | | } | 671 | | } | 672 | | } | 673 | | service::Event::SubscriptionDestroyed { | 674 | | task, | 675 | | subscription_id, | 676 | | .. | 677 | | } => { | 678 | | let _ = chain_head_follow_subscriptions.remove(&subscription_id); | 679 | | client_main_task = task; | 680 | | } | 681 | | service::Event::SerializedRequestsIoClosed => { | 682 | | // JSON-RPC client has disconnected. | 683 | | return; | 684 | | } | 685 | | } | 686 | | } | 687 | 21 | })); | 688 | 21 | } |
Unexecuted instantiation: _RNvNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22spawn_client_main_task |