/__w/smoldot/smoldot/repo/lib/src/json_rpc/service/client_main_task.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2023 Pierre Krieger |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | // TODO: doc |
19 | | |
20 | | use crate::json_rpc::{methods, parse}; |
21 | | use alloc::{ |
22 | | borrow::Cow, |
23 | | boxed::Box, |
24 | | collections::VecDeque, |
25 | | string::{String, ToString as _}, |
26 | | sync::{Arc, Weak}, |
27 | | }; |
28 | | use async_lock::Mutex; |
29 | | use core::{ |
30 | | cmp, fmt, mem, |
31 | | num::NonZeroU32, |
32 | | sync::atomic::{AtomicBool, AtomicU32, Ordering}, |
33 | | }; |
34 | | use futures_lite::FutureExt as _; |
35 | | use slab::Slab; |
36 | | |
37 | | pub use crate::json_rpc::parse::{ErrorResponse, ParseError}; |
38 | | |
39 | | /// Task created by [`client_main_task`] and driven by calling [`ClientMainTask::run_until_event`]. See [module-level-documentation](..). |
40 | | pub struct ClientMainTask { |
41 | | /// Because we move the task around a lot (`run_until_event` takes `self` by value and hands it back inside the returned event), all the fields are actually within a `Box`. |
42 | | inner: Box<Inner>, |
43 | | } |
44 | | |
45 | | struct Inner { |
46 | | /// Identifier to allocate to the next subscription requested by the user. |
47 | | // TODO: better strategy than just integers? |
48 | | next_subscription_id: u64, |
49 | | |
50 | | /// List of all active subscriptions. Keys are subscription IDs. |
51 | | /// |
52 | | /// Given that the subscription IDs are allocated locally, there is no harm in using a |
53 | | /// non-HashDoS-resilient hash function. |
54 | | /// |
55 | | /// Entries are removed only when the [`SubscriptionStartProcess`] or [`Subscription`] object |
56 | | /// is destroyed. This is necessary given that the maximum number of subscriptions exists in |
57 | | /// order to avoid spam attacks, and that resources are freed only when the |
58 | | /// [`SubscriptionStartProcess`] or [`Subscription`] is destroyed. |
59 | | active_subscriptions: hashbrown::HashMap<String, InnerSubscription, fnv::FnvBuildHasher>, |
60 | | /// Maximum size that [`Inner::active_subscriptions`] is allowed to reach. Beyond this, |
61 | | /// subscription start requests are automatically denied with a "Too many active subscriptions" error response. |
62 | | max_active_subscriptions: u32, |
63 | | |
64 | | /// Structure shared with the [`SerializedRequestsIo`], which only holds a `Weak` reference to it. |
65 | | serialized_io: Arc<SerializedIo>, |
66 | | |
67 | | /// Queue where request handlers and subscriptions push responses/notifications. Drained by [`ClientMainTask::run_until_event`]. |
68 | | responses_notifications_queue: Arc<ResponsesNotificationsQueue>, |
69 | | |
70 | | /// Listener notified after the [`SerializedRequestsIo`] is destroyed. When it fires, [`ClientMainTask::run_until_event`] returns [`Event::SerializedRequestsIoClosed`]. |
71 | | on_serialized_requests_io_destroyed: event_listener::EventListener, |
72 | | } |
73 | | |
74 | | struct InnerSubscription { |
75 | | /// Shared with the subscription. Used to notify the subscription that it should be killed. |
76 | | kill_channel: Arc<SubscriptionKillChannel>, |
77 | | /// Response to an unsubscribe request that must be sent out once the subscription is killed. `None` as long as no unsubscribe request targeting this subscription has been accepted. |
78 | | unsubscribe_response: Option<String>, |
79 | | } |
80 | | |
81 | | struct SerializedIo { |
82 | | /// Queue of requests. The requests are guaranteed to be a valid request JSON, but not |
83 | | /// necessarily to use a known method. |
84 | | requests_queue: crossbeam_queue::SegQueue<String>, |
85 | | |
86 | | /// Event notified after an element has been pushed to [`SerializedIo::requests_queue`]. |
87 | | on_request_pushed: event_listener::Event, |
88 | | |
89 | | /// Event notified after an element from [`SerializedIo::requests_queue`] has been pulled. NOTE(review): the field name suggests it is also notified when the [`ClientMainTask`] is destroyed; confirm. |
90 | | on_request_pulled_or_task_destroyed: event_listener::Event, |
91 | | |
92 | | /// Number of requests that have been received from the client but whose answer hasn't |
93 | | /// been pulled out from [`SerializedIo::requests_queue`] yet. NOTE(review): presumably [`SerializedIo::responses_queue`] is meant here; confirm. |
94 | | num_requests_in_fly: AtomicU32, |
95 | | |
96 | | /// Maximum value that [`SerializedIo::num_requests_in_fly`] is allowed to reach. |
97 | | /// Beyond this, no more requests should be added to [`SerializedIo::requests_queue`]. |
98 | | max_requests_in_fly: NonZeroU32, |
99 | | |
100 | | /// Queue of responses and notifications to send back to the client. |
101 | | responses_queue: Mutex<SerializedIoResponses>, |
102 | | |
103 | | /// Event notified after an element has been pushed to [`SerializedIo::responses_queue`], or |
104 | | /// when the [`ClientMainTask`] has been destroyed. |
105 | | on_response_pushed_or_task_destroyed: event_listener::Event, |
106 | | } |
107 | | |
108 | | struct SerializedIoResponses { |
109 | | /// Unordered list of responses and notifications to send back to the client. |
110 | | /// |
111 | | /// Each entry contains the response/notification, and a boolean equal to `true` if this is |
112 | | /// a request response or `false` if this is a notification. |
113 | | pending_serialized_responses: Slab<(String, bool)>, |
114 | | |
115 | | /// Ordered list of responses and notifications to send back to the client, as indices within |
116 | | /// [`SerializedIoResponses::pending_serialized_responses`]. New entries are pushed to the back. |
117 | | pending_serialized_responses_queue: VecDeque<usize>, |
118 | | } |
119 | | |
120 | | /// Queue where request handlers and subscriptions push responses/notifications. |
121 | | struct ResponsesNotificationsQueue { |
122 | | /// The actual queue. |
123 | | queue: crossbeam_queue::SegQueue<ToMainTask>, |
124 | | /// Maximum size that [`ResponsesNotificationsQueue::queue`] should reach. |
125 | | /// This is however not a hard limit. Pushing a response to a request and pushing a |
126 | | /// subscription destroyed event ignore this maximum (as doing so must always be lock-free), |
127 | | /// and pushing a notification checks against this limit in a racy way. For this reason, in |
128 | | /// the worst case scenario the queue can reach up to |
129 | | /// `max_requests_in_fly + max_active_subscriptions` elements. What matters, however, is that |
130 | | /// the queue is bounded one way or another, more than the exact bound. |
131 | | max_len: usize, |
132 | | /// Event notified after an element has been pushed to [`ResponsesNotificationsQueue::queue`]. |
133 | | on_pushed: event_listener::Event, |
134 | | /// Event notified after an element has been popped from [`ResponsesNotificationsQueue::queue`]. |
135 | | on_popped: event_listener::Event, |
136 | | } |
137 | | |
138 | | // Message pushed to `ResponsesNotificationsQueue::queue` and handled by `ClientMainTask::run_until_event`. TODO: weird enum |
139 | | enum ToMainTask { |
140 | | RequestResponse(String), |
141 | | Notification(String), |
142 | | SubscriptionDestroyed { subscription_id: String }, |
143 | | } |
144 | | |
145 | | /// Configuration for [`client_main_task`], which returns a [`ClientMainTask`] and a [`SerializedRequestsIo`]. |
146 | | pub struct Config { |
147 | | /// Maximum number of requests that have been sent by the [`SerializedRequestsIo`] but whose |
148 | | /// response hasn't been pulled through the [`SerializedRequestsIo`] yet. |
149 | | /// |
150 | | /// If this limit is reached, it is not possible to send further requests without pulling |
151 | | /// responses first. |
152 | | pub max_pending_requests: NonZeroU32, |
153 | | |
154 | | /// Maximum number of simultaneous subscriptions allowed. Trying to create a subscription will |
155 | | /// be automatically rejected with a JSON-RPC error response if this limit is reached. |
156 | | pub max_active_subscriptions: u32, |
157 | | } |
158 | | |
159 | | /// Creates a new [`ClientMainTask`] and a [`SerializedRequestsIo`] connected to it. |
160 | 21 | pub fn client_main_task(config: Config) -> (ClientMainTask, SerializedRequestsIo) { |
161 | 21 | let buffers_capacity = usize::try_from(config.max_pending_requests.get()) |
162 | 21 | .unwrap_or(usize::MAX) |
163 | 21 | .saturating_add(usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX)); |
164 | 21 | |
165 | 21 | let on_serialized_requests_io_destroyed = event_listener::Event::new(); |
166 | 21 | |
167 | 21 | let task = ClientMainTask { |
168 | 21 | inner: Box::new(Inner { |
169 | 21 | next_subscription_id: 1, |
170 | 21 | active_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( |
171 | 21 | cmp::min( |
172 | 21 | usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX), |
173 | 21 | 32, |
174 | 21 | ), |
175 | 21 | Default::default(), |
176 | 21 | ), |
177 | 21 | max_active_subscriptions: config.max_active_subscriptions, |
178 | 21 | serialized_io: Arc::new(SerializedIo { |
179 | 21 | requests_queue: crossbeam_queue::SegQueue::new(), |
180 | 21 | on_request_pushed: event_listener::Event::new(), |
181 | 21 | on_request_pulled_or_task_destroyed: event_listener::Event::new(), |
182 | 21 | num_requests_in_fly: AtomicU32::new(0), |
183 | 21 | max_requests_in_fly: config.max_pending_requests, |
184 | 21 | responses_queue: Mutex::new(SerializedIoResponses { |
185 | 21 | pending_serialized_responses_queue: VecDeque::with_capacity(cmp::min( |
186 | 21 | 64, |
187 | 21 | buffers_capacity, |
188 | 21 | )), |
189 | 21 | pending_serialized_responses: Slab::with_capacity(cmp::min( |
190 | 21 | 64, |
191 | 21 | buffers_capacity, |
192 | 21 | )), |
193 | 21 | }), |
194 | 21 | on_response_pushed_or_task_destroyed: event_listener::Event::new(), |
195 | 21 | }), |
196 | 21 | responses_notifications_queue: Arc::new(ResponsesNotificationsQueue { |
197 | 21 | queue: crossbeam_queue::SegQueue::new(), |
198 | 21 | max_len: buffers_capacity, |
199 | 21 | on_pushed: event_listener::Event::new(), |
200 | 21 | on_popped: event_listener::Event::new(), |
201 | 21 | }), |
202 | 21 | on_serialized_requests_io_destroyed: on_serialized_requests_io_destroyed.listen(), |
203 | 21 | }), |
204 | 21 | }; |
205 | 21 | |
206 | 21 | let serialized_requests_io = SerializedRequestsIo { |
207 | 21 | serialized_io: Arc::downgrade(&task.inner.serialized_io), |
208 | 21 | on_serialized_requests_io_destroyed, |
209 | 21 | }; |
210 | 21 | |
211 | 21 | (task, serialized_requests_io) |
212 | 21 | } Unexecuted instantiation: _RNvNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_task16client_main_task _RNvNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_task16client_main_task Line | Count | Source | 160 | 21 | pub fn client_main_task(config: Config) -> (ClientMainTask, SerializedRequestsIo) { | 161 | 21 | let buffers_capacity = usize::try_from(config.max_pending_requests.get()) | 162 | 21 | .unwrap_or(usize::MAX) | 163 | 21 | .saturating_add(usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX)); | 164 | 21 | | 165 | 21 | let on_serialized_requests_io_destroyed = event_listener::Event::new(); | 166 | 21 | | 167 | 21 | let task = ClientMainTask { | 168 | 21 | inner: Box::new(Inner { | 169 | 21 | next_subscription_id: 1, | 170 | 21 | active_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( | 171 | 21 | cmp::min( | 172 | 21 | usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX), | 173 | 21 | 32, | 174 | 21 | ), | 175 | 21 | Default::default(), | 176 | 21 | ), | 177 | 21 | max_active_subscriptions: config.max_active_subscriptions, | 178 | 21 | serialized_io: Arc::new(SerializedIo { | 179 | 21 | requests_queue: crossbeam_queue::SegQueue::new(), | 180 | 21 | on_request_pushed: event_listener::Event::new(), | 181 | 21 | on_request_pulled_or_task_destroyed: event_listener::Event::new(), | 182 | 21 | num_requests_in_fly: AtomicU32::new(0), | 183 | 21 | max_requests_in_fly: config.max_pending_requests, | 184 | 21 | responses_queue: Mutex::new(SerializedIoResponses { | 185 | 21 | pending_serialized_responses_queue: VecDeque::with_capacity(cmp::min( | 186 | 21 | 64, | 187 | 21 | buffers_capacity, | 188 | 21 | )), | 189 | 21 | pending_serialized_responses: Slab::with_capacity(cmp::min( | 190 | 21 | 64, | 191 | 21 | buffers_capacity, | 192 | 21 | )), | 193 | 21 | }), | 194 | 21 | on_response_pushed_or_task_destroyed: event_listener::Event::new(), | 195 | 21 | }), | 196 | 21 | 
responses_notifications_queue: Arc::new(ResponsesNotificationsQueue { | 197 | 21 | queue: crossbeam_queue::SegQueue::new(), | 198 | 21 | max_len: buffers_capacity, | 199 | 21 | on_pushed: event_listener::Event::new(), | 200 | 21 | on_popped: event_listener::Event::new(), | 201 | 21 | }), | 202 | 21 | on_serialized_requests_io_destroyed: on_serialized_requests_io_destroyed.listen(), | 203 | 21 | }), | 204 | 21 | }; | 205 | 21 | | 206 | 21 | let serialized_requests_io = SerializedRequestsIo { | 207 | 21 | serialized_io: Arc::downgrade(&task.inner.serialized_io), | 208 | 21 | on_serialized_requests_io_destroyed, | 209 | 21 | }; | 210 | 21 | | 211 | 21 | (task, serialized_requests_io) | 212 | 21 | } |
|
213 | | |
214 | | impl ClientMainTask { |
215 | | /// Processes the task's internals and waits until something noteworthy happens. |
216 | 44 | pub async fn run_until_event(mut self) -> Event 0 { Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask15run_until_event Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask15run_until_event |
217 | | loop { |
218 | | enum WakeUpReason { |
219 | | NewRequest(String), |
220 | | Message(ToMainTask), |
221 | | } |
222 | | |
223 | 48 | let wake_up_reason = { |
224 | 69 | let serialized_requests_io_destroyed = async { |
225 | 69 | (&mut self.inner.on_serialized_requests_io_destroyed).await65 ; |
226 | 21 | Err(()) |
227 | 21 | }; Unexecuted instantiation: _RNCNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00Bc_ Unexecuted instantiation: _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00Bc_ _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00CsiUjFBJteJ7x_17smoldot_full_node Line | Count | Source | 224 | 69 | let serialized_requests_io_destroyed = async { | 225 | 69 | (&mut self.inner.on_serialized_requests_io_destroyed).await65 ; | 226 | 21 | Err(()) | 227 | 21 | }; |
|
228 | | |
229 | 69 | let next_serialized_request = async { |
230 | 69 | let mut wait = None; |
231 | | loop { |
232 | 155 | if let Some(elem25 ) = self.inner.serialized_io.requests_queue.pop() { |
233 | 25 | self.inner |
234 | 25 | .serialized_io |
235 | 25 | .on_request_pulled_or_task_destroyed |
236 | 25 | .notify(usize::MAX); |
237 | 25 | break Ok(WakeUpReason::NewRequest(elem)); |
238 | 130 | } |
239 | 130 | if let Some(wait65 ) = wait.take() { |
240 | 65 | wait.await44 |
241 | 65 | } else { |
242 | 65 | wait = Some(self.inner.serialized_io.on_request_pushed.listen()); |
243 | 65 | } |
244 | | } |
245 | 25 | }; Unexecuted instantiation: _RNCNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0Bc_ Unexecuted instantiation: _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0Bc_ _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0CsiUjFBJteJ7x_17smoldot_full_node Line | Count | Source | 229 | 69 | let next_serialized_request = async { | 230 | 69 | let mut wait = None; | 231 | | loop { | 232 | 155 | if let Some(elem25 ) = self.inner.serialized_io.requests_queue.pop() { | 233 | 25 | self.inner | 234 | 25 | .serialized_io | 235 | 25 | .on_request_pulled_or_task_destroyed | 236 | 25 | .notify(usize::MAX); | 237 | 25 | break Ok(WakeUpReason::NewRequest(elem)); | 238 | 130 | } | 239 | 130 | if let Some(wait65 ) = wait.take() { | 240 | 65 | wait.await44 | 241 | 65 | } else { | 242 | 65 | wait = Some(self.inner.serialized_io.on_request_pushed.listen()); | 243 | 65 | } | 244 | | } | 245 | 25 | }; |
|
246 | | |
247 | 69 | let response_notif = async { |
248 | 65 | let mut wait = None; |
249 | | loop { |
250 | 153 | if let Some(elem23 ) = self.inner.responses_notifications_queue.queue.pop() { |
251 | 23 | break Ok(WakeUpReason::Message(elem)); |
252 | 130 | } |
253 | 130 | if let Some(wait65 ) = wait.take() { |
254 | 65 | wait.await23 |
255 | 65 | } else { |
256 | 65 | wait = |
257 | 65 | Some(self.inner.responses_notifications_queue.on_pushed.listen()); |
258 | 65 | } |
259 | | } |
260 | 23 | }; Unexecuted instantiation: _RNCNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0Bc_ Unexecuted instantiation: _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0Bc_ _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0CsiUjFBJteJ7x_17smoldot_full_node Line | Count | Source | 247 | 65 | let response_notif = async { | 248 | 65 | let mut wait = None; | 249 | | loop { | 250 | 153 | if let Some(elem23 ) = self.inner.responses_notifications_queue.queue.pop() { | 251 | 23 | break Ok(WakeUpReason::Message(elem)); | 252 | 130 | } | 253 | 130 | if let Some(wait65 ) = wait.take() { | 254 | 65 | wait.await23 | 255 | 65 | } else { | 256 | 65 | wait = | 257 | 65 | Some(self.inner.responses_notifications_queue.on_pushed.listen()); | 258 | 65 | } | 259 | | } | 260 | 23 | }; |
|
261 | | |
262 | 69 | match serialized_requests_io_destroyed |
263 | 69 | .or(next_serialized_request) |
264 | 69 | .or(response_notif) |
265 | 65 | .await |
266 | | { |
267 | 48 | Ok(wake_up_reason) => wake_up_reason, |
268 | 21 | Err(()) => return Event::SerializedRequestsIoClosed, |
269 | | } |
270 | | }; |
271 | | |
272 | | // Immediately handle every event apart from `NewRequest`. |
273 | 25 | let new_request = match wake_up_reason23 { |
274 | 25 | WakeUpReason::NewRequest(request) => request, |
275 | 0 | WakeUpReason::Message(ToMainTask::SubscriptionDestroyed { subscription_id }) => { |
276 | 0 | let InnerSubscription { |
277 | 0 | unsubscribe_response, |
278 | 0 | .. |
279 | 0 | } = self |
280 | 0 | .inner |
281 | 0 | .active_subscriptions |
282 | 0 | .remove(&subscription_id) |
283 | 0 | .unwrap(); |
284 | | // TODO: post a `stop`/`error` event for chainhead subscriptions |
285 | 0 | if let Some(unsubscribe_response) = unsubscribe_response { |
286 | 0 | let mut responses_queue = |
287 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
288 | 0 | let pos = responses_queue |
289 | 0 | .pending_serialized_responses |
290 | 0 | .insert((unsubscribe_response, true)); |
291 | 0 | responses_queue |
292 | 0 | .pending_serialized_responses_queue |
293 | 0 | .push_back(pos); |
294 | 0 | self.inner |
295 | 0 | .serialized_io |
296 | 0 | .on_response_pushed_or_task_destroyed |
297 | 0 | .notify(usize::MAX); |
298 | 0 | } |
299 | | |
300 | | // Shrink the list of active subscriptions if necessary. |
301 | 0 | if self.inner.active_subscriptions.capacity() |
302 | 0 | >= 2 * self.inner.active_subscriptions.len() + 16 |
303 | 0 | { |
304 | 0 | self.inner.active_subscriptions.shrink_to_fit(); |
305 | 0 | } |
306 | | |
307 | 0 | return Event::SubscriptionDestroyed { |
308 | 0 | task: self, |
309 | 0 | subscription_id, |
310 | 0 | }; |
311 | | } |
312 | 23 | WakeUpReason::Message(ToMainTask::RequestResponse(response)) => { |
313 | 23 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await0 ; |
314 | 23 | let pos = responses_queue |
315 | 23 | .pending_serialized_responses |
316 | 23 | .insert((response, true)); |
317 | 23 | responses_queue |
318 | 23 | .pending_serialized_responses_queue |
319 | 23 | .push_back(pos); |
320 | 23 | self.inner |
321 | 23 | .serialized_io |
322 | 23 | .on_response_pushed_or_task_destroyed |
323 | 23 | .notify(usize::MAX); |
324 | 23 | continue; |
325 | | } |
326 | 0 | WakeUpReason::Message(ToMainTask::Notification(notification)) => { |
327 | | // TODO: filter out redundant notifications, as it's the entire point of this module |
328 | 0 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await; |
329 | 0 | let pos = responses_queue |
330 | 0 | .pending_serialized_responses |
331 | 0 | .insert((notification, false)); |
332 | 0 | responses_queue |
333 | 0 | .pending_serialized_responses_queue |
334 | 0 | .push_back(pos); |
335 | 0 | self.inner |
336 | 0 | .serialized_io |
337 | 0 | .on_response_pushed_or_task_destroyed |
338 | 0 | .notify(usize::MAX); |
339 | 0 | continue; |
340 | | } |
341 | | }; |
342 | | |
343 | 23 | let (request_id, parsed_request) = |
344 | 25 | match methods::parse_jsonrpc_client_to_server(&new_request) { |
345 | 23 | Ok((request_id, method)) => (request_id, method), |
346 | 1 | Err(methods::ParseClientToServerError::Method { request_id, error }) => { |
347 | 1 | let response = error.to_json_error(request_id); |
348 | 1 | let mut responses_queue = |
349 | 1 | self.inner.serialized_io.responses_queue.lock().await0 ; |
350 | 1 | let pos = responses_queue |
351 | 1 | .pending_serialized_responses |
352 | 1 | .insert((response, true)); |
353 | 1 | responses_queue |
354 | 1 | .pending_serialized_responses_queue |
355 | 1 | .push_back(pos); |
356 | 1 | self.inner |
357 | 1 | .serialized_io |
358 | 1 | .on_response_pushed_or_task_destroyed |
359 | 1 | .notify(usize::MAX); |
360 | 1 | continue; |
361 | | } |
362 | 0 | Err(methods::ParseClientToServerError::UnknownNotification(_)) => continue, |
363 | | Err(methods::ParseClientToServerError::JsonRpcParse(_)) => { |
364 | 1 | let response = parse::build_parse_error_response(); |
365 | 1 | let mut responses_queue = |
366 | 1 | self.inner.serialized_io.responses_queue.lock().await0 ; |
367 | 1 | let pos = responses_queue |
368 | 1 | .pending_serialized_responses |
369 | 1 | .insert((response, true)); |
370 | 1 | responses_queue |
371 | 1 | .pending_serialized_responses_queue |
372 | 1 | .push_back(pos); |
373 | 1 | self.inner |
374 | 1 | .serialized_io |
375 | 1 | .on_response_pushed_or_task_destroyed |
376 | 1 | .notify(usize::MAX); |
377 | 1 | continue; |
378 | | } |
379 | | }; |
380 | | |
381 | | // There exist three types of requests: |
382 | | // |
383 | | // - Requests that follow a simple one-request-one-response schema. |
384 | | // - Requests that, if accepted, start a subscription. |
385 | | // - Requests that unsubscribe from a subscription. |
386 | | // |
387 | 23 | match &parsed_request { |
388 | | methods::MethodCall::account_nextIndex { .. } |
389 | | | methods::MethodCall::author_hasKey { .. } |
390 | | | methods::MethodCall::author_hasSessionKeys { .. } |
391 | | | methods::MethodCall::author_insertKey { .. } |
392 | | | methods::MethodCall::author_pendingExtrinsics { .. } |
393 | | | methods::MethodCall::author_removeExtrinsic { .. } |
394 | | | methods::MethodCall::author_rotateKeys { .. } |
395 | | | methods::MethodCall::author_submitExtrinsic { .. } |
396 | | | methods::MethodCall::babe_epochAuthorship { .. } |
397 | | | methods::MethodCall::chain_getBlock { .. } |
398 | | | methods::MethodCall::chain_getBlockHash { .. } |
399 | | | methods::MethodCall::chain_getFinalizedHead { .. } |
400 | | | methods::MethodCall::chain_getHeader { .. } |
401 | | | methods::MethodCall::childstate_getKeys { .. } |
402 | | | methods::MethodCall::childstate_getStorage { .. } |
403 | | | methods::MethodCall::childstate_getStorageHash { .. } |
404 | | | methods::MethodCall::childstate_getStorageSize { .. } |
405 | | | methods::MethodCall::grandpa_roundState { .. } |
406 | | | methods::MethodCall::offchain_localStorageGet { .. } |
407 | | | methods::MethodCall::offchain_localStorageSet { .. } |
408 | | | methods::MethodCall::payment_queryInfo { .. } |
409 | | | methods::MethodCall::state_call { .. } |
410 | | | methods::MethodCall::state_getKeys { .. } |
411 | | | methods::MethodCall::state_getKeysPaged { .. } |
412 | | | methods::MethodCall::state_getMetadata { .. } |
413 | | | methods::MethodCall::state_getPairs { .. } |
414 | | | methods::MethodCall::state_getReadProof { .. } |
415 | | | methods::MethodCall::state_getRuntimeVersion { .. } |
416 | | | methods::MethodCall::state_getStorage { .. } |
417 | | | methods::MethodCall::state_getStorageHash { .. } |
418 | | | methods::MethodCall::state_getStorageSize { .. } |
419 | | | methods::MethodCall::state_queryStorage { .. } |
420 | | | methods::MethodCall::state_queryStorageAt { .. } |
421 | | | methods::MethodCall::system_accountNextIndex { .. } |
422 | | | methods::MethodCall::system_addReservedPeer { .. } |
423 | | | methods::MethodCall::system_chain { .. } |
424 | | | methods::MethodCall::system_chainType { .. } |
425 | | | methods::MethodCall::system_dryRun { .. } |
426 | | | methods::MethodCall::system_health { .. } |
427 | | | methods::MethodCall::system_localListenAddresses { .. } |
428 | | | methods::MethodCall::system_localPeerId { .. } |
429 | | | methods::MethodCall::system_name { .. } |
430 | | | methods::MethodCall::system_networkState { .. } |
431 | | | methods::MethodCall::system_nodeRoles { .. } |
432 | | | methods::MethodCall::system_peers { .. } |
433 | | | methods::MethodCall::system_properties { .. } |
434 | | | methods::MethodCall::system_removeReservedPeer { .. } |
435 | | | methods::MethodCall::system_version { .. } |
436 | | | methods::MethodCall::chainSpec_v1_chainName { .. } |
437 | | | methods::MethodCall::chainSpec_v1_genesisHash { .. } |
438 | | | methods::MethodCall::chainSpec_v1_properties { .. } |
439 | | | methods::MethodCall::rpc_methods { .. } |
440 | | | methods::MethodCall::sudo_unstable_p2pDiscover { .. } |
441 | | | methods::MethodCall::sudo_unstable_version { .. } |
442 | | | methods::MethodCall::chainHead_v1_body { .. } |
443 | | | methods::MethodCall::chainHead_v1_call { .. } |
444 | | | methods::MethodCall::chainHead_v1_continue { .. } |
445 | | | methods::MethodCall::chainHead_unstable_finalizedDatabase { .. } |
446 | | | methods::MethodCall::chainHead_v1_header { .. } |
447 | | | methods::MethodCall::chainHead_v1_stopOperation { .. } |
448 | | | methods::MethodCall::chainHead_v1_storage { .. } |
449 | | | methods::MethodCall::chainHead_v1_unpin { .. } => { |
450 | | // Simple one-request-one-response. |
451 | 23 | return Event::HandleRequest { |
452 | 23 | request_process: RequestProcess { |
453 | 23 | responses_notifications_queue: self |
454 | 23 | .inner |
455 | 23 | .responses_notifications_queue |
456 | 23 | .clone(), |
457 | 23 | request: new_request, |
458 | 23 | has_sent_response: false, |
459 | 23 | }, |
460 | 23 | task: self, |
461 | 23 | }; |
462 | | } |
463 | | |
464 | | methods::MethodCall::author_submitAndWatchExtrinsic { .. } |
465 | | | methods::MethodCall::chain_subscribeAllHeads { .. } |
466 | | | methods::MethodCall::chain_subscribeFinalizedHeads { .. } |
467 | | | methods::MethodCall::chain_subscribeNewHeads { .. } |
468 | | | methods::MethodCall::state_subscribeRuntimeVersion { .. } |
469 | | | methods::MethodCall::state_subscribeStorage { .. } |
470 | | | methods::MethodCall::transaction_v1_broadcast { .. } |
471 | | | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. } |
472 | | | methods::MethodCall::sudo_network_unstable_watch { .. } |
473 | | | methods::MethodCall::chainHead_v1_follow { .. } => { |
474 | | // Subscription starting requests. |
475 | | |
476 | | // We must check the maximum number of subscriptions. |
477 | 0 | let max_subscriptions = |
478 | 0 | usize::try_from(self.inner.max_active_subscriptions).unwrap_or(usize::MAX); |
479 | 0 | debug_assert!(self.inner.active_subscriptions.len() <= max_subscriptions); |
480 | 0 | if self.inner.active_subscriptions.len() >= max_subscriptions { |
481 | 0 | let response = parse::build_error_response( |
482 | 0 | request_id, |
483 | 0 | ErrorResponse::ServerError(-32000, "Too many active subscriptions"), |
484 | 0 | None, |
485 | 0 | ); |
486 | 0 | let mut responses_queue = |
487 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
488 | 0 | let pos = responses_queue |
489 | 0 | .pending_serialized_responses |
490 | 0 | .insert((response, true)); |
491 | 0 | responses_queue |
492 | 0 | .pending_serialized_responses_queue |
493 | 0 | .push_back(pos); |
494 | 0 | self.inner |
495 | 0 | .serialized_io |
496 | 0 | .on_response_pushed_or_task_destroyed |
497 | 0 | .notify(usize::MAX); |
498 | 0 | continue; |
499 | 0 | } |
500 | 0 |
|
501 | 0 | // Allocate the new subscription ID. |
502 | 0 | let subscription_id = self.allocate_subscription_id(); |
503 | 0 | debug_assert!(!self |
504 | 0 | .inner |
505 | 0 | .active_subscriptions |
506 | 0 | .contains_key(&subscription_id)); |
507 | | |
508 | | // Insert a "kill channel" in the local state. This kill channel is shared |
509 | | // with the subscription object and is used to notify when a subscription |
510 | | // should be killed. |
511 | 0 | let kill_channel = Arc::new(SubscriptionKillChannel { |
512 | 0 | dead: AtomicBool::new(false), |
513 | 0 | on_dead_changed: event_listener::Event::new(), |
514 | 0 | }); |
515 | 0 | self.inner.active_subscriptions.insert( |
516 | 0 | subscription_id.clone(), |
517 | 0 | InnerSubscription { |
518 | 0 | kill_channel: kill_channel.clone(), |
519 | 0 | unsubscribe_response: None, |
520 | 0 | }, |
521 | 0 | ); |
522 | 0 |
|
523 | 0 | return Event::HandleSubscriptionStart { |
524 | 0 | subscription_start: SubscriptionStartProcess { |
525 | 0 | responses_notifications_queue: self |
526 | 0 | .inner |
527 | 0 | .responses_notifications_queue |
528 | 0 | .clone(), |
529 | 0 | request: new_request, |
530 | 0 | kill_channel, |
531 | 0 | subscription_id, |
532 | 0 | has_sent_response: false, |
533 | 0 | }, |
534 | 0 | task: self, |
535 | 0 | }; |
536 | | } |
537 | | |
538 | 0 | methods::MethodCall::author_unwatchExtrinsic { subscription, .. } |
539 | 0 | | methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. } |
540 | 0 | | methods::MethodCall::state_unsubscribeStorage { subscription, .. } |
541 | | | methods::MethodCall::transaction_v1_stop { |
542 | 0 | operation_id: subscription, |
543 | | } |
544 | 0 | | methods::MethodCall::transactionWatch_v1_unwatch { subscription, .. } |
545 | 0 | | methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. } |
546 | | | methods::MethodCall::chainHead_v1_unfollow { |
547 | 0 | follow_subscription: subscription, |
548 | | .. |
549 | | } => { |
550 | | // TODO: must check whether type of subscription matches |
551 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { |
552 | | Some(InnerSubscription { |
553 | 0 | kill_channel, |
554 | 0 | unsubscribe_response, |
555 | 0 | }) if unsubscribe_response.is_none() => { |
556 | 0 | *unsubscribe_response = Some( |
557 | 0 | match parsed_request { |
558 | | methods::MethodCall::author_unwatchExtrinsic { .. } => { |
559 | 0 | methods::Response::author_unwatchExtrinsic(true) |
560 | | } |
561 | | methods::MethodCall::state_unsubscribeRuntimeVersion { |
562 | | .. |
563 | 0 | } => methods::Response::state_unsubscribeRuntimeVersion(true), |
564 | | methods::MethodCall::state_unsubscribeStorage { .. } => { |
565 | 0 | methods::Response::state_unsubscribeStorage(true) |
566 | | } |
567 | | methods::MethodCall::transaction_v1_stop { .. } => { |
568 | 0 | methods::Response::transaction_v1_stop(()) |
569 | | } |
570 | | methods::MethodCall::transactionWatch_v1_unwatch { .. } => { |
571 | 0 | methods::Response::transactionWatch_v1_unwatch(()) |
572 | | } |
573 | | methods::MethodCall::sudo_network_unstable_unwatch { |
574 | | .. |
575 | 0 | } => methods::Response::sudo_network_unstable_unwatch(()), |
576 | | methods::MethodCall::chainHead_v1_unfollow { .. } => { |
577 | 0 | methods::Response::chainHead_v1_unfollow(()) |
578 | | } |
579 | 0 | _ => unreachable!(), |
580 | | } |
581 | 0 | .to_json_response(request_id), |
582 | 0 | ); |
583 | 0 |
|
584 | 0 | kill_channel.dead.store(true, Ordering::Release); |
585 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); |
586 | | } |
587 | | _ => { |
588 | 0 | let response = match parsed_request { |
589 | | methods::MethodCall::author_unwatchExtrinsic { .. } => { |
590 | 0 | methods::Response::author_unwatchExtrinsic(false) |
591 | 0 | .to_json_response(request_id) |
592 | | } |
593 | | methods::MethodCall::state_unsubscribeRuntimeVersion { .. } => { |
594 | 0 | methods::Response::state_unsubscribeRuntimeVersion(false) |
595 | 0 | .to_json_response(request_id) |
596 | | } |
597 | | methods::MethodCall::state_unsubscribeStorage { .. } => { |
598 | 0 | methods::Response::state_unsubscribeStorage(false) |
599 | 0 | .to_json_response(request_id) |
600 | | } |
601 | 0 | _ => parse::build_error_response( |
602 | 0 | request_id, |
603 | 0 | ErrorResponse::InvalidParams, |
604 | 0 | None, |
605 | 0 | ), |
606 | | }; |
607 | | |
608 | 0 | let mut responses_queue = |
609 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
610 | 0 | let pos = responses_queue |
611 | 0 | .pending_serialized_responses |
612 | 0 | .insert((response, true)); |
613 | 0 | responses_queue |
614 | 0 | .pending_serialized_responses_queue |
615 | 0 | .push_back(pos); |
616 | 0 | self.inner |
617 | 0 | .serialized_io |
618 | 0 | .on_response_pushed_or_task_destroyed |
619 | 0 | .notify(usize::MAX); |
620 | | } |
621 | | } |
622 | | } |
623 | 0 | methods::MethodCall::chain_unsubscribeAllHeads { subscription, .. } |
624 | 0 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { subscription, .. } |
625 | 0 | | methods::MethodCall::chain_unsubscribeNewHeads { subscription, .. } => { |
626 | | // TODO: DRY with above |
627 | | // TODO: must check whether type of subscription matches |
628 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { |
629 | | Some(InnerSubscription { |
630 | 0 | unsubscribe_response, |
631 | 0 | kill_channel, |
632 | 0 | }) if unsubscribe_response.is_none() => { |
633 | 0 | *unsubscribe_response = Some(match parsed_request { |
634 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { |
635 | 0 | methods::Response::chain_unsubscribeAllHeads(true) |
636 | 0 | .to_json_response(request_id) |
637 | | } |
638 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { |
639 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(true) |
640 | 0 | .to_json_response(request_id) |
641 | | } |
642 | | methods::MethodCall::chain_unsubscribeNewHeads { .. } => { |
643 | 0 | methods::Response::chain_unsubscribeNewHeads(true) |
644 | 0 | .to_json_response(request_id) |
645 | | } |
646 | 0 | _ => unreachable!(), |
647 | | }); |
648 | | |
649 | 0 | kill_channel.dead.store(true, Ordering::Release); |
650 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); |
651 | | } |
652 | | _ => { |
653 | 0 | let response = match parsed_request { |
654 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { |
655 | 0 | methods::Response::chain_unsubscribeAllHeads(false) |
656 | 0 | .to_json_response(request_id) |
657 | | } |
658 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { |
659 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(false) |
660 | 0 | .to_json_response(request_id) |
661 | | } |
662 | | methods::MethodCall::chain_unsubscribeNewHeads { .. } => { |
663 | 0 | methods::Response::chain_unsubscribeNewHeads(false) |
664 | 0 | .to_json_response(request_id) |
665 | | } |
666 | 0 | _ => unreachable!(), |
667 | | }; |
668 | | |
669 | 0 | let mut responses_queue = |
670 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
671 | 0 | let pos = responses_queue |
672 | 0 | .pending_serialized_responses |
673 | 0 | .insert((response, true)); |
674 | 0 | responses_queue |
675 | 0 | .pending_serialized_responses_queue |
676 | 0 | .push_back(pos); |
677 | 0 | self.inner |
678 | 0 | .serialized_io |
679 | 0 | .on_response_pushed_or_task_destroyed |
680 | 0 | .notify(usize::MAX); |
681 | | } |
682 | | } |
683 | | } |
684 | | } |
685 | | } |
686 | 44 | } Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0Ba_ Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0Ba_ _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0CsiUjFBJteJ7x_17smoldot_full_node Line | Count | Source | 216 | 44 | pub async fn run_until_event(mut self) -> Event { | 217 | | loop { | 218 | | enum WakeUpReason { | 219 | | NewRequest(String), | 220 | | Message(ToMainTask), | 221 | | } | 222 | | | 223 | 48 | let wake_up_reason = { | 224 | 69 | let serialized_requests_io_destroyed = async { | 225 | | (&mut self.inner.on_serialized_requests_io_destroyed).await; | 226 | | Err(()) | 227 | | }; | 228 | | | 229 | 69 | let next_serialized_request = async { | 230 | | let mut wait = None; | 231 | | loop { | 232 | | if let Some(elem) = self.inner.serialized_io.requests_queue.pop() { | 233 | | self.inner | 234 | | .serialized_io | 235 | | .on_request_pulled_or_task_destroyed | 236 | | .notify(usize::MAX); | 237 | | break Ok(WakeUpReason::NewRequest(elem)); | 238 | | } | 239 | | if let Some(wait) = wait.take() { | 240 | | wait.await | 241 | | } else { | 242 | | wait = Some(self.inner.serialized_io.on_request_pushed.listen()); | 243 | | } | 244 | | } | 245 | | }; | 246 | | | 247 | 69 | let response_notif = async { | 248 | | let mut wait = None; | 249 | | loop { | 250 | | if let Some(elem) = self.inner.responses_notifications_queue.queue.pop() { | 251 | | break Ok(WakeUpReason::Message(elem)); | 252 | | } | 253 | | if let Some(wait) = wait.take() { | 254 | | wait.await | 255 | | } else { | 256 | | wait = | 257 | | Some(self.inner.responses_notifications_queue.on_pushed.listen()); | 258 | | } | 259 | | } | 260 | | }; | 261 | | | 262 | 69 | match serialized_requests_io_destroyed | 263 | 69 | .or(next_serialized_request) | 264 | 69 | 
.or(response_notif) | 265 | 65 | .await | 266 | | { | 267 | 48 | Ok(wake_up_reason) => wake_up_reason, | 268 | 21 | Err(()) => return Event::SerializedRequestsIoClosed, | 269 | | } | 270 | | }; | 271 | | | 272 | | // Immediately handle every event apart from `NewRequest`. | 273 | 25 | let new_request = match wake_up_reason23 { | 274 | 25 | WakeUpReason::NewRequest(request) => request, | 275 | 0 | WakeUpReason::Message(ToMainTask::SubscriptionDestroyed { subscription_id }) => { | 276 | 0 | let InnerSubscription { | 277 | 0 | unsubscribe_response, | 278 | 0 | .. | 279 | 0 | } = self | 280 | 0 | .inner | 281 | 0 | .active_subscriptions | 282 | 0 | .remove(&subscription_id) | 283 | 0 | .unwrap(); | 284 | | // TODO: post a `stop`/`error` event for chainhead subscriptions | 285 | 0 | if let Some(unsubscribe_response) = unsubscribe_response { | 286 | 0 | let mut responses_queue = | 287 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 288 | 0 | let pos = responses_queue | 289 | 0 | .pending_serialized_responses | 290 | 0 | .insert((unsubscribe_response, true)); | 291 | 0 | responses_queue | 292 | 0 | .pending_serialized_responses_queue | 293 | 0 | .push_back(pos); | 294 | 0 | self.inner | 295 | 0 | .serialized_io | 296 | 0 | .on_response_pushed_or_task_destroyed | 297 | 0 | .notify(usize::MAX); | 298 | 0 | } | 299 | | | 300 | | // Shrink the list of active subscriptions if necessary. 
| 301 | 0 | if self.inner.active_subscriptions.capacity() | 302 | 0 | >= 2 * self.inner.active_subscriptions.len() + 16 | 303 | 0 | { | 304 | 0 | self.inner.active_subscriptions.shrink_to_fit(); | 305 | 0 | } | 306 | | | 307 | 0 | return Event::SubscriptionDestroyed { | 308 | 0 | task: self, | 309 | 0 | subscription_id, | 310 | 0 | }; | 311 | | } | 312 | 23 | WakeUpReason::Message(ToMainTask::RequestResponse(response)) => { | 313 | 23 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await0 ; | 314 | 23 | let pos = responses_queue | 315 | 23 | .pending_serialized_responses | 316 | 23 | .insert((response, true)); | 317 | 23 | responses_queue | 318 | 23 | .pending_serialized_responses_queue | 319 | 23 | .push_back(pos); | 320 | 23 | self.inner | 321 | 23 | .serialized_io | 322 | 23 | .on_response_pushed_or_task_destroyed | 323 | 23 | .notify(usize::MAX); | 324 | 23 | continue; | 325 | | } | 326 | 0 | WakeUpReason::Message(ToMainTask::Notification(notification)) => { | 327 | | // TODO: filter out redundant notifications, as it's the entire point of this module | 328 | 0 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await; | 329 | 0 | let pos = responses_queue | 330 | 0 | .pending_serialized_responses | 331 | 0 | .insert((notification, false)); | 332 | 0 | responses_queue | 333 | 0 | .pending_serialized_responses_queue | 334 | 0 | .push_back(pos); | 335 | 0 | self.inner | 336 | 0 | .serialized_io | 337 | 0 | .on_response_pushed_or_task_destroyed | 338 | 0 | .notify(usize::MAX); | 339 | 0 | continue; | 340 | | } | 341 | | }; | 342 | | | 343 | 23 | let (request_id, parsed_request) = | 344 | 25 | match methods::parse_jsonrpc_client_to_server(&new_request) { | 345 | 23 | Ok((request_id, method)) => (request_id, method), | 346 | 1 | Err(methods::ParseClientToServerError::Method { request_id, error }) => { | 347 | 1 | let response = error.to_json_error(request_id); | 348 | 1 | let mut responses_queue = | 349 | 1 | 
self.inner.serialized_io.responses_queue.lock().await0 ; | 350 | 1 | let pos = responses_queue | 351 | 1 | .pending_serialized_responses | 352 | 1 | .insert((response, true)); | 353 | 1 | responses_queue | 354 | 1 | .pending_serialized_responses_queue | 355 | 1 | .push_back(pos); | 356 | 1 | self.inner | 357 | 1 | .serialized_io | 358 | 1 | .on_response_pushed_or_task_destroyed | 359 | 1 | .notify(usize::MAX); | 360 | 1 | continue; | 361 | | } | 362 | 0 | Err(methods::ParseClientToServerError::UnknownNotification(_)) => continue, | 363 | | Err(methods::ParseClientToServerError::JsonRpcParse(_)) => { | 364 | 1 | let response = parse::build_parse_error_response(); | 365 | 1 | let mut responses_queue = | 366 | 1 | self.inner.serialized_io.responses_queue.lock().await0 ; | 367 | 1 | let pos = responses_queue | 368 | 1 | .pending_serialized_responses | 369 | 1 | .insert((response, true)); | 370 | 1 | responses_queue | 371 | 1 | .pending_serialized_responses_queue | 372 | 1 | .push_back(pos); | 373 | 1 | self.inner | 374 | 1 | .serialized_io | 375 | 1 | .on_response_pushed_or_task_destroyed | 376 | 1 | .notify(usize::MAX); | 377 | 1 | continue; | 378 | | } | 379 | | }; | 380 | | | 381 | | // There exists three types of requests: | 382 | | // | 383 | | // - Requests that follow a simple one-request-one-response schema. | 384 | | // - Requests that, if accepted, start a subscription. | 385 | | // - Requests that unsubscribe from a subscription. | 386 | | // | 387 | 23 | match &parsed_request { | 388 | | methods::MethodCall::account_nextIndex { .. } | 389 | | | methods::MethodCall::author_hasKey { .. } | 390 | | | methods::MethodCall::author_hasSessionKeys { .. } | 391 | | | methods::MethodCall::author_insertKey { .. } | 392 | | | methods::MethodCall::author_pendingExtrinsics { .. } | 393 | | | methods::MethodCall::author_removeExtrinsic { .. } | 394 | | | methods::MethodCall::author_rotateKeys { .. } | 395 | | | methods::MethodCall::author_submitExtrinsic { .. 
} | 396 | | | methods::MethodCall::babe_epochAuthorship { .. } | 397 | | | methods::MethodCall::chain_getBlock { .. } | 398 | | | methods::MethodCall::chain_getBlockHash { .. } | 399 | | | methods::MethodCall::chain_getFinalizedHead { .. } | 400 | | | methods::MethodCall::chain_getHeader { .. } | 401 | | | methods::MethodCall::childstate_getKeys { .. } | 402 | | | methods::MethodCall::childstate_getStorage { .. } | 403 | | | methods::MethodCall::childstate_getStorageHash { .. } | 404 | | | methods::MethodCall::childstate_getStorageSize { .. } | 405 | | | methods::MethodCall::grandpa_roundState { .. } | 406 | | | methods::MethodCall::offchain_localStorageGet { .. } | 407 | | | methods::MethodCall::offchain_localStorageSet { .. } | 408 | | | methods::MethodCall::payment_queryInfo { .. } | 409 | | | methods::MethodCall::state_call { .. } | 410 | | | methods::MethodCall::state_getKeys { .. } | 411 | | | methods::MethodCall::state_getKeysPaged { .. } | 412 | | | methods::MethodCall::state_getMetadata { .. } | 413 | | | methods::MethodCall::state_getPairs { .. } | 414 | | | methods::MethodCall::state_getReadProof { .. } | 415 | | | methods::MethodCall::state_getRuntimeVersion { .. } | 416 | | | methods::MethodCall::state_getStorage { .. } | 417 | | | methods::MethodCall::state_getStorageHash { .. } | 418 | | | methods::MethodCall::state_getStorageSize { .. } | 419 | | | methods::MethodCall::state_queryStorage { .. } | 420 | | | methods::MethodCall::state_queryStorageAt { .. } | 421 | | | methods::MethodCall::system_accountNextIndex { .. } | 422 | | | methods::MethodCall::system_addReservedPeer { .. } | 423 | | | methods::MethodCall::system_chain { .. } | 424 | | | methods::MethodCall::system_chainType { .. } | 425 | | | methods::MethodCall::system_dryRun { .. } | 426 | | | methods::MethodCall::system_health { .. } | 427 | | | methods::MethodCall::system_localListenAddresses { .. } | 428 | | | methods::MethodCall::system_localPeerId { .. 
} | 429 | | | methods::MethodCall::system_name { .. } | 430 | | | methods::MethodCall::system_networkState { .. } | 431 | | | methods::MethodCall::system_nodeRoles { .. } | 432 | | | methods::MethodCall::system_peers { .. } | 433 | | | methods::MethodCall::system_properties { .. } | 434 | | | methods::MethodCall::system_removeReservedPeer { .. } | 435 | | | methods::MethodCall::system_version { .. } | 436 | | | methods::MethodCall::chainSpec_v1_chainName { .. } | 437 | | | methods::MethodCall::chainSpec_v1_genesisHash { .. } | 438 | | | methods::MethodCall::chainSpec_v1_properties { .. } | 439 | | | methods::MethodCall::rpc_methods { .. } | 440 | | | methods::MethodCall::sudo_unstable_p2pDiscover { .. } | 441 | | | methods::MethodCall::sudo_unstable_version { .. } | 442 | | | methods::MethodCall::chainHead_v1_body { .. } | 443 | | | methods::MethodCall::chainHead_v1_call { .. } | 444 | | | methods::MethodCall::chainHead_v1_continue { .. } | 445 | | | methods::MethodCall::chainHead_unstable_finalizedDatabase { .. } | 446 | | | methods::MethodCall::chainHead_v1_header { .. } | 447 | | | methods::MethodCall::chainHead_v1_stopOperation { .. } | 448 | | | methods::MethodCall::chainHead_v1_storage { .. } | 449 | | | methods::MethodCall::chainHead_v1_unpin { .. } => { | 450 | | // Simple one-request-one-response. | 451 | 23 | return Event::HandleRequest { | 452 | 23 | request_process: RequestProcess { | 453 | 23 | responses_notifications_queue: self | 454 | 23 | .inner | 455 | 23 | .responses_notifications_queue | 456 | 23 | .clone(), | 457 | 23 | request: new_request, | 458 | 23 | has_sent_response: false, | 459 | 23 | }, | 460 | 23 | task: self, | 461 | 23 | }; | 462 | | } | 463 | | | 464 | | methods::MethodCall::author_submitAndWatchExtrinsic { .. } | 465 | | | methods::MethodCall::chain_subscribeAllHeads { .. } | 466 | | | methods::MethodCall::chain_subscribeFinalizedHeads { .. } | 467 | | | methods::MethodCall::chain_subscribeNewHeads { .. 
} | 468 | | | methods::MethodCall::state_subscribeRuntimeVersion { .. } | 469 | | | methods::MethodCall::state_subscribeStorage { .. } | 470 | | | methods::MethodCall::transaction_v1_broadcast { .. } | 471 | | | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. } | 472 | | | methods::MethodCall::sudo_network_unstable_watch { .. } | 473 | | | methods::MethodCall::chainHead_v1_follow { .. } => { | 474 | | // Subscription starting requests. | 475 | | | 476 | | // We must check the maximum number of subscriptions. | 477 | 0 | let max_subscriptions = | 478 | 0 | usize::try_from(self.inner.max_active_subscriptions).unwrap_or(usize::MAX); | 479 | 0 | debug_assert!(self.inner.active_subscriptions.len() <= max_subscriptions); | 480 | 0 | if self.inner.active_subscriptions.len() >= max_subscriptions { | 481 | 0 | let response = parse::build_error_response( | 482 | 0 | request_id, | 483 | 0 | ErrorResponse::ServerError(-32000, "Too many active subscriptions"), | 484 | 0 | None, | 485 | 0 | ); | 486 | 0 | let mut responses_queue = | 487 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 488 | 0 | let pos = responses_queue | 489 | 0 | .pending_serialized_responses | 490 | 0 | .insert((response, true)); | 491 | 0 | responses_queue | 492 | 0 | .pending_serialized_responses_queue | 493 | 0 | .push_back(pos); | 494 | 0 | self.inner | 495 | 0 | .serialized_io | 496 | 0 | .on_response_pushed_or_task_destroyed | 497 | 0 | .notify(usize::MAX); | 498 | 0 | continue; | 499 | 0 | } | 500 | 0 |
| 501 | 0 | // Allocate the new subscription ID. | 502 | 0 | let subscription_id = self.allocate_subscription_id(); | 503 | 0 | debug_assert!(!self | 504 | 0 | .inner | 505 | 0 | .active_subscriptions | 506 | 0 | .contains_key(&subscription_id)); | 507 | | | 508 | | // Insert an "kill channel" in the local state. This kill channel is shared | 509 | | // with the subscription object and is used to notify when a subscription | 510 | | // should be killed. | 511 | 0 | let kill_channel = Arc::new(SubscriptionKillChannel { | 512 | 0 | dead: AtomicBool::new(false), | 513 | 0 | on_dead_changed: event_listener::Event::new(), | 514 | 0 | }); | 515 | 0 | self.inner.active_subscriptions.insert( | 516 | 0 | subscription_id.clone(), | 517 | 0 | InnerSubscription { | 518 | 0 | kill_channel: kill_channel.clone(), | 519 | 0 | unsubscribe_response: None, | 520 | 0 | }, | 521 | 0 | ); | 522 | 0 |
| 523 | 0 | return Event::HandleSubscriptionStart { | 524 | 0 | subscription_start: SubscriptionStartProcess { | 525 | 0 | responses_notifications_queue: self | 526 | 0 | .inner | 527 | 0 | .responses_notifications_queue | 528 | 0 | .clone(), | 529 | 0 | request: new_request, | 530 | 0 | kill_channel, | 531 | 0 | subscription_id, | 532 | 0 | has_sent_response: false, | 533 | 0 | }, | 534 | 0 | task: self, | 535 | 0 | }; | 536 | | } | 537 | | | 538 | 0 | methods::MethodCall::author_unwatchExtrinsic { subscription, .. } | 539 | 0 | | methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. } | 540 | 0 | | methods::MethodCall::state_unsubscribeStorage { subscription, .. } | 541 | | | methods::MethodCall::transaction_v1_stop { | 542 | 0 | operation_id: subscription, | 543 | | } | 544 | 0 | | methods::MethodCall::transactionWatch_v1_unwatch { subscription, .. } | 545 | 0 | | methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. } | 546 | | | methods::MethodCall::chainHead_v1_unfollow { | 547 | 0 | follow_subscription: subscription, | 548 | | .. | 549 | | } => { | 550 | | // TODO: must check whether type of subscription matches | 551 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { | 552 | | Some(InnerSubscription { | 553 | 0 | kill_channel, | 554 | 0 | unsubscribe_response, | 555 | 0 | }) if unsubscribe_response.is_none() => { | 556 | 0 | *unsubscribe_response = Some( | 557 | 0 | match parsed_request { | 558 | | methods::MethodCall::author_unwatchExtrinsic { .. } => { | 559 | 0 | methods::Response::author_unwatchExtrinsic(true) | 560 | | } | 561 | | methods::MethodCall::state_unsubscribeRuntimeVersion { | 562 | | .. | 563 | 0 | } => methods::Response::state_unsubscribeRuntimeVersion(true), | 564 | | methods::MethodCall::state_unsubscribeStorage { .. } => { | 565 | 0 | methods::Response::state_unsubscribeStorage(true) | 566 | | } | 567 | | methods::MethodCall::transaction_v1_stop { .. 
} => { | 568 | 0 | methods::Response::transaction_v1_stop(()) | 569 | | } | 570 | | methods::MethodCall::transactionWatch_v1_unwatch { .. } => { | 571 | 0 | methods::Response::transactionWatch_v1_unwatch(()) | 572 | | } | 573 | | methods::MethodCall::sudo_network_unstable_unwatch { | 574 | | .. | 575 | 0 | } => methods::Response::sudo_network_unstable_unwatch(()), | 576 | | methods::MethodCall::chainHead_v1_unfollow { .. } => { | 577 | 0 | methods::Response::chainHead_v1_unfollow(()) | 578 | | } | 579 | 0 | _ => unreachable!(), | 580 | | } | 581 | 0 | .to_json_response(request_id), | 582 | 0 | ); | 583 | 0 |
| 584 | 0 | kill_channel.dead.store(true, Ordering::Release); | 585 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); | 586 | | } | 587 | | _ => { | 588 | 0 | let response = match parsed_request { | 589 | | methods::MethodCall::author_unwatchExtrinsic { .. } => { | 590 | 0 | methods::Response::author_unwatchExtrinsic(false) | 591 | 0 | .to_json_response(request_id) | 592 | | } | 593 | | methods::MethodCall::state_unsubscribeRuntimeVersion { .. } => { | 594 | 0 | methods::Response::state_unsubscribeRuntimeVersion(false) | 595 | 0 | .to_json_response(request_id) | 596 | | } | 597 | | methods::MethodCall::state_unsubscribeStorage { .. } => { | 598 | 0 | methods::Response::state_unsubscribeStorage(false) | 599 | 0 | .to_json_response(request_id) | 600 | | } | 601 | 0 | _ => parse::build_error_response( | 602 | 0 | request_id, | 603 | 0 | ErrorResponse::InvalidParams, | 604 | 0 | None, | 605 | 0 | ), | 606 | | }; | 607 | | | 608 | 0 | let mut responses_queue = | 609 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 610 | 0 | let pos = responses_queue | 611 | 0 | .pending_serialized_responses | 612 | 0 | .insert((response, true)); | 613 | 0 | responses_queue | 614 | 0 | .pending_serialized_responses_queue | 615 | 0 | .push_back(pos); | 616 | 0 | self.inner | 617 | 0 | .serialized_io | 618 | 0 | .on_response_pushed_or_task_destroyed | 619 | 0 | .notify(usize::MAX); | 620 | | } | 621 | | } | 622 | | } | 623 | 0 | methods::MethodCall::chain_unsubscribeAllHeads { subscription, .. } | 624 | 0 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { subscription, .. } | 625 | 0 | | methods::MethodCall::chain_unsubscribeNewHeads { subscription, .. 
} => { | 626 | | // TODO: DRY with above | 627 | | // TODO: must check whether type of subscription matches | 628 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { | 629 | | Some(InnerSubscription { | 630 | 0 | unsubscribe_response, | 631 | 0 | kill_channel, | 632 | 0 | }) if unsubscribe_response.is_none() => { | 633 | 0 | *unsubscribe_response = Some(match parsed_request { | 634 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { | 635 | 0 | methods::Response::chain_unsubscribeAllHeads(true) | 636 | 0 | .to_json_response(request_id) | 637 | | } | 638 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { | 639 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(true) | 640 | 0 | .to_json_response(request_id) | 641 | | } | 642 | | methods::MethodCall::chain_unsubscribeNewHeads { .. } => { | 643 | 0 | methods::Response::chain_unsubscribeNewHeads(true) | 644 | 0 | .to_json_response(request_id) | 645 | | } | 646 | 0 | _ => unreachable!(), | 647 | | }); | 648 | | | 649 | 0 | kill_channel.dead.store(true, Ordering::Release); | 650 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); | 651 | | } | 652 | | _ => { | 653 | 0 | let response = match parsed_request { | 654 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { | 655 | 0 | methods::Response::chain_unsubscribeAllHeads(false) | 656 | 0 | .to_json_response(request_id) | 657 | | } | 658 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { | 659 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(false) | 660 | 0 | .to_json_response(request_id) | 661 | | } | 662 | | methods::MethodCall::chain_unsubscribeNewHeads { .. 
} => { | 663 | 0 | methods::Response::chain_unsubscribeNewHeads(false) | 664 | 0 | .to_json_response(request_id) | 665 | | } | 666 | 0 | _ => unreachable!(), | 667 | | }; | 668 | | | 669 | 0 | let mut responses_queue = | 670 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 671 | 0 | let pos = responses_queue | 672 | 0 | .pending_serialized_responses | 673 | 0 | .insert((response, true)); | 674 | 0 | responses_queue | 675 | 0 | .pending_serialized_responses_queue | 676 | 0 | .push_back(pos); | 677 | 0 | self.inner | 678 | 0 | .serialized_io | 679 | 0 | .on_response_pushed_or_task_destroyed | 680 | 0 | .notify(usize::MAX); | 681 | | } | 682 | | } | 683 | | } | 684 | | } | 685 | | } | 686 | 44 | } |
|
687 | | |
688 | 0 | fn allocate_subscription_id(&mut self) -> String { |
689 | 0 | let subscription_id = self.inner.next_subscription_id.to_string(); |
690 | 0 | self.inner.next_subscription_id += 1; |
691 | 0 | subscription_id |
692 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask24allocate_subscription_id Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask24allocate_subscription_id |
693 | | } |
694 | | |
695 | | impl fmt::Debug for ClientMainTask { |
696 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
697 | 0 | f.debug_tuple("ClientMainTask").finish() |
698 | 0 | } Unexecuted instantiation: _RNvXs_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTaskNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTaskNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt |
699 | | } |
700 | | |
701 | | impl Drop for ClientMainTask { |
702 | 21 | fn drop(&mut self) { |
703 | 21 | // Notify the `SerializedRequestsIo`. |
704 | 21 | self.inner |
705 | 21 | .serialized_io |
706 | 21 | .on_response_pushed_or_task_destroyed |
707 | 21 | .notify(usize::MAX); |
708 | 21 | self.inner |
709 | 21 | .serialized_io |
710 | 21 | .on_request_pulled_or_task_destroyed |
711 | 21 | .notify(usize::MAX); |
712 | | |
713 | | // Mark all active subscriptions as dead. |
714 | 21 | for (_, InnerSubscription { kill_channel0 , .. }) in self.inner.active_subscriptions.drain() { |
715 | 0 | kill_channel.dead.store(true, Ordering::Release); |
716 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); |
717 | 0 | } |
718 | 21 | } Unexecuted instantiation: _RNvXs0_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14ClientMainTaskNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop _RNvXs0_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14ClientMainTaskNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop Line | Count | Source | 702 | 21 | fn drop(&mut self) { | 703 | 21 | // Notify the `SerializedRequestsIo`. | 704 | 21 | self.inner | 705 | 21 | .serialized_io | 706 | 21 | .on_response_pushed_or_task_destroyed | 707 | 21 | .notify(usize::MAX); | 708 | 21 | self.inner | 709 | 21 | .serialized_io | 710 | 21 | .on_request_pulled_or_task_destroyed | 711 | 21 | .notify(usize::MAX); | 712 | | | 713 | | // Mark all active subscriptions as dead. | 714 | 21 | for (_, InnerSubscription { kill_channel0 , .. }) in self.inner.active_subscriptions.drain() { | 715 | 0 | kill_channel.dead.store(true, Ordering::Release); | 716 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); | 717 | 0 | } | 718 | 21 | } |
|
719 | | } |
720 | | |
721 | | /// Outcome of the processing of [`ClientMainTask::run_until_event`]. |
722 | | #[derive(Debug)] |
723 | | pub enum Event { |
724 | | /// JSON-RPC client has sent a plain request (i.e. that isn't related to subscriptions). |
725 | | HandleRequest { |
726 | | /// The task that generated the event. |
727 | | task: ClientMainTask, |
728 | | /// Object connected to the [`ClientMainTask`] and containing the information about the |
729 | | /// request to process. |
730 | | request_process: RequestProcess, |
731 | | }, |
732 | | |
733 | | /// JSON-RPC client desires starting a new subscription. |
734 | | /// |
735 | | /// Note that the [`ClientMainTask`] automatically enforces a limit to the maximum number of |
736 | | /// subscriptions. If this event is generated, this check has already passed. |
737 | | HandleSubscriptionStart { |
738 | | /// The task that generated the event. |
739 | | task: ClientMainTask, |
740 | | /// Object connected to the [`ClientMainTask`] and containing the information about the |
741 | | /// request to process. |
742 | | subscription_start: SubscriptionStartProcess, |
743 | | }, |
744 | | |
745 | | /// A [`SubscriptionStartProcess`] object or a [`Subscription`] object has been destroyed. |
746 | | SubscriptionDestroyed { |
747 | | /// The task that generated the event. |
748 | | task: ClientMainTask, |
749 | | /// Id of the subscription that was destroyed. Equal to the value that |
750 | | /// [`Subscription::subscription_id`] would have returned for the now-dead subscription. |
751 | | subscription_id: String, |
752 | | }, |
753 | | |
754 | | /// The [`SerializedRequestsIo`] has been dropped. The [`ClientMainTask`] has been destroyed. |
755 | | SerializedRequestsIoClosed, |
756 | | } |
757 | | |
758 | | /// Object connected to the [`ClientMainTask`] that allows sending requests to the task and |
759 | | /// receiving responses. |
760 | | pub struct SerializedRequestsIo { |
761 | | serialized_io: Weak<SerializedIo>, |
762 | | |
763 | | /// Event notified after the [`SerializedRequestsIo`] is destroyed. |
764 | | on_serialized_requests_io_destroyed: event_listener::Event, |
765 | | } |
766 | | |
767 | | impl SerializedRequestsIo { |
768 | | /// Waits for a response or a notification to send to the JSON-RPC client to be available, |
769 | | /// and returns it. |
770 | | /// |
771 | | /// Returns [`WaitNextResponseError::ClientMainTaskDestroyed`] if the [`ClientMainTask`] has been destroyed. |
772 | | /// |
773 | | /// > **Note**: It is important to run [`ClientMainTask::run_until_event`] concurrently with |
774 | | /// > this function, otherwise it might never return. |
775 | 25 | pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> 0 { Unexecuted instantiation: _RNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo18wait_next_response Unexecuted instantiation: _RNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo18wait_next_response |
776 | 25 | let mut wait = None; |
777 | | |
778 | | loop { |
779 | 53 | let Some(queue) = self.serialized_io.upgrade() else { |
780 | 0 | return Err(WaitNextResponseError::ClientMainTaskDestroyed); |
781 | | }; |
782 | | |
783 | | // Lock the responses queue. |
784 | | { |
785 | 53 | let mut responses_queue = queue.responses_queue.lock().await0 ; |
786 | | |
787 | 53 | if let Some(response_index25 ) = responses_queue |
788 | 53 | .pending_serialized_responses_queue |
789 | 53 | .pop_front() |
790 | | { |
791 | 25 | let (response_or_notif, is_response) = responses_queue |
792 | 25 | .pending_serialized_responses |
793 | 25 | .remove(response_index); |
794 | 25 | |
795 | 25 | if is_response { |
796 | 25 | let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release); |
797 | 25 | debug_assert_ne!(_prev_val, u32::MAX); // Check underflows. |
798 | 0 | } |
799 | | |
800 | | // Shrink containers if necessary in order to reduce memory usage after a |
801 | | // burst of requests. |
802 | 25 | if responses_queue.pending_serialized_responses.capacity() |
803 | 25 | > responses_queue |
804 | 25 | .pending_serialized_responses |
805 | 25 | .len() |
806 | 25 | .saturating_mul(4) |
807 | 25 | { |
808 | 25 | responses_queue.pending_serialized_responses.shrink_to_fit(); |
809 | 25 | }0 |
810 | 25 | if responses_queue |
811 | 25 | .pending_serialized_responses_queue |
812 | 25 | .capacity() |
813 | 25 | > responses_queue |
814 | 25 | .pending_serialized_responses_queue |
815 | 25 | .len() |
816 | 25 | .saturating_mul(4) |
817 | 25 | { |
818 | 25 | responses_queue |
819 | 25 | .pending_serialized_responses_queue |
820 | 25 | .shrink_to_fit(); |
821 | 25 | }0 |
822 | | |
823 | 25 | return Ok(response_or_notif); |
824 | 28 | } |
825 | | } |
826 | | |
827 | 28 | if let Some(wait14 ) = wait.take() { |
828 | 14 | wait.await |
829 | 14 | } else { |
830 | 14 | wait = Some(queue.on_response_pushed_or_task_destroyed.listen()); |
831 | 14 | } |
832 | | } |
833 | 25 | } Unexecuted instantiation: _RNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0Bd_ Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0Bd_ _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsiLzmwikkc22_14json_rpc_basic Line | Count | Source | 775 | 2 | pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> { | 776 | 2 | let mut wait = None; | 777 | | | 778 | | loop { | 779 | 4 | let Some(queue) = self.serialized_io.upgrade() else { | 780 | 0 | return Err(WaitNextResponseError::ClientMainTaskDestroyed); | 781 | | }; | 782 | | | 783 | | // Lock the responses queue. | 784 | | { | 785 | 4 | let mut responses_queue = queue.responses_queue.lock().await0 ; | 786 | | | 787 | 4 | if let Some(response_index2 ) = responses_queue | 788 | 4 | .pending_serialized_responses_queue | 789 | 4 | .pop_front() | 790 | | { | 791 | 2 | let (response_or_notif, is_response) = responses_queue | 792 | 2 | .pending_serialized_responses | 793 | 2 | .remove(response_index); | 794 | 2 | | 795 | 2 | if is_response { | 796 | 2 | let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release); | 797 | 2 | debug_assert_ne!(_prev_val, u32::MAX); // Check underflows. | 798 | 0 | } | 799 | | | 800 | | // Shrink containers if necessary in order to reduce memory usage after a | 801 | | // burst of requests. 
| 802 | 2 | if responses_queue.pending_serialized_responses.capacity() | 803 | 2 | > responses_queue | 804 | 2 | .pending_serialized_responses | 805 | 2 | .len() | 806 | 2 | .saturating_mul(4) | 807 | 2 | { | 808 | 2 | responses_queue.pending_serialized_responses.shrink_to_fit(); | 809 | 2 | }0 | 810 | 2 | if responses_queue | 811 | 2 | .pending_serialized_responses_queue | 812 | 2 | .capacity() | 813 | 2 | > responses_queue | 814 | 2 | .pending_serialized_responses_queue | 815 | 2 | .len() | 816 | 2 | .saturating_mul(4) | 817 | 2 | { | 818 | 2 | responses_queue | 819 | 2 | .pending_serialized_responses_queue | 820 | 2 | .shrink_to_fit(); | 821 | 2 | }0 | 822 | | | 823 | 2 | return Ok(response_or_notif); | 824 | 2 | } | 825 | | } | 826 | | | 827 | 2 | if let Some(wait1 ) = wait.take() { | 828 | 1 | wait.await | 829 | 1 | } else { | 830 | 1 | wait = Some(queue.on_response_pushed_or_task_destroyed.listen()); | 831 | 1 | } | 832 | | } | 833 | 2 | } |
Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CscDgN54JpMGG_6author _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsibGXYHQB8Ea_25json_rpc_general_requests Line | Count | Source | 775 | 23 | pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> { | 776 | 23 | let mut wait = None; | 777 | | | 778 | | loop { | 779 | 49 | let Some(queue) = self.serialized_io.upgrade() else { | 780 | 0 | return Err(WaitNextResponseError::ClientMainTaskDestroyed); | 781 | | }; | 782 | | | 783 | | // Lock the responses queue. | 784 | | { | 785 | 49 | let mut responses_queue = queue.responses_queue.lock().await0 ; | 786 | | | 787 | 49 | if let Some(response_index23 ) = responses_queue | 788 | 49 | .pending_serialized_responses_queue | 789 | 49 | .pop_front() | 790 | | { | 791 | 23 | let (response_or_notif, is_response) = responses_queue | 792 | 23 | .pending_serialized_responses | 793 | 23 | .remove(response_index); | 794 | 23 | | 795 | 23 | if is_response { | 796 | 23 | let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release); | 797 | 23 | debug_assert_ne!(_prev_val, u32::MAX); // Check underflows. | 798 | 0 | } | 799 | | | 800 | | // Shrink containers if necessary in order to reduce memory usage after a | 801 | | // burst of requests. 
| 802 | 23 | if responses_queue.pending_serialized_responses.capacity() | 803 | 23 | > responses_queue | 804 | 23 | .pending_serialized_responses | 805 | 23 | .len() | 806 | 23 | .saturating_mul(4) | 807 | 23 | { | 808 | 23 | responses_queue.pending_serialized_responses.shrink_to_fit(); | 809 | 23 | }0 | 810 | 23 | if responses_queue | 811 | 23 | .pending_serialized_responses_queue | 812 | 23 | .capacity() | 813 | 23 | > responses_queue | 814 | 23 | .pending_serialized_responses_queue | 815 | 23 | .len() | 816 | 23 | .saturating_mul(4) | 817 | 23 | { | 818 | 23 | responses_queue | 819 | 23 | .pending_serialized_responses_queue | 820 | 23 | .shrink_to_fit(); | 821 | 23 | }0 | 822 | | | 823 | 23 | return Ok(response_or_notif); | 824 | 26 | } | 825 | | } | 826 | | | 827 | 26 | if let Some(wait13 ) = wait.take() { | 828 | 13 | wait.await | 829 | 13 | } else { | 830 | 13 | wait = Some(queue.on_response_pushed_or_task_destroyed.listen()); | 831 | 13 | } | 832 | | } | 833 | 23 | } |
|
834 | | |
835 | | /// Adds a JSON-RPC request to the queue of requests of the [`ClientMainTask`]. Waits if the |
836 | | /// queue is full. |
837 | | /// |
838 | | /// This might cause a call to [`ClientMainTask::run_until_event`] to return |
839 | | /// [`Event::HandleRequest`] or [`Event::HandleSubscriptionStart`]. |
840 | 0 | pub async fn send_request(&self, request: String) -> Result<(), SendRequestError> { Unexecuted instantiation: _RNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo12send_request Unexecuted instantiation: _RNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo12send_request |
841 | 0 | // Wait until it is possible to increment `num_requests_in_fly`. |
842 | 0 | let mut wait = None; |
843 | 0 | let queue = loop { |
844 | 0 | let Some(queue) = self.serialized_io.upgrade() else { |
845 | 0 | return Err(SendRequestError { |
846 | 0 | request, |
847 | 0 | cause: SendRequestErrorCause::ClientMainTaskDestroyed, |
848 | 0 | }); |
849 | | }; |
850 | | |
851 | 0 | if queue |
852 | 0 | .num_requests_in_fly |
853 | 0 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { |
854 | 0 | if old_value < queue.max_requests_in_fly.get() { |
855 | | // Considering that `old_value < max`, and `max` fits in a `u32` by |
856 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. |
857 | | // There's no risk of overflow. |
858 | 0 | Some(old_value + 1) |
859 | | } else { |
860 | 0 | None |
861 | | } |
862 | 0 | }) Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00Bf_ Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00Bf_ Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00CsiUjFBJteJ7x_17smoldot_full_node |
863 | 0 | .is_ok() |
864 | | { |
865 | 0 | break queue; |
866 | 0 | } |
867 | | |
868 | 0 | if let Some(wait) = wait.take() { |
869 | 0 | wait.await; |
870 | 0 | } else { |
871 | 0 | wait = Some(queue.on_request_pulled_or_task_destroyed.listen()); |
872 | 0 | } |
873 | | }; |
874 | | |
875 | | // Everything successful. |
876 | 0 | queue.requests_queue.push(request); |
877 | 0 | queue.on_request_pushed.notify(usize::MAX); |
878 | 0 | Ok(()) |
879 | 0 | } Unexecuted instantiation: _RNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0Bd_ Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0Bd_ Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0CsiUjFBJteJ7x_17smoldot_full_node |
880 | | |
881 | | /// Tries to add a JSON-RPC request to the queue of requests of the [`ClientMainTask`]. |
882 | | /// |
883 | | /// This might cause a call to [`ClientMainTask::run_until_event`] to return |
884 | | /// [`Event::HandleRequest`] or [`Event::HandleSubscriptionStart`]. |
885 | 25 | pub fn try_send_request(&self, request: String) -> Result<(), TrySendRequestError> { |
886 | 25 | let Some(queue) = self.serialized_io.upgrade() else { |
887 | 0 | return Err(TrySendRequestError { |
888 | 0 | request, |
889 | 0 | cause: TrySendRequestErrorCause::ClientMainTaskDestroyed, |
890 | 0 | }); |
891 | | }; |
892 | | |
893 | | // Try to increment `num_requests_in_fly`. Return an error if it is past the maximum. |
894 | 25 | if queue |
895 | 25 | .num_requests_in_fly |
896 | 25 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { |
897 | 25 | if old_value < queue.max_requests_in_fly.get() { |
898 | | // Considering that `old_value < max`, and `max` fits in a `u32` by |
899 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. |
900 | | // There's no risk of overflow. |
901 | 25 | Some(old_value + 1) |
902 | | } else { |
903 | 0 | None |
904 | | } |
905 | 25 | }) Unexecuted instantiation: _RNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo16try_send_request0Bd_ _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo16try_send_request0Bd_ Line | Count | Source | 896 | 25 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { | 897 | 25 | if old_value < queue.max_requests_in_fly.get() { | 898 | | // Considering that `old_value < max`, and `max` fits in a `u32` by | 899 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. | 900 | | // There's no risk of overflow. | 901 | 25 | Some(old_value + 1) | 902 | | } else { | 903 | 0 | None | 904 | | } | 905 | 25 | }) |
|
906 | 25 | .is_err() |
907 | | { |
908 | 0 | return Err(TrySendRequestError { |
909 | 0 | request, |
910 | 0 | cause: TrySendRequestErrorCause::TooManyPendingRequests, |
911 | 0 | }); |
912 | 25 | } |
913 | 25 | |
914 | 25 | // Everything successful. |
915 | 25 | queue.requests_queue.push(request); |
916 | 25 | queue.on_request_pushed.notify(usize::MAX); |
917 | 25 | Ok(()) |
918 | 25 | } Unexecuted instantiation: _RNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo16try_send_request _RNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo16try_send_request Line | Count | Source | 885 | 25 | pub fn try_send_request(&self, request: String) -> Result<(), TrySendRequestError> { | 886 | 25 | let Some(queue) = self.serialized_io.upgrade() else { | 887 | 0 | return Err(TrySendRequestError { | 888 | 0 | request, | 889 | 0 | cause: TrySendRequestErrorCause::ClientMainTaskDestroyed, | 890 | 0 | }); | 891 | | }; | 892 | | | 893 | | // Try to increment `num_requests_in_fly`. Return an error if it is past the maximum. | 894 | 25 | if queue | 895 | 25 | .num_requests_in_fly | 896 | 25 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { | 897 | | if old_value < queue.max_requests_in_fly.get() { | 898 | | // Considering that `old_value < max`, and `max` fits in a `u32` by | 899 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. | 900 | | // There's no risk of overflow. | 901 | | Some(old_value + 1) | 902 | | } else { | 903 | | None | 904 | | } | 905 | 25 | }) | 906 | 25 | .is_err() | 907 | | { | 908 | 0 | return Err(TrySendRequestError { | 909 | 0 | request, | 910 | 0 | cause: TrySendRequestErrorCause::TooManyPendingRequests, | 911 | 0 | }); | 912 | 25 | } | 913 | 25 | | 914 | 25 | // Everything successful. | 915 | 25 | queue.requests_queue.push(request); | 916 | 25 | queue.on_request_pushed.notify(usize::MAX); | 917 | 25 | Ok(()) | 918 | 25 | } |
|
919 | | } |
920 | | |
921 | | impl fmt::Debug for SerializedRequestsIo { |
922 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
923 | 0 | f.debug_tuple("SerializedRequestsIo").finish() |
924 | 0 | } Unexecuted instantiation: _RNvXs2_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs2_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt |
925 | | } |
926 | | |
927 | | impl Drop for SerializedRequestsIo { |
928 | 21 | fn drop(&mut self) { |
929 | 21 | self.on_serialized_requests_io_destroyed.notify(usize::MAX); |
930 | 21 | } Unexecuted instantiation: _RNvXs3_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop _RNvXs3_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop Line | Count | Source | 928 | 21 | fn drop(&mut self) { | 929 | 21 | self.on_serialized_requests_io_destroyed.notify(usize::MAX); | 930 | 21 | } |
|
931 | | } |
932 | | |
933 | | /// See [`SerializedRequestsIo::wait_next_response`]. |
934 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXsg_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_21WaitNextResponseErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsg_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_21WaitNextResponseErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
935 | | pub enum WaitNextResponseError { |
936 | | /// The attached [`ClientMainTask`] has been destroyed. |
937 | | ClientMainTaskDestroyed, |
938 | | } |
939 | | |
940 | | /// Error returned by [`SerializedRequestsIo::send_request`]. |
941 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsi_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_16SendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsi_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_16SendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
942 | | #[display(fmt = "{cause}")] |
943 | | pub struct SendRequestError { |
944 | | /// The JSON-RPC request that was passed as parameter. |
945 | | pub request: String, |
946 | | /// Reason for the error. |
947 | | pub cause: SendRequestErrorCause, |
948 | | } |
949 | | |
950 | | /// See [`SendRequestError::cause`]. |
951 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsk_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_21SendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsk_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_21SendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
952 | | pub enum SendRequestErrorCause { |
953 | | /// The attached [`ClientMainTask`] has been destroyed. |
954 | | ClientMainTaskDestroyed, |
955 | | } |
956 | | |
957 | | /// Error returned by [`SerializedRequestsIo::try_send_request`]. |
958 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsm_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_19TrySendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsm_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_19TrySendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
959 | | #[display(fmt = "{cause}")] |
960 | | pub struct TrySendRequestError { |
961 | | /// The JSON-RPC request that was passed as parameter. |
962 | | pub request: String, |
963 | | /// Reason for the error. |
964 | | pub cause: TrySendRequestErrorCause, |
965 | | } |
966 | | |
967 | | /// See [`TrySendRequestError::cause`]. |
968 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXso_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24TrySendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXso_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24TrySendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
969 | | pub enum TrySendRequestErrorCause { |
970 | | /// Limit to the maximum number of pending requests that was passed as |
971 | | /// [`Config::max_pending_requests`] has been reached. No more requests can be sent before |
972 | | /// some responses have been pulled. |
973 | | TooManyPendingRequests, |
974 | | /// The attached [`ClientMainTask`] has been destroyed. |
975 | | ClientMainTaskDestroyed, |
976 | | } |
977 | | |
978 | | /// Object connected to the [`ClientMainTask`] and containing a request expecting an answer. |
979 | | /// |
980 | | /// If this object is dropped before the request has been answered, an automatic "internal error" |
981 | | /// error response is automatically sent back. |
982 | | pub struct RequestProcess { |
983 | | /// Queue where responses and subscriptions push responses/notifications. |
984 | | responses_notifications_queue: Arc<ResponsesNotificationsQueue>, |
985 | | /// Request in JSON form. Guaranteed to decode successfully. |
986 | | request: String, |
987 | | /// `true` if a response has already been sent. |
988 | | has_sent_response: bool, |
989 | | } |
990 | | |
991 | | impl RequestProcess { |
992 | | /// Returns the request which must be processed. |
993 | | /// |
994 | | /// The request is guaranteed to not be related to subscriptions in any way. |
995 | | // TODO: with stronger typing users wouldn't have to worry about the type of request |
996 | 46 | pub fn request(&self) -> methods::MethodCall { |
997 | 46 | methods::parse_jsonrpc_client_to_server(&self.request) |
998 | 46 | .unwrap() |
999 | 46 | .1 |
1000 | 46 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7request _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7request Line | Count | Source | 996 | 46 | pub fn request(&self) -> methods::MethodCall { | 997 | 46 | methods::parse_jsonrpc_client_to_server(&self.request) | 998 | 46 | .unwrap() | 999 | 46 | .1 | 1000 | 46 | } |
|
1001 | | |
1002 | | /// Indicate the response to the request to the [`ClientMainTask`]. |
1003 | | /// |
1004 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1005 | 19 | pub fn respond(mut self, response: methods::Response<'_>) { |
1006 | 19 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1007 | 19 | .unwrap() |
1008 | 19 | .0; |
1009 | 19 | let serialized = response.to_json_response(request_id); |
1010 | 19 | self.responses_notifications_queue |
1011 | 19 | .queue |
1012 | 19 | .push(ToMainTask::RequestResponse(serialized)); |
1013 | 19 | self.responses_notifications_queue |
1014 | 19 | .on_pushed |
1015 | 19 | .notify(usize::MAX); |
1016 | 19 | self.has_sent_response = true; |
1017 | 19 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7respond _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7respond Line | Count | Source | 1005 | 19 | pub fn respond(mut self, response: methods::Response<'_>) { | 1006 | 19 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1007 | 19 | .unwrap() | 1008 | 19 | .0; | 1009 | 19 | let serialized = response.to_json_response(request_id); | 1010 | 19 | self.responses_notifications_queue | 1011 | 19 | .queue | 1012 | 19 | .push(ToMainTask::RequestResponse(serialized)); | 1013 | 19 | self.responses_notifications_queue | 1014 | 19 | .on_pushed | 1015 | 19 | .notify(usize::MAX); | 1016 | 19 | self.has_sent_response = true; | 1017 | 19 | } |
|
1018 | | |
1019 | | /// Indicate to the [`ClientMainTask`] that the response to the request is `null`. |
1020 | | /// |
1021 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1022 | | // TODO: the necessity for this function is basically a hack |
1023 | 2 | pub fn respond_null(mut self) { |
1024 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1025 | 2 | .unwrap() |
1026 | 2 | .0; |
1027 | 2 | let serialized = parse::build_success_response(request_id, "null"); |
1028 | 2 | self.responses_notifications_queue |
1029 | 2 | .queue |
1030 | 2 | .push(ToMainTask::RequestResponse(serialized)); |
1031 | 2 | self.responses_notifications_queue |
1032 | 2 | .on_pushed |
1033 | 2 | .notify(usize::MAX); |
1034 | 2 | self.has_sent_response = true; |
1035 | 2 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess12respond_null _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess12respond_null Line | Count | Source | 1023 | 2 | pub fn respond_null(mut self) { | 1024 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1025 | 2 | .unwrap() | 1026 | 2 | .0; | 1027 | 2 | let serialized = parse::build_success_response(request_id, "null"); | 1028 | 2 | self.responses_notifications_queue | 1029 | 2 | .queue | 1030 | 2 | .push(ToMainTask::RequestResponse(serialized)); | 1031 | 2 | self.responses_notifications_queue | 1032 | 2 | .on_pushed | 1033 | 2 | .notify(usize::MAX); | 1034 | 2 | self.has_sent_response = true; | 1035 | 2 | } |
|
1036 | | |
1037 | | /// Indicate to the [`ClientMainTask`] that the request should return an error. |
1038 | | /// |
1039 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1040 | 2 | pub fn fail(mut self, error: ErrorResponse) { |
1041 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1042 | 2 | .unwrap() |
1043 | 2 | .0; |
1044 | 2 | let serialized = parse::build_error_response(request_id, error, None); |
1045 | 2 | self.responses_notifications_queue |
1046 | 2 | .queue |
1047 | 2 | .push(ToMainTask::RequestResponse(serialized)); |
1048 | 2 | self.responses_notifications_queue |
1049 | 2 | .on_pushed |
1050 | 2 | .notify(usize::MAX); |
1051 | 2 | self.has_sent_response = true; |
1052 | 2 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess4fail _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess4fail Line | Count | Source | 1040 | 2 | pub fn fail(mut self, error: ErrorResponse) { | 1041 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1042 | 2 | .unwrap() | 1043 | 2 | .0; | 1044 | 2 | let serialized = parse::build_error_response(request_id, error, None); | 1045 | 2 | self.responses_notifications_queue | 1046 | 2 | .queue | 1047 | 2 | .push(ToMainTask::RequestResponse(serialized)); | 1048 | 2 | self.responses_notifications_queue | 1049 | 2 | .on_pushed | 1050 | 2 | .notify(usize::MAX); | 1051 | 2 | self.has_sent_response = true; | 1052 | 2 | } |
|
1053 | | |
1054 | | /// Indicate to the [`ClientMainTask`] that the request should return an error. |
1055 | | /// |
1056 | | /// This function is similar to [`RequestProcess`], except that an additional JSON payload is |
1057 | | /// attached to the error. |
1058 | | /// |
1059 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1060 | 0 | pub fn fail_with_attached_json(mut self, error: ErrorResponse, json: &str) { |
1061 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1062 | 0 | .unwrap() |
1063 | 0 | .0; |
1064 | 0 | let serialized = parse::build_error_response(request_id, error, Some(json)); |
1065 | 0 | self.responses_notifications_queue |
1066 | 0 | .queue |
1067 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1068 | 0 | self.responses_notifications_queue |
1069 | 0 | .on_pushed |
1070 | 0 | .notify(usize::MAX); |
1071 | 0 | self.has_sent_response = true; |
1072 | 0 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess23fail_with_attached_json Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess23fail_with_attached_json |
1073 | | } |
1074 | | |
1075 | | impl fmt::Debug for RequestProcess { |
1076 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1077 | 0 | fmt::Debug::fmt(&self.request, f) |
1078 | 0 | } Unexecuted instantiation: _RNvXs5_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs5_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt |
1079 | | } |
1080 | | |
1081 | | impl Drop for RequestProcess { |
1082 | 23 | fn drop(&mut self) { |
1083 | 23 | if !self.has_sent_response { |
1084 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1085 | 0 | .unwrap() |
1086 | 0 | .0; |
1087 | 0 | let serialized = |
1088 | 0 | parse::build_error_response(request_id, ErrorResponse::InternalError, None); |
1089 | 0 | self.responses_notifications_queue |
1090 | 0 | .queue |
1091 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1092 | 0 | self.responses_notifications_queue |
1093 | 0 | .on_pushed |
1094 | 0 | .notify(usize::MAX); |
1095 | 23 | } |
1096 | 23 | } Unexecuted instantiation: _RNvXs6_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop _RNvXs6_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop Line | Count | Source | 1082 | 23 | fn drop(&mut self) { | 1083 | 23 | if !self.has_sent_response { | 1084 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1085 | 0 | .unwrap() | 1086 | 0 | .0; | 1087 | 0 | let serialized = | 1088 | 0 | parse::build_error_response(request_id, ErrorResponse::InternalError, None); | 1089 | 0 | self.responses_notifications_queue | 1090 | 0 | .queue | 1091 | 0 | .push(ToMainTask::RequestResponse(serialized)); | 1092 | 0 | self.responses_notifications_queue | 1093 | 0 | .on_pushed | 1094 | 0 | .notify(usize::MAX); | 1095 | 23 | } | 1096 | 23 | } |
|
1097 | | } |
1098 | | |
1099 | | /// Object connected to the [`ClientMainTask`] and containing a request that leads to the creation |
1100 | | /// of a subscription. |
1101 | | /// |
1102 | | /// If this object is dropped before the request has been answered, an automatic "internal error" |
1103 | | /// error response is automatically sent back. |
1104 | | pub struct SubscriptionStartProcess { |
1105 | | /// Queue where responses and subscriptions push responses/notifications. |
1106 | | responses_notifications_queue: Arc<ResponsesNotificationsQueue>, |
1107 | | /// `Arc` shared with the client main task and that is used to notify that the subscription |
1108 | | /// should be killed. |
1109 | | kill_channel: Arc<SubscriptionKillChannel>, |
1110 | | /// Request in JSON form. Guaranteed to decode successfully. |
1111 | | request: String, |
1112 | | /// Identifier of the subscription. Assigned by the client task. |
1113 | | subscription_id: String, |
1114 | | /// `true` if a response has already been sent. |
1115 | | has_sent_response: bool, |
1116 | | } |
1117 | | |
1118 | | impl SubscriptionStartProcess { |
1119 | | /// Returns the request which must be processed. |
1120 | | /// |
1121 | | /// The request is guaranteed to be a request that starts a subscription. |
1122 | | // TODO: with stronger typing users wouldn't have to worry about the type of request |
1123 | 0 | pub fn request(&self) -> methods::MethodCall { |
1124 | 0 | methods::parse_jsonrpc_client_to_server(&self.request) |
1125 | 0 | .unwrap() |
1126 | 0 | .1 |
1127 | 0 | } Unexecuted instantiation: _RNvMs7_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess7request Unexecuted instantiation: _RNvMs7_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess7request |
1128 | | |
1129 | | /// Indicate to the [`ClientMainTask`] that the subscription is accepted. |
1130 | | /// |
1131 | | /// The [`ClientMainTask`] will send the confirmation to the JSON-RPC client. |
1132 | | /// |
1133 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1134 | 0 | pub fn accept(mut self) -> Subscription { |
1135 | 0 | let (request_id, parsed_request) = |
1136 | 0 | methods::parse_jsonrpc_client_to_server(&self.request).unwrap(); |
1137 | | |
1138 | 0 | let serialized_response = match parsed_request { |
1139 | | methods::MethodCall::author_submitAndWatchExtrinsic { .. } => { |
1140 | 0 | methods::Response::author_submitAndWatchExtrinsic(Cow::Borrowed( |
1141 | 0 | &self.subscription_id, |
1142 | 0 | )) |
1143 | | } |
1144 | | methods::MethodCall::chain_subscribeAllHeads { .. } => { |
1145 | 0 | methods::Response::chain_subscribeAllHeads(Cow::Borrowed(&self.subscription_id)) |
1146 | | } |
1147 | | methods::MethodCall::chain_subscribeFinalizedHeads { .. } => { |
1148 | 0 | methods::Response::chain_subscribeFinalizedHeads(Cow::Borrowed( |
1149 | 0 | &self.subscription_id, |
1150 | 0 | )) |
1151 | | } |
1152 | | methods::MethodCall::chain_subscribeNewHeads { .. } => { |
1153 | 0 | methods::Response::chain_subscribeNewHeads(Cow::Borrowed(&self.subscription_id)) |
1154 | | } |
1155 | | methods::MethodCall::state_subscribeRuntimeVersion { .. } => { |
1156 | 0 | methods::Response::state_subscribeRuntimeVersion(Cow::Borrowed( |
1157 | 0 | &self.subscription_id, |
1158 | 0 | )) |
1159 | | } |
1160 | | methods::MethodCall::state_subscribeStorage { .. } => { |
1161 | 0 | methods::Response::state_subscribeStorage(Cow::Borrowed(&self.subscription_id)) |
1162 | | } |
1163 | | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. } => { |
1164 | 0 | methods::Response::transactionWatch_v1_submitAndWatch(Cow::Borrowed( |
1165 | 0 | &self.subscription_id, |
1166 | 0 | )) |
1167 | | } |
1168 | | methods::MethodCall::sudo_network_unstable_watch { .. } => { |
1169 | 0 | methods::Response::sudo_network_unstable_watch(Cow::Borrowed(&self.subscription_id)) |
1170 | | } |
1171 | | methods::MethodCall::chainHead_v1_follow { .. } => { |
1172 | 0 | methods::Response::chainHead_v1_follow(Cow::Borrowed(&self.subscription_id)) |
1173 | | } |
1174 | 0 | _ => unreachable!(), |
1175 | | } |
1176 | 0 | .to_json_response(request_id); |
1177 | 0 |
|
1178 | 0 | self.responses_notifications_queue |
1179 | 0 | .queue |
1180 | 0 | .push(ToMainTask::RequestResponse(serialized_response)); |
1181 | 0 | self.responses_notifications_queue |
1182 | 0 | .on_pushed |
1183 | 0 | .notify(usize::MAX); |
1184 | 0 | self.has_sent_response = true; |
1185 | 0 |
|
1186 | 0 | Subscription { |
1187 | 0 | responses_notifications_queue: self.responses_notifications_queue.clone(), |
1188 | 0 | kill_channel: self.kill_channel.clone(), |
1189 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1190 | 0 | } |
1191 | 0 | } Unexecuted instantiation: _RNvMs7_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess6accept Unexecuted instantiation: _RNvMs7_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess6accept |
1192 | | |
1193 | | /// Indicate to the [`ClientMainTask`] that the subscription start request should return an |
1194 | | /// error. |
1195 | | /// |
1196 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1197 | 0 | pub fn fail(mut self, error: ErrorResponse) { |
1198 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1199 | 0 | .unwrap() |
1200 | 0 | .0; |
1201 | 0 | let serialized = parse::build_error_response(request_id, error, None); |
1202 | 0 | self.responses_notifications_queue |
1203 | 0 | .queue |
1204 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1205 | 0 | self.responses_notifications_queue |
1206 | 0 | .queue |
1207 | 0 | .push(ToMainTask::SubscriptionDestroyed { |
1208 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1209 | 0 | }); |
1210 | 0 | self.responses_notifications_queue |
1211 | 0 | .on_pushed |
1212 | 0 | .notify(usize::MAX); |
1213 | 0 | self.has_sent_response = true; |
1214 | 0 | } Unexecuted instantiation: _RNvMs7_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess4fail Unexecuted instantiation: _RNvMs7_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess4fail |
1215 | | } |
1216 | | |
1217 | | impl fmt::Debug for SubscriptionStartProcess { |
1218 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1219 | 0 | fmt::Debug::fmt(&self.request, f) |
1220 | 0 | } Unexecuted instantiation: _RNvXs8_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs8_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt |
1221 | | } |
1222 | | |
1223 | | impl Drop for SubscriptionStartProcess { |
1224 | 0 | fn drop(&mut self) { |
1225 | 0 | if !self.has_sent_response { |
1226 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1227 | 0 | .unwrap() |
1228 | 0 | .0; |
1229 | 0 | let serialized = |
1230 | 0 | parse::build_error_response(request_id, ErrorResponse::InternalError, None); |
1231 | 0 | self.responses_notifications_queue |
1232 | 0 | .queue |
1233 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1234 | 0 | self.responses_notifications_queue |
1235 | 0 | .queue |
1236 | 0 | .push(ToMainTask::SubscriptionDestroyed { |
1237 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1238 | 0 | }); |
1239 | 0 | self.responses_notifications_queue |
1240 | 0 | .on_pushed |
1241 | 0 | .notify(usize::MAX); |
1242 | 0 | } |
1243 | 0 | } Unexecuted instantiation: _RNvXs9_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop Unexecuted instantiation: _RNvXs9_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop |
1244 | | } |
1245 | | |
1246 | | /// Object connected to the [`ClientMainTask`] representing an active subscription. |
1247 | | pub struct Subscription { |
1248 | | /// Queue where responses and subscriptions push responses/notifications. |
1249 | | responses_notifications_queue: Arc<ResponsesNotificationsQueue>, |
1250 | | /// `Arc` shared with the client main task and that is used to notify that the subscription |
1251 | | /// should be killed. |
1252 | | kill_channel: Arc<SubscriptionKillChannel>, |
1253 | | /// Identifier of the subscription. Assigned by the client task. |
1254 | | subscription_id: String, |
1255 | | } |
1256 | | |
1257 | | /// See [`Subscription::kill_channel`]. |
1258 | | struct SubscriptionKillChannel { |
1259 | | /// `true` if this subscription should be destroyed as soon as possible. |
1260 | | dead: AtomicBool, |
1261 | | /// Notified whenever [`SubscriptionKillChannel::dead`] is modified. |
1262 | | on_dead_changed: event_listener::Event, |
1263 | | } |
1264 | | |
1265 | | impl Subscription { |
1266 | | /// Return the identifier of this subscription. Necessary in order to generate answers. |
1267 | 0 | pub fn subscription_id(&self) -> &str { |
1268 | 0 | &self.subscription_id |
1269 | 0 | } Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription15subscription_id Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription15subscription_id |
1270 | | |
1271 | | /// Send a notification the [`ClientMainTask`]. |
1272 | | /// |
1273 | | /// Has no effect if [`Subscription::is_stale`] would return `true`. |
1274 | | /// |
1275 | | /// This notification might end up being discarded if the queue of responses to send back to |
1276 | | /// the JSON-RPC client is full and/or if the notification is redundant with another |
1277 | | /// notification sent earlier. |
1278 | | /// |
1279 | | /// While this function is asynchronous, it is expected to not take very long provided that |
1280 | | /// [`ClientMainTask::run_until_event`] is called in parallel. |
1281 | | /// |
1282 | | /// > **Note**: It is important to run [`ClientMainTask::run_until_event`] concurrently to |
1283 | | /// > this function, otherwise it might never return. |
1284 | | // TODO: with stronger typing we could automatically fill the subscription_id |
1285 | 0 | pub async fn send_notification(&mut self, notification: methods::ServerToClient<'_>) { Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription17send_notification Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription17send_notification |
1286 | 0 | let serialized = notification.to_json_request_object_parameters(None); |
1287 | 0 |
|
1288 | 0 | // Wait until there is space in the queue or that the subscription is dead. |
1289 | 0 | // Note that this is intentionally racy. |
1290 | 0 | { |
1291 | 0 | let mut wait = None; |
1292 | 0 | loop { |
1293 | 0 | // If the subscription is dead, simply do nothing. This is purely an optimization. |
1294 | 0 | if self.kill_channel.dead.load(Ordering::Relaxed) { |
1295 | 0 | return; |
1296 | 0 | } |
1297 | 0 |
|
1298 | 0 | // If there is space, break out of the loop in order to send. |
1299 | 0 | if self.responses_notifications_queue.queue.len() |
1300 | 0 | < self.responses_notifications_queue.max_len |
1301 | | { |
1302 | 0 | break; |
1303 | 0 | } |
1304 | | |
1305 | 0 | if let Some(wait) = wait.take() { |
1306 | 0 | wait.await |
1307 | 0 | } else { |
1308 | 0 | wait = Some( |
1309 | 0 | self.responses_notifications_queue |
1310 | 0 | .on_popped |
1311 | 0 | .listen() |
1312 | 0 | .or(self.kill_channel.on_dead_changed.listen()), |
1313 | 0 | ); |
1314 | 0 | } |
1315 | | } |
1316 | | } |
1317 | | |
1318 | | // Actually push the element. |
1319 | 0 | self.responses_notifications_queue |
1320 | 0 | .queue |
1321 | 0 | .push(ToMainTask::Notification(serialized)); |
1322 | 0 | self.responses_notifications_queue |
1323 | 0 | .on_pushed |
1324 | 0 | .notify(usize::MAX); |
1325 | 0 | } Unexecuted instantiation: _RNCNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0Bd_ Unexecuted instantiation: _RNCNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0Bd_ Unexecuted instantiation: _RNCNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0CsiUjFBJteJ7x_17smoldot_full_node |
1326 | | |
1327 | | /// Returns `true` if the JSON-RPC client has unsubscribed, or the [`ClientMainTask`] has been |
1328 | | /// destroyed, or the queue of responses to send to the JSON-RPC client is clogged and the |
1329 | | /// logic of the subscription requires that it stops altogether in that situation. |
1330 | | /// |
1331 | | /// Due to the racy nature of this function, a value of `false` can at any moment switch to |
1332 | | /// `true` and thus should be interpreted as "maybe". A value of `true`, however, actually |
1333 | | /// means "yes", as it can't ever switch back to `false`. |
1334 | 0 | pub fn is_stale(&self) -> bool { |
1335 | 0 | self.kill_channel.dead.load(Ordering::Relaxed) |
1336 | 0 | } Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription8is_stale Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription8is_stale |
1337 | | |
1338 | | /// Run indefinitely until [`Subscription::is_stale`] returns `true`. |
1339 | 0 | pub async fn wait_until_stale(&mut self) { Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription16wait_until_stale Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription16wait_until_stale |
1340 | 0 | // The control flow of this function is a bit magic, but simple enough that it should be |
1341 | 0 | // easy to understand. |
1342 | 0 | let mut wait = None; |
1343 | 0 | loop { |
1344 | 0 | if self.kill_channel.dead.load(Ordering::Acquire) { |
1345 | 0 | return; |
1346 | 0 | } |
1347 | | |
1348 | 0 | if let Some(wait) = wait.take() { |
1349 | 0 | wait.await; |
1350 | 0 | } else { |
1351 | 0 | wait = Some(self.kill_channel.on_dead_changed.listen()); |
1352 | 0 | } |
1353 | | } |
1354 | 0 | } Unexecuted instantiation: _RNCNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription16wait_until_stale0Bd_ Unexecuted instantiation: _RNCNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription16wait_until_stale0Bd_ |
1355 | | } |
1356 | | |
1357 | | impl fmt::Debug for Subscription { |
1358 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1359 | 0 | f.debug_tuple("Subscription") |
1360 | 0 | .field(&self.subscription_id) |
1361 | 0 | .finish() |
1362 | 0 | } Unexecuted instantiation: _RNvXsb_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXsb_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt |
1363 | | } |
1364 | | |
1365 | | impl Drop for Subscription { |
1366 | 0 | fn drop(&mut self) { |
1367 | 0 | self.responses_notifications_queue |
1368 | 0 | .queue |
1369 | 0 | .push(ToMainTask::SubscriptionDestroyed { |
1370 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1371 | 0 | }); |
1372 | 0 | self.responses_notifications_queue |
1373 | 0 | .on_pushed |
1374 | 0 | .notify(usize::MAX); |
1375 | 0 | } Unexecuted instantiation: _RNvXsc_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop Unexecuted instantiation: _RNvXsc_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop |
1376 | | } |