/__w/smoldot/smoldot/repo/lib/src/json_rpc/service/client_main_task.rs
Line | Count | Source |
1 | | // Smoldot |
2 | | // Copyright (C) 2023 Pierre Krieger |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | // TODO: doc |
19 | | |
20 | | use crate::json_rpc::{methods, parse}; |
21 | | use alloc::{ |
22 | | borrow::Cow, |
23 | | boxed::Box, |
24 | | collections::VecDeque, |
25 | | string::{String, ToString as _}, |
26 | | sync::{Arc, Weak}, |
27 | | }; |
28 | | use async_lock::Mutex; |
29 | | use core::{ |
30 | | cmp, fmt, mem, |
31 | | num::NonZero, |
32 | | sync::atomic::{AtomicBool, AtomicU32, Ordering}, |
33 | | }; |
34 | | use futures_lite::FutureExt as _; |
35 | | use slab::Slab; |
36 | | |
37 | | pub use crate::json_rpc::parse::{ErrorResponse, ParseError}; |
38 | | |
39 | | /// Task created by [`client_main_task`] and driven by calling [`ClientMainTask::run_until_event`]. See [module-level-documentation](..). |
40 | | pub struct ClientMainTask { |
41 | | /// Because we move the task around a lot, all the fields are actually within a `Box`. |
42 | | inner: Box<Inner>, |
43 | | } |
44 | | |
45 | | struct Inner { |
46 | | /// Identifier to allocate to the next subscription requested by the user. |
47 | | // TODO: better strategy than just integers? |
48 | | next_subscription_id: u64, |
49 | | |
50 | | /// List of all active subscriptions. Keys are subscription IDs. |
51 | | /// |
52 | | /// Given that the subscription IDs are allocated locally, there is no harm in using a |
53 | | /// non-HashDoS-resilient hash function. |
54 | | /// |
55 | | /// Entries are removed only when the [`SubscriptionStartProcess`] or [`Subscription`] object |
56 | | /// is destroyed. This is necessary given that the maximum number of subscriptions exists in |
57 | | /// order to avoid spam attacks, and that resources are freed only when the |
58 | | /// [`SubscriptionStartProcess`] or [`Subscription`] is destroyed. |
59 | | active_subscriptions: hashbrown::HashMap<String, InnerSubscription, fnv::FnvBuildHasher>, |
60 | | /// Maximum size that [`Inner::active_subscriptions`] is allowed to reach. Beyond this, |
61 | | /// subscription start requests are automatically denied. |
62 | | max_active_subscriptions: u32, |
63 | | |
64 | | /// Structure shared with the [`SerializedRequestsIo`], which only holds a `Weak` reference to it. |
65 | | serialized_io: Arc<SerializedIo>, |
66 | | |
67 | | /// Queue where responses and subscriptions push responses/notifications. |
68 | | responses_notifications_queue: Arc<ResponsesNotificationsQueue>, |
69 | | |
70 | | /// Listener that completes after the [`SerializedRequestsIo`] is destroyed; [`ClientMainTask::run_until_event`] then returns [`Event::SerializedRequestsIoClosed`]. |
71 | | on_serialized_requests_io_destroyed: event_listener::EventListener, |
72 | | } |
73 | | |
74 | | struct InnerSubscription { |
75 | | /// Shared with the subscription. Used to notify the subscription that it should be killed: its `dead` flag is set and `on_dead_changed` is notified when an unsubscribe request is accepted. |
76 | | kill_channel: Arc<SubscriptionKillChannel>, |
77 | | /// Response to an unsubscribe request that must be sent out once the subscription is killed. `None` until an unsubscribe request targeting this subscription has been accepted. |
78 | | unsubscribe_response: Option<String>, |
79 | | } |
80 | | |
81 | | struct SerializedIo { |
82 | | /// Queue of requests. The requests are guaranteed to be a valid request JSON, but not |
83 | | /// necessarily to use a known method. |
84 | | requests_queue: crossbeam_queue::SegQueue<String>, |
85 | | |
86 | | /// Event notified after an element has been pushed to [`SerializedIo::requests_queue`]. |
87 | | on_request_pushed: event_listener::Event, |
88 | | |
89 | | /// Event notified after an element from [`SerializedIo::requests_queue`] has been pulled. |
90 | | on_request_pulled_or_task_destroyed: event_listener::Event, |
91 | | |
92 | | /// Number of requests that have been received from the client but whose answer hasn't |
93 | | /// been pulled out from [`SerializedIo::requests_queue`] yet. NOTE(review): answers are queued in [`SerializedIo::responses_queue`], so this link presumably should name that field; confirm. |
94 | | num_requests_in_fly: AtomicU32, |
95 | | |
96 | | /// Maximum value that [`SerializedIo::num_requests_in_fly`] is allowed to reach. |
97 | | /// Beyond this, no more requests should be added to [`SerializedIo::requests_queue`]. |
98 | | max_requests_in_fly: NonZero<u32>, |
99 | | |
100 | | /// Queue of responses. |
101 | | responses_queue: Mutex<SerializedIoResponses>, |
102 | | |
103 | | /// Event notified after an element has been pushed to [`SerializedIo::responses_queue`], or |
104 | | /// when the [`ClientMainTask`] has been destroyed. |
105 | | on_response_pushed_or_task_destroyed: event_listener::Event, |
106 | | } |
107 | | |
108 | | struct SerializedIoResponses { |
109 | | /// Unordered list of responses and notifications to send back to the client. |
110 | | /// |
111 | | /// Each entry contains the response/notification, and a boolean equal to `true` if this is |
112 | | /// a response to a request or `false` if this is a notification. |
113 | | pending_serialized_responses: Slab<(String, bool)>, |
114 | | |
115 | | /// Ordered list of responses and notifications to send back to the client, as indices within |
116 | | /// [`SerializedIoResponses::pending_serialized_responses`]. Each index is the key returned by the slab when the entry was inserted. |
117 | | pending_serialized_responses_queue: VecDeque<usize>, |
118 | | } |
119 | | |
120 | | /// Queue where responses and subscriptions push responses/notifications. |
121 | | struct ResponsesNotificationsQueue { |
122 | | /// The actual queue. |
123 | | queue: crossbeam_queue::SegQueue<ToMainTask>, |
124 | | /// Maximum size that [`ResponsesNotificationsQueue::queue`] should reach. |
125 | | /// This is however not a hard limit. Pushing a response to a request and pushing a |
126 | | /// subscription destroyed event ignore this maximum (as doing so must always be lock-free), |
127 | | /// and pushing a notification checks against this limit in a racy way. For this reason, in |
128 | | /// the worst case scenario the queue can reach up to |
129 | | /// `max_requests_in_fly + max_active_subscriptions` elements. What matters, however, is that |
130 | | /// the queue is bounded one way or another, rather than the exact value of the bound. |
131 | | max_len: usize, |
132 | | /// Event notified after an element has been pushed to [`ResponsesNotificationsQueue::queue`]. |
133 | | on_pushed: event_listener::Event, |
134 | | /// Event notified after an element has been popped from [`ResponsesNotificationsQueue::queue`]. |
135 | | on_popped: event_listener::Event, |
136 | | } |
137 | | |
138 | | // TODO: weird enum. `RequestResponse` and `Notification` carry a serialized message that the main task queues for the client (flagged as response or notification respectively); `SubscriptionDestroyed` makes the main task remove the subscription from its active list. |
139 | | enum ToMainTask { |
140 | | RequestResponse(String), |
141 | | Notification(String), |
142 | | SubscriptionDestroyed { subscription_id: String }, |
143 | | } |
144 | | |
145 | | /// Configuration for [`client_main_task`]. |
146 | | pub struct Config { |
147 | | /// Maximum number of requests that have been sent by the [`SerializedRequestsIo`] but whose |
148 | | /// response hasn't been pulled through the [`SerializedRequestsIo`] yet. |
149 | | /// |
150 | | /// If this limit is reached, it is not possible to send further requests without pulling |
151 | | /// responses first. |
152 | | pub max_pending_requests: NonZero<u32>, |
153 | | |
154 | | /// Maximum number of simultaneous subscriptions allowed. Trying to create a subscription will |
155 | | /// be automatically rejected with a "Too many active subscriptions" error if this limit is reached. |
156 | | pub max_active_subscriptions: u32, |
157 | | } |
158 | | |
159 | | /// Creates a new [`ClientMainTask`] and a [`SerializedRequestsIo`] connected to it. The responses/notifications queue gets a `max_len` of `max_pending_requests + max_active_subscriptions` (saturating); initial buffer capacities are capped at 32 subscriptions and 64 pending responses. |
160 | 21 | pub fn client_main_task(config: Config) -> (ClientMainTask, SerializedRequestsIo) { |
161 | 21 | let buffers_capacity = usize::try_from(config.max_pending_requests.get()) |
162 | 21 | .unwrap_or(usize::MAX) |
163 | 21 | .saturating_add(usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX)); |
164 | | |
165 | 21 | let on_serialized_requests_io_destroyed = event_listener::Event::new(); |
166 | | |
167 | 21 | let task = ClientMainTask { |
168 | 21 | inner: Box::new(Inner { |
169 | 21 | next_subscription_id: 1, |
170 | 21 | active_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( |
171 | 21 | cmp::min( |
172 | 21 | usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX), |
173 | 21 | 32, |
174 | 21 | ), |
175 | 21 | Default::default(), |
176 | 21 | ), |
177 | 21 | max_active_subscriptions: config.max_active_subscriptions, |
178 | 21 | serialized_io: Arc::new(SerializedIo { |
179 | 21 | requests_queue: crossbeam_queue::SegQueue::new(), |
180 | 21 | on_request_pushed: event_listener::Event::new(), |
181 | 21 | on_request_pulled_or_task_destroyed: event_listener::Event::new(), |
182 | 21 | num_requests_in_fly: AtomicU32::new(0), |
183 | 21 | max_requests_in_fly: config.max_pending_requests, |
184 | 21 | responses_queue: Mutex::new(SerializedIoResponses { |
185 | 21 | pending_serialized_responses_queue: VecDeque::with_capacity(cmp::min( |
186 | 21 | 64, |
187 | 21 | buffers_capacity, |
188 | 21 | )), |
189 | 21 | pending_serialized_responses: Slab::with_capacity(cmp::min( |
190 | 21 | 64, |
191 | 21 | buffers_capacity, |
192 | 21 | )), |
193 | 21 | }), |
194 | 21 | on_response_pushed_or_task_destroyed: event_listener::Event::new(), |
195 | 21 | }), |
196 | 21 | responses_notifications_queue: Arc::new(ResponsesNotificationsQueue { |
197 | 21 | queue: crossbeam_queue::SegQueue::new(), |
198 | 21 | max_len: buffers_capacity, |
199 | 21 | on_pushed: event_listener::Event::new(), |
200 | 21 | on_popped: event_listener::Event::new(), |
201 | 21 | }), |
202 | 21 | on_serialized_requests_io_destroyed: on_serialized_requests_io_destroyed.listen(), |
203 | 21 | }), |
204 | 21 | }; |
205 | | |
206 | 21 | let serialized_requests_io = SerializedRequestsIo { |
207 | 21 | serialized_io: Arc::downgrade(&task.inner.serialized_io), |
208 | 21 | on_serialized_requests_io_destroyed, |
209 | 21 | }; |
210 | | |
211 | 21 | (task, serialized_requests_io) |
212 | 21 | } Unexecuted instantiation: _RNvNtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_task16client_main_task _RNvNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_task16client_main_task Line | Count | Source | 160 | 21 | pub fn client_main_task(config: Config) -> (ClientMainTask, SerializedRequestsIo) { | 161 | 21 | let buffers_capacity = usize::try_from(config.max_pending_requests.get()) | 162 | 21 | .unwrap_or(usize::MAX) | 163 | 21 | .saturating_add(usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX)); | 164 | | | 165 | 21 | let on_serialized_requests_io_destroyed = event_listener::Event::new(); | 166 | | | 167 | 21 | let task = ClientMainTask { | 168 | 21 | inner: Box::new(Inner { | 169 | 21 | next_subscription_id: 1, | 170 | 21 | active_subscriptions: hashbrown::HashMap::with_capacity_and_hasher( | 171 | 21 | cmp::min( | 172 | 21 | usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX), | 173 | 21 | 32, | 174 | 21 | ), | 175 | 21 | Default::default(), | 176 | 21 | ), | 177 | 21 | max_active_subscriptions: config.max_active_subscriptions, | 178 | 21 | serialized_io: Arc::new(SerializedIo { | 179 | 21 | requests_queue: crossbeam_queue::SegQueue::new(), | 180 | 21 | on_request_pushed: event_listener::Event::new(), | 181 | 21 | on_request_pulled_or_task_destroyed: event_listener::Event::new(), | 182 | 21 | num_requests_in_fly: AtomicU32::new(0), | 183 | 21 | max_requests_in_fly: config.max_pending_requests, | 184 | 21 | responses_queue: Mutex::new(SerializedIoResponses { | 185 | 21 | pending_serialized_responses_queue: VecDeque::with_capacity(cmp::min( | 186 | 21 | 64, | 187 | 21 | buffers_capacity, | 188 | 21 | )), | 189 | 21 | pending_serialized_responses: Slab::with_capacity(cmp::min( | 190 | 21 | 64, | 191 | 21 | buffers_capacity, | 192 | 21 | )), | 193 | 21 | }), | 194 | 21 | on_response_pushed_or_task_destroyed: event_listener::Event::new(), | 195 | 21 | }), | 196 | 21 | 
responses_notifications_queue: Arc::new(ResponsesNotificationsQueue { | 197 | 21 | queue: crossbeam_queue::SegQueue::new(), | 198 | 21 | max_len: buffers_capacity, | 199 | 21 | on_pushed: event_listener::Event::new(), | 200 | 21 | on_popped: event_listener::Event::new(), | 201 | 21 | }), | 202 | 21 | on_serialized_requests_io_destroyed: on_serialized_requests_io_destroyed.listen(), | 203 | 21 | }), | 204 | 21 | }; | 205 | | | 206 | 21 | let serialized_requests_io = SerializedRequestsIo { | 207 | 21 | serialized_io: Arc::downgrade(&task.inner.serialized_io), | 208 | 21 | on_serialized_requests_io_destroyed, | 209 | 21 | }; | 210 | | | 211 | 21 | (task, serialized_requests_io) | 212 | 21 | } |
|
213 | | |
214 | | impl ClientMainTask { |
215 | | /// Processes the task's internals and waits until something noteworthy happens. |
216 | 44 | pub async fn run_until_event(mut self) -> Event { Unexecuted instantiation: _RNvMNtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask15run_until_event _RNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask15run_until_event Line | Count | Source | 216 | 44 | pub async fn run_until_event(mut self) -> Event { |
|
217 | | loop { |
218 | | enum WakeUpReason { |
219 | | NewRequest(String), |
220 | | Message(ToMainTask), |
221 | | } |
222 | | |
223 | 48 | let wake_up_reason = { |
224 | 69 | let serialized_requests_io_destroyed = async { |
225 | 69 | (&mut self.inner.on_serialized_requests_io_destroyed).await; |
226 | 21 | Err(()) |
227 | 21 | }; Unexecuted instantiation: _RNCNCNvMNtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00Bc_ _RNCNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00CsfFWJyR6nd6r_17smoldot_full_node Line | Count | Source | 224 | 69 | let serialized_requests_io_destroyed = async { | 225 | 69 | (&mut self.inner.on_serialized_requests_io_destroyed).await; | 226 | 21 | Err(()) | 227 | 21 | }; |
Unexecuted instantiation: _RNCNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00Bc_ |
228 | | |
229 | 69 | let next_serialized_request = async {67 |
230 | 67 | let mut wait = None; |
231 | | loop { |
232 | 133 | if let Some(elem25 ) = self.inner.serialized_io.requests_queue.pop() { |
233 | 25 | self.inner |
234 | 25 | .serialized_io |
235 | 25 | .on_request_pulled_or_task_destroyed |
236 | 25 | .notify(usize::MAX); |
237 | 25 | break Ok(WakeUpReason::NewRequest(elem)); |
238 | 108 | } |
239 | 108 | if let Some(wait54 ) = wait.take() { |
240 | 54 | wait.await |
241 | 54 | } else { |
242 | 54 | wait = Some(self.inner.serialized_io.on_request_pushed.listen()); |
243 | 54 | } |
244 | | } |
245 | 25 | }; Unexecuted instantiation: _RNCNCNvMNtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0Bc_ _RNCNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0CsfFWJyR6nd6r_17smoldot_full_node Line | Count | Source | 229 | 67 | let next_serialized_request = async { | 230 | 67 | let mut wait = None; | 231 | | loop { | 232 | 133 | if let Some(elem25 ) = self.inner.serialized_io.requests_queue.pop() { | 233 | 25 | self.inner | 234 | 25 | .serialized_io | 235 | 25 | .on_request_pulled_or_task_destroyed | 236 | 25 | .notify(usize::MAX); | 237 | 25 | break Ok(WakeUpReason::NewRequest(elem)); | 238 | 108 | } | 239 | 108 | if let Some(wait54 ) = wait.take() { | 240 | 54 | wait.await | 241 | 54 | } else { | 242 | 54 | wait = Some(self.inner.serialized_io.on_request_pushed.listen()); | 243 | 54 | } | 244 | | } | 245 | 25 | }; |
Unexecuted instantiation: _RNCNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0Bc_ |
246 | | |
247 | 69 | let response_notif = async {54 |
248 | 54 | let mut wait = None; |
249 | | loop { |
250 | 131 | if let Some(elem23 ) = self.inner.responses_notifications_queue.queue.pop() { |
251 | 23 | break Ok(WakeUpReason::Message(elem)); |
252 | 108 | } |
253 | 108 | if let Some(wait54 ) = wait.take() { |
254 | 54 | wait.await |
255 | 54 | } else { |
256 | 54 | wait = |
257 | 54 | Some(self.inner.responses_notifications_queue.on_pushed.listen()); |
258 | 54 | } |
259 | | } |
260 | 23 | }; Unexecuted instantiation: _RNCNCNvMNtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0Bc_ _RNCNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0CsfFWJyR6nd6r_17smoldot_full_node Line | Count | Source | 247 | 54 | let response_notif = async { | 248 | 54 | let mut wait = None; | 249 | | loop { | 250 | 131 | if let Some(elem23 ) = self.inner.responses_notifications_queue.queue.pop() { | 251 | 23 | break Ok(WakeUpReason::Message(elem)); | 252 | 108 | } | 253 | 108 | if let Some(wait54 ) = wait.take() { | 254 | 54 | wait.await | 255 | 54 | } else { | 256 | 54 | wait = | 257 | 54 | Some(self.inner.responses_notifications_queue.on_pushed.listen()); | 258 | 54 | } | 259 | | } | 260 | 23 | }; |
Unexecuted instantiation: _RNCNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0Bc_ |
261 | | |
262 | 69 | match serialized_requests_io_destroyed |
263 | 69 | .or(next_serialized_request) |
264 | 69 | .or(response_notif) |
265 | 69 | .await |
266 | | { |
267 | 48 | Ok(wake_up_reason) => wake_up_reason, |
268 | 21 | Err(()) => return Event::SerializedRequestsIoClosed, |
269 | | } |
270 | | }; |
271 | | |
272 | | // Immediately handle every event apart from `NewRequest`. |
273 | 25 | let new_request = match wake_up_reason23 { |
274 | 25 | WakeUpReason::NewRequest(request) => request, |
275 | 0 | WakeUpReason::Message(ToMainTask::SubscriptionDestroyed { subscription_id }) => { |
276 | | let InnerSubscription { |
277 | 0 | unsubscribe_response, |
278 | | .. |
279 | 0 | } = self |
280 | 0 | .inner |
281 | 0 | .active_subscriptions |
282 | 0 | .remove(&subscription_id) |
283 | 0 | .unwrap(); |
284 | | // TODO: post a `stop`/`error` event for chainhead subscriptions |
285 | 0 | if let Some(unsubscribe_response) = unsubscribe_response { |
286 | 0 | let mut responses_queue = |
287 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
288 | 0 | let pos = responses_queue |
289 | 0 | .pending_serialized_responses |
290 | 0 | .insert((unsubscribe_response, true)); |
291 | 0 | responses_queue |
292 | 0 | .pending_serialized_responses_queue |
293 | 0 | .push_back(pos); |
294 | 0 | self.inner |
295 | 0 | .serialized_io |
296 | 0 | .on_response_pushed_or_task_destroyed |
297 | 0 | .notify(usize::MAX); |
298 | 0 | } |
299 | | |
300 | | // Shrink the list of active subscriptions if necessary. |
301 | 0 | if self.inner.active_subscriptions.capacity() |
302 | 0 | >= 2 * self.inner.active_subscriptions.len() + 16 |
303 | 0 | { |
304 | 0 | self.inner.active_subscriptions.shrink_to_fit(); |
305 | 0 | } |
306 | | |
307 | 0 | return Event::SubscriptionDestroyed { |
308 | 0 | task: self, |
309 | 0 | subscription_id, |
310 | 0 | }; |
311 | | } |
312 | 23 | WakeUpReason::Message(ToMainTask::RequestResponse(response)) => { |
313 | 23 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await; |
314 | 23 | let pos = responses_queue |
315 | 23 | .pending_serialized_responses |
316 | 23 | .insert((response, true)); |
317 | 23 | responses_queue |
318 | 23 | .pending_serialized_responses_queue |
319 | 23 | .push_back(pos); |
320 | 23 | self.inner |
321 | 23 | .serialized_io |
322 | 23 | .on_response_pushed_or_task_destroyed |
323 | 23 | .notify(usize::MAX); |
324 | 23 | continue; |
325 | | } |
326 | 0 | WakeUpReason::Message(ToMainTask::Notification(notification)) => { |
327 | | // TODO: filter out redundant notifications, as it's the entire point of this module |
328 | 0 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await; |
329 | 0 | let pos = responses_queue |
330 | 0 | .pending_serialized_responses |
331 | 0 | .insert((notification, false)); |
332 | 0 | responses_queue |
333 | 0 | .pending_serialized_responses_queue |
334 | 0 | .push_back(pos); |
335 | 0 | self.inner |
336 | 0 | .serialized_io |
337 | 0 | .on_response_pushed_or_task_destroyed |
338 | 0 | .notify(usize::MAX); |
339 | 0 | continue; |
340 | | } |
341 | | }; |
342 | | |
343 | 23 | let (request_id, parsed_request) = |
344 | 25 | match methods::parse_jsonrpc_client_to_server(&new_request) { |
345 | 23 | Ok((request_id, method)) => (request_id, method), |
346 | 1 | Err(methods::ParseClientToServerError::Method { request_id, error }) => { |
347 | 1 | let response = error.to_json_error(request_id); |
348 | 1 | let mut responses_queue = |
349 | 1 | self.inner.serialized_io.responses_queue.lock().await; |
350 | 1 | let pos = responses_queue |
351 | 1 | .pending_serialized_responses |
352 | 1 | .insert((response, true)); |
353 | 1 | responses_queue |
354 | 1 | .pending_serialized_responses_queue |
355 | 1 | .push_back(pos); |
356 | 1 | self.inner |
357 | 1 | .serialized_io |
358 | 1 | .on_response_pushed_or_task_destroyed |
359 | 1 | .notify(usize::MAX); |
360 | 1 | continue; |
361 | | } |
362 | 0 | Err(methods::ParseClientToServerError::UnknownNotification { .. }) => continue, |
363 | | Err(methods::ParseClientToServerError::JsonRpcParse(_)) => { |
364 | 1 | let response = parse::build_parse_error_response(); |
365 | 1 | let mut responses_queue = |
366 | 1 | self.inner.serialized_io.responses_queue.lock().await; |
367 | 1 | let pos = responses_queue |
368 | 1 | .pending_serialized_responses |
369 | 1 | .insert((response, true)); |
370 | 1 | responses_queue |
371 | 1 | .pending_serialized_responses_queue |
372 | 1 | .push_back(pos); |
373 | 1 | self.inner |
374 | 1 | .serialized_io |
375 | 1 | .on_response_pushed_or_task_destroyed |
376 | 1 | .notify(usize::MAX); |
377 | 1 | continue; |
378 | | } |
379 | | }; |
380 | | |
381 | | // There exists three types of requests: |
382 | | // |
383 | | // - Requests that follow a simple one-request-one-response schema. |
384 | | // - Requests that, if accepted, start a subscription. |
385 | | // - Requests that unsubscribe from a subscription. |
386 | | // |
387 | 23 | match &parsed_request { |
388 | | methods::MethodCall::account_nextIndex { .. } |
389 | | | methods::MethodCall::author_hasKey { .. } |
390 | | | methods::MethodCall::author_hasSessionKeys { .. } |
391 | | | methods::MethodCall::author_insertKey { .. } |
392 | | | methods::MethodCall::author_pendingExtrinsics { .. } |
393 | | | methods::MethodCall::author_removeExtrinsic { .. } |
394 | | | methods::MethodCall::author_rotateKeys { .. } |
395 | | | methods::MethodCall::author_submitExtrinsic { .. } |
396 | | | methods::MethodCall::babe_epochAuthorship { .. } |
397 | | | methods::MethodCall::chain_getBlock { .. } |
398 | | | methods::MethodCall::chain_getBlockHash { .. } |
399 | | | methods::MethodCall::chain_getFinalizedHead { .. } |
400 | | | methods::MethodCall::chain_getHeader { .. } |
401 | | | methods::MethodCall::childstate_getKeys { .. } |
402 | | | methods::MethodCall::childstate_getStorage { .. } |
403 | | | methods::MethodCall::childstate_getStorageHash { .. } |
404 | | | methods::MethodCall::childstate_getStorageSize { .. } |
405 | | | methods::MethodCall::grandpa_roundState { .. } |
406 | | | methods::MethodCall::offchain_localStorageGet { .. } |
407 | | | methods::MethodCall::offchain_localStorageSet { .. } |
408 | | | methods::MethodCall::payment_queryInfo { .. } |
409 | | | methods::MethodCall::state_call { .. } |
410 | | | methods::MethodCall::state_getKeys { .. } |
411 | | | methods::MethodCall::state_getKeysPaged { .. } |
412 | | | methods::MethodCall::state_getMetadata { .. } |
413 | | | methods::MethodCall::state_getPairs { .. } |
414 | | | methods::MethodCall::state_getReadProof { .. } |
415 | | | methods::MethodCall::state_getRuntimeVersion { .. } |
416 | | | methods::MethodCall::state_getStorage { .. } |
417 | | | methods::MethodCall::state_getStorageHash { .. } |
418 | | | methods::MethodCall::state_getStorageSize { .. } |
419 | | | methods::MethodCall::state_queryStorage { .. } |
420 | | | methods::MethodCall::state_queryStorageAt { .. } |
421 | | | methods::MethodCall::system_accountNextIndex { .. } |
422 | | | methods::MethodCall::system_addReservedPeer { .. } |
423 | | | methods::MethodCall::system_chain { .. } |
424 | | | methods::MethodCall::system_chainType { .. } |
425 | | | methods::MethodCall::system_dryRun { .. } |
426 | | | methods::MethodCall::system_health { .. } |
427 | | | methods::MethodCall::system_localListenAddresses { .. } |
428 | | | methods::MethodCall::system_localPeerId { .. } |
429 | | | methods::MethodCall::system_name { .. } |
430 | | | methods::MethodCall::system_networkState { .. } |
431 | | | methods::MethodCall::system_nodeRoles { .. } |
432 | | | methods::MethodCall::system_peers { .. } |
433 | | | methods::MethodCall::system_properties { .. } |
434 | | | methods::MethodCall::system_removeReservedPeer { .. } |
435 | | | methods::MethodCall::system_version { .. } |
436 | | | methods::MethodCall::chainSpec_v1_chainName { .. } |
437 | | | methods::MethodCall::chainSpec_v1_genesisHash { .. } |
438 | | | methods::MethodCall::chainSpec_v1_properties { .. } |
439 | | | methods::MethodCall::rpc_methods { .. } |
440 | | | methods::MethodCall::sudo_unstable_p2pDiscover { .. } |
441 | | | methods::MethodCall::sudo_unstable_version { .. } |
442 | | | methods::MethodCall::chainHead_v1_body { .. } |
443 | | | methods::MethodCall::chainHead_v1_call { .. } |
444 | | | methods::MethodCall::chainHead_v1_continue { .. } |
445 | | | methods::MethodCall::chainHead_unstable_finalizedDatabase { .. } |
446 | | | methods::MethodCall::chainHead_v1_header { .. } |
447 | | | methods::MethodCall::chainHead_v1_stopOperation { .. } |
448 | | | methods::MethodCall::chainHead_v1_storage { .. } |
449 | | | methods::MethodCall::chainHead_v1_unpin { .. } => { |
450 | | // Simple one-request-one-response. |
451 | 23 | return Event::HandleRequest { |
452 | 23 | request_process: RequestProcess { |
453 | 23 | responses_notifications_queue: self |
454 | 23 | .inner |
455 | 23 | .responses_notifications_queue |
456 | 23 | .clone(), |
457 | 23 | request: new_request, |
458 | 23 | has_sent_response: false, |
459 | 23 | }, |
460 | 23 | task: self, |
461 | 23 | }; |
462 | | } |
463 | | |
464 | | methods::MethodCall::author_submitAndWatchExtrinsic { .. } |
465 | | | methods::MethodCall::chain_subscribeAllHeads { .. } |
466 | | | methods::MethodCall::chain_subscribeFinalizedHeads { .. } |
467 | | | methods::MethodCall::chain_subscribeNewHeads { .. } |
468 | | | methods::MethodCall::state_subscribeRuntimeVersion { .. } |
469 | | | methods::MethodCall::state_subscribeStorage { .. } |
470 | | | methods::MethodCall::transaction_v1_broadcast { .. } |
471 | | | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. } |
472 | | | methods::MethodCall::sudo_network_unstable_watch { .. } |
473 | | | methods::MethodCall::chainHead_v1_follow { .. } => { |
474 | | // Subscription starting requests. |
475 | | |
476 | | // We must check the maximum number of subscriptions. |
477 | 0 | let max_subscriptions = |
478 | 0 | usize::try_from(self.inner.max_active_subscriptions).unwrap_or(usize::MAX); |
479 | 0 | debug_assert!(self.inner.active_subscriptions.len() <= max_subscriptions); |
480 | 0 | if self.inner.active_subscriptions.len() >= max_subscriptions { |
481 | 0 | let response = parse::build_error_response( |
482 | 0 | request_id, |
483 | 0 | ErrorResponse::ServerError(-32000, "Too many active subscriptions"), |
484 | 0 | None, |
485 | | ); |
486 | 0 | let mut responses_queue = |
487 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
488 | 0 | let pos = responses_queue |
489 | 0 | .pending_serialized_responses |
490 | 0 | .insert((response, true)); |
491 | 0 | responses_queue |
492 | 0 | .pending_serialized_responses_queue |
493 | 0 | .push_back(pos); |
494 | 0 | self.inner |
495 | 0 | .serialized_io |
496 | 0 | .on_response_pushed_or_task_destroyed |
497 | 0 | .notify(usize::MAX); |
498 | 0 | continue; |
499 | 0 | } |
500 | | |
501 | | // Allocate the new subscription ID. |
502 | 0 | let subscription_id = self.allocate_subscription_id(); |
503 | 0 | debug_assert!( |
504 | 0 | !self |
505 | 0 | .inner |
506 | 0 | .active_subscriptions |
507 | 0 | .contains_key(&subscription_id) |
508 | | ); |
509 | | |
510 | | // Insert an "kill channel" in the local state. This kill channel is shared |
511 | | // with the subscription object and is used to notify when a subscription |
512 | | // should be killed. |
513 | 0 | let kill_channel = Arc::new(SubscriptionKillChannel { |
514 | 0 | dead: AtomicBool::new(false), |
515 | 0 | on_dead_changed: event_listener::Event::new(), |
516 | 0 | }); |
517 | 0 | self.inner.active_subscriptions.insert( |
518 | 0 | subscription_id.clone(), |
519 | 0 | InnerSubscription { |
520 | 0 | kill_channel: kill_channel.clone(), |
521 | 0 | unsubscribe_response: None, |
522 | 0 | }, |
523 | | ); |
524 | | |
525 | 0 | return Event::HandleSubscriptionStart { |
526 | 0 | subscription_start: SubscriptionStartProcess { |
527 | 0 | responses_notifications_queue: self |
528 | 0 | .inner |
529 | 0 | .responses_notifications_queue |
530 | 0 | .clone(), |
531 | 0 | request: new_request, |
532 | 0 | kill_channel, |
533 | 0 | subscription_id, |
534 | 0 | has_sent_response: false, |
535 | 0 | }, |
536 | 0 | task: self, |
537 | 0 | }; |
538 | | } |
539 | | |
540 | 0 | methods::MethodCall::author_unwatchExtrinsic { subscription, .. } |
541 | 0 | | methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. } |
542 | 0 | | methods::MethodCall::state_unsubscribeStorage { subscription, .. } |
543 | | | methods::MethodCall::transaction_v1_stop { |
544 | 0 | operation_id: subscription, |
545 | | } |
546 | 0 | | methods::MethodCall::transactionWatch_v1_unwatch { subscription, .. } |
547 | 0 | | methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. } |
548 | | | methods::MethodCall::chainHead_v1_unfollow { |
549 | 0 | follow_subscription: subscription, |
550 | | .. |
551 | | } => { |
552 | | // TODO: must check whether type of subscription matches |
553 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { |
554 | | Some(InnerSubscription { |
555 | 0 | kill_channel, |
556 | 0 | unsubscribe_response, |
557 | 0 | }) if unsubscribe_response.is_none() => { |
558 | 0 | *unsubscribe_response = Some( |
559 | 0 | match parsed_request { |
560 | | methods::MethodCall::author_unwatchExtrinsic { .. } => { |
561 | 0 | methods::Response::author_unwatchExtrinsic(true) |
562 | | } |
563 | | methods::MethodCall::state_unsubscribeRuntimeVersion { |
564 | | .. |
565 | 0 | } => methods::Response::state_unsubscribeRuntimeVersion(true), |
566 | | methods::MethodCall::state_unsubscribeStorage { .. } => { |
567 | 0 | methods::Response::state_unsubscribeStorage(true) |
568 | | } |
569 | | methods::MethodCall::transaction_v1_stop { .. } => { |
570 | 0 | methods::Response::transaction_v1_stop(()) |
571 | | } |
572 | | methods::MethodCall::transactionWatch_v1_unwatch { .. } => { |
573 | 0 | methods::Response::transactionWatch_v1_unwatch(()) |
574 | | } |
575 | | methods::MethodCall::sudo_network_unstable_unwatch { |
576 | | .. |
577 | 0 | } => methods::Response::sudo_network_unstable_unwatch(()), |
578 | | methods::MethodCall::chainHead_v1_unfollow { .. } => { |
579 | 0 | methods::Response::chainHead_v1_unfollow(()) |
580 | | } |
581 | 0 | _ => unreachable!(), |
582 | | } |
583 | 0 | .to_json_response(request_id), |
584 | | ); |
585 | | |
586 | 0 | kill_channel.dead.store(true, Ordering::Release); |
587 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); |
588 | | } |
589 | | _ => { |
590 | 0 | let response = match parsed_request { |
591 | | methods::MethodCall::author_unwatchExtrinsic { .. } => { |
592 | 0 | methods::Response::author_unwatchExtrinsic(false) |
593 | 0 | .to_json_response(request_id) |
594 | | } |
595 | | methods::MethodCall::state_unsubscribeRuntimeVersion { .. } => { |
596 | 0 | methods::Response::state_unsubscribeRuntimeVersion(false) |
597 | 0 | .to_json_response(request_id) |
598 | | } |
599 | | methods::MethodCall::state_unsubscribeStorage { .. } => { |
600 | 0 | methods::Response::state_unsubscribeStorage(false) |
601 | 0 | .to_json_response(request_id) |
602 | | } |
603 | 0 | _ => parse::build_error_response( |
604 | 0 | request_id, |
605 | 0 | ErrorResponse::InvalidParams, |
606 | 0 | None, |
607 | | ), |
608 | | }; |
609 | | |
610 | 0 | let mut responses_queue = |
611 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
612 | 0 | let pos = responses_queue |
613 | 0 | .pending_serialized_responses |
614 | 0 | .insert((response, true)); |
615 | 0 | responses_queue |
616 | 0 | .pending_serialized_responses_queue |
617 | 0 | .push_back(pos); |
618 | 0 | self.inner |
619 | 0 | .serialized_io |
620 | 0 | .on_response_pushed_or_task_destroyed |
621 | 0 | .notify(usize::MAX); |
622 | | } |
623 | | } |
624 | | } |
625 | 0 | methods::MethodCall::chain_unsubscribeAllHeads { subscription, .. } |
626 | 0 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { subscription, .. } |
627 | 0 | | methods::MethodCall::chain_unsubscribeNewHeads { subscription, .. } => { |
628 | | // TODO: DRY with above |
629 | | // TODO: must check whether type of subscription matches |
630 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { |
631 | | Some(InnerSubscription { |
632 | 0 | unsubscribe_response, |
633 | 0 | kill_channel, |
634 | 0 | }) if unsubscribe_response.is_none() => { |
635 | 0 | *unsubscribe_response = Some(match parsed_request { |
636 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { |
637 | 0 | methods::Response::chain_unsubscribeAllHeads(true) |
638 | 0 | .to_json_response(request_id) |
639 | | } |
640 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { |
641 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(true) |
642 | 0 | .to_json_response(request_id) |
643 | | } |
644 | | methods::MethodCall::chain_unsubscribeNewHeads { .. } => { |
645 | 0 | methods::Response::chain_unsubscribeNewHeads(true) |
646 | 0 | .to_json_response(request_id) |
647 | | } |
648 | 0 | _ => unreachable!(), |
649 | | }); |
650 | | |
651 | 0 | kill_channel.dead.store(true, Ordering::Release); |
652 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); |
653 | | } |
654 | | _ => { |
655 | 0 | let response = match parsed_request { |
656 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { |
657 | 0 | methods::Response::chain_unsubscribeAllHeads(false) |
658 | 0 | .to_json_response(request_id) |
659 | | } |
660 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { |
661 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(false) |
662 | 0 | .to_json_response(request_id) |
663 | | } |
664 | | methods::MethodCall::chain_unsubscribeNewHeads { .. } => { |
665 | 0 | methods::Response::chain_unsubscribeNewHeads(false) |
666 | 0 | .to_json_response(request_id) |
667 | | } |
668 | 0 | _ => unreachable!(), |
669 | | }; |
670 | | |
671 | 0 | let mut responses_queue = |
672 | 0 | self.inner.serialized_io.responses_queue.lock().await; |
673 | 0 | let pos = responses_queue |
674 | 0 | .pending_serialized_responses |
675 | 0 | .insert((response, true)); |
676 | 0 | responses_queue |
677 | 0 | .pending_serialized_responses_queue |
678 | 0 | .push_back(pos); |
679 | 0 | self.inner |
680 | 0 | .serialized_io |
681 | 0 | .on_response_pushed_or_task_destroyed |
682 | 0 | .notify(usize::MAX); |
683 | | } |
684 | | } |
685 | | } |
686 | | } |
687 | | } |
688 | 44 | } Unexecuted instantiation: _RNCNvMNtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0Ba_ _RNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0CsfFWJyR6nd6r_17smoldot_full_node Line | Count | Source | 216 | 44 | pub async fn run_until_event(mut self) -> Event { | 217 | | loop { | 218 | | enum WakeUpReason { | 219 | | NewRequest(String), | 220 | | Message(ToMainTask), | 221 | | } | 222 | | | 223 | 48 | let wake_up_reason = { | 224 | 69 | let serialized_requests_io_destroyed = async { | 225 | | (&mut self.inner.on_serialized_requests_io_destroyed).await; | 226 | | Err(()) | 227 | | }; | 228 | | | 229 | 69 | let next_serialized_request = async { | 230 | | let mut wait = None; | 231 | | loop { | 232 | | if let Some(elem) = self.inner.serialized_io.requests_queue.pop() { | 233 | | self.inner | 234 | | .serialized_io | 235 | | .on_request_pulled_or_task_destroyed | 236 | | .notify(usize::MAX); | 237 | | break Ok(WakeUpReason::NewRequest(elem)); | 238 | | } | 239 | | if let Some(wait) = wait.take() { | 240 | | wait.await | 241 | | } else { | 242 | | wait = Some(self.inner.serialized_io.on_request_pushed.listen()); | 243 | | } | 244 | | } | 245 | | }; | 246 | | | 247 | 69 | let response_notif = async { | 248 | | let mut wait = None; | 249 | | loop { | 250 | | if let Some(elem) = self.inner.responses_notifications_queue.queue.pop() { | 251 | | break Ok(WakeUpReason::Message(elem)); | 252 | | } | 253 | | if let Some(wait) = wait.take() { | 254 | | wait.await | 255 | | } else { | 256 | | wait = | 257 | | Some(self.inner.responses_notifications_queue.on_pushed.listen()); | 258 | | } | 259 | | } | 260 | | }; | 261 | | | 262 | 69 | match serialized_requests_io_destroyed | 263 | 69 | .or(next_serialized_request) | 264 | 69 | .or(response_notif) | 265 | 69 | .await | 266 | | { | 267 | 48 | Ok(wake_up_reason) => wake_up_reason, | 268 | 21 | Err(()) => return 
Event::SerializedRequestsIoClosed, | 269 | | } | 270 | | }; | 271 | | | 272 | | // Immediately handle every event apart from `NewRequest`. | 273 | 25 | let new_request = match wake_up_reason23 { | 274 | 25 | WakeUpReason::NewRequest(request) => request, | 275 | 0 | WakeUpReason::Message(ToMainTask::SubscriptionDestroyed { subscription_id }) => { | 276 | | let InnerSubscription { | 277 | 0 | unsubscribe_response, | 278 | | .. | 279 | 0 | } = self | 280 | 0 | .inner | 281 | 0 | .active_subscriptions | 282 | 0 | .remove(&subscription_id) | 283 | 0 | .unwrap(); | 284 | | // TODO: post a `stop`/`error` event for chainhead subscriptions | 285 | 0 | if let Some(unsubscribe_response) = unsubscribe_response { | 286 | 0 | let mut responses_queue = | 287 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 288 | 0 | let pos = responses_queue | 289 | 0 | .pending_serialized_responses | 290 | 0 | .insert((unsubscribe_response, true)); | 291 | 0 | responses_queue | 292 | 0 | .pending_serialized_responses_queue | 293 | 0 | .push_back(pos); | 294 | 0 | self.inner | 295 | 0 | .serialized_io | 296 | 0 | .on_response_pushed_or_task_destroyed | 297 | 0 | .notify(usize::MAX); | 298 | 0 | } | 299 | | | 300 | | // Shrink the list of active subscriptions if necessary. 
| 301 | 0 | if self.inner.active_subscriptions.capacity() | 302 | 0 | >= 2 * self.inner.active_subscriptions.len() + 16 | 303 | 0 | { | 304 | 0 | self.inner.active_subscriptions.shrink_to_fit(); | 305 | 0 | } | 306 | | | 307 | 0 | return Event::SubscriptionDestroyed { | 308 | 0 | task: self, | 309 | 0 | subscription_id, | 310 | 0 | }; | 311 | | } | 312 | 23 | WakeUpReason::Message(ToMainTask::RequestResponse(response)) => { | 313 | 23 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await; | 314 | 23 | let pos = responses_queue | 315 | 23 | .pending_serialized_responses | 316 | 23 | .insert((response, true)); | 317 | 23 | responses_queue | 318 | 23 | .pending_serialized_responses_queue | 319 | 23 | .push_back(pos); | 320 | 23 | self.inner | 321 | 23 | .serialized_io | 322 | 23 | .on_response_pushed_or_task_destroyed | 323 | 23 | .notify(usize::MAX); | 324 | 23 | continue; | 325 | | } | 326 | 0 | WakeUpReason::Message(ToMainTask::Notification(notification)) => { | 327 | | // TODO: filter out redundant notifications, as it's the entire point of this module | 328 | 0 | let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await; | 329 | 0 | let pos = responses_queue | 330 | 0 | .pending_serialized_responses | 331 | 0 | .insert((notification, false)); | 332 | 0 | responses_queue | 333 | 0 | .pending_serialized_responses_queue | 334 | 0 | .push_back(pos); | 335 | 0 | self.inner | 336 | 0 | .serialized_io | 337 | 0 | .on_response_pushed_or_task_destroyed | 338 | 0 | .notify(usize::MAX); | 339 | 0 | continue; | 340 | | } | 341 | | }; | 342 | | | 343 | 23 | let (request_id, parsed_request) = | 344 | 25 | match methods::parse_jsonrpc_client_to_server(&new_request) { | 345 | 23 | Ok((request_id, method)) => (request_id, method), | 346 | 1 | Err(methods::ParseClientToServerError::Method { request_id, error }) => { | 347 | 1 | let response = error.to_json_error(request_id); | 348 | 1 | let mut responses_queue = | 349 | 1 | 
self.inner.serialized_io.responses_queue.lock().await; | 350 | 1 | let pos = responses_queue | 351 | 1 | .pending_serialized_responses | 352 | 1 | .insert((response, true)); | 353 | 1 | responses_queue | 354 | 1 | .pending_serialized_responses_queue | 355 | 1 | .push_back(pos); | 356 | 1 | self.inner | 357 | 1 | .serialized_io | 358 | 1 | .on_response_pushed_or_task_destroyed | 359 | 1 | .notify(usize::MAX); | 360 | 1 | continue; | 361 | | } | 362 | 0 | Err(methods::ParseClientToServerError::UnknownNotification { .. }) => continue, | 363 | | Err(methods::ParseClientToServerError::JsonRpcParse(_)) => { | 364 | 1 | let response = parse::build_parse_error_response(); | 365 | 1 | let mut responses_queue = | 366 | 1 | self.inner.serialized_io.responses_queue.lock().await; | 367 | 1 | let pos = responses_queue | 368 | 1 | .pending_serialized_responses | 369 | 1 | .insert((response, true)); | 370 | 1 | responses_queue | 371 | 1 | .pending_serialized_responses_queue | 372 | 1 | .push_back(pos); | 373 | 1 | self.inner | 374 | 1 | .serialized_io | 375 | 1 | .on_response_pushed_or_task_destroyed | 376 | 1 | .notify(usize::MAX); | 377 | 1 | continue; | 378 | | } | 379 | | }; | 380 | | | 381 | | // There exists three types of requests: | 382 | | // | 383 | | // - Requests that follow a simple one-request-one-response schema. | 384 | | // - Requests that, if accepted, start a subscription. | 385 | | // - Requests that unsubscribe from a subscription. | 386 | | // | 387 | 23 | match &parsed_request { | 388 | | methods::MethodCall::account_nextIndex { .. } | 389 | | | methods::MethodCall::author_hasKey { .. } | 390 | | | methods::MethodCall::author_hasSessionKeys { .. } | 391 | | | methods::MethodCall::author_insertKey { .. } | 392 | | | methods::MethodCall::author_pendingExtrinsics { .. } | 393 | | | methods::MethodCall::author_removeExtrinsic { .. } | 394 | | | methods::MethodCall::author_rotateKeys { .. } | 395 | | | methods::MethodCall::author_submitExtrinsic { .. 
} | 396 | | | methods::MethodCall::babe_epochAuthorship { .. } | 397 | | | methods::MethodCall::chain_getBlock { .. } | 398 | | | methods::MethodCall::chain_getBlockHash { .. } | 399 | | | methods::MethodCall::chain_getFinalizedHead { .. } | 400 | | | methods::MethodCall::chain_getHeader { .. } | 401 | | | methods::MethodCall::childstate_getKeys { .. } | 402 | | | methods::MethodCall::childstate_getStorage { .. } | 403 | | | methods::MethodCall::childstate_getStorageHash { .. } | 404 | | | methods::MethodCall::childstate_getStorageSize { .. } | 405 | | | methods::MethodCall::grandpa_roundState { .. } | 406 | | | methods::MethodCall::offchain_localStorageGet { .. } | 407 | | | methods::MethodCall::offchain_localStorageSet { .. } | 408 | | | methods::MethodCall::payment_queryInfo { .. } | 409 | | | methods::MethodCall::state_call { .. } | 410 | | | methods::MethodCall::state_getKeys { .. } | 411 | | | methods::MethodCall::state_getKeysPaged { .. } | 412 | | | methods::MethodCall::state_getMetadata { .. } | 413 | | | methods::MethodCall::state_getPairs { .. } | 414 | | | methods::MethodCall::state_getReadProof { .. } | 415 | | | methods::MethodCall::state_getRuntimeVersion { .. } | 416 | | | methods::MethodCall::state_getStorage { .. } | 417 | | | methods::MethodCall::state_getStorageHash { .. } | 418 | | | methods::MethodCall::state_getStorageSize { .. } | 419 | | | methods::MethodCall::state_queryStorage { .. } | 420 | | | methods::MethodCall::state_queryStorageAt { .. } | 421 | | | methods::MethodCall::system_accountNextIndex { .. } | 422 | | | methods::MethodCall::system_addReservedPeer { .. } | 423 | | | methods::MethodCall::system_chain { .. } | 424 | | | methods::MethodCall::system_chainType { .. } | 425 | | | methods::MethodCall::system_dryRun { .. } | 426 | | | methods::MethodCall::system_health { .. } | 427 | | | methods::MethodCall::system_localListenAddresses { .. } | 428 | | | methods::MethodCall::system_localPeerId { .. 
} | 429 | | | methods::MethodCall::system_name { .. } | 430 | | | methods::MethodCall::system_networkState { .. } | 431 | | | methods::MethodCall::system_nodeRoles { .. } | 432 | | | methods::MethodCall::system_peers { .. } | 433 | | | methods::MethodCall::system_properties { .. } | 434 | | | methods::MethodCall::system_removeReservedPeer { .. } | 435 | | | methods::MethodCall::system_version { .. } | 436 | | | methods::MethodCall::chainSpec_v1_chainName { .. } | 437 | | | methods::MethodCall::chainSpec_v1_genesisHash { .. } | 438 | | | methods::MethodCall::chainSpec_v1_properties { .. } | 439 | | | methods::MethodCall::rpc_methods { .. } | 440 | | | methods::MethodCall::sudo_unstable_p2pDiscover { .. } | 441 | | | methods::MethodCall::sudo_unstable_version { .. } | 442 | | | methods::MethodCall::chainHead_v1_body { .. } | 443 | | | methods::MethodCall::chainHead_v1_call { .. } | 444 | | | methods::MethodCall::chainHead_v1_continue { .. } | 445 | | | methods::MethodCall::chainHead_unstable_finalizedDatabase { .. } | 446 | | | methods::MethodCall::chainHead_v1_header { .. } | 447 | | | methods::MethodCall::chainHead_v1_stopOperation { .. } | 448 | | | methods::MethodCall::chainHead_v1_storage { .. } | 449 | | | methods::MethodCall::chainHead_v1_unpin { .. } => { | 450 | | // Simple one-request-one-response. | 451 | 23 | return Event::HandleRequest { | 452 | 23 | request_process: RequestProcess { | 453 | 23 | responses_notifications_queue: self | 454 | 23 | .inner | 455 | 23 | .responses_notifications_queue | 456 | 23 | .clone(), | 457 | 23 | request: new_request, | 458 | 23 | has_sent_response: false, | 459 | 23 | }, | 460 | 23 | task: self, | 461 | 23 | }; | 462 | | } | 463 | | | 464 | | methods::MethodCall::author_submitAndWatchExtrinsic { .. } | 465 | | | methods::MethodCall::chain_subscribeAllHeads { .. } | 466 | | | methods::MethodCall::chain_subscribeFinalizedHeads { .. } | 467 | | | methods::MethodCall::chain_subscribeNewHeads { .. 
} | 468 | | | methods::MethodCall::state_subscribeRuntimeVersion { .. } | 469 | | | methods::MethodCall::state_subscribeStorage { .. } | 470 | | | methods::MethodCall::transaction_v1_broadcast { .. } | 471 | | | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. } | 472 | | | methods::MethodCall::sudo_network_unstable_watch { .. } | 473 | | | methods::MethodCall::chainHead_v1_follow { .. } => { | 474 | | // Subscription starting requests. | 475 | | | 476 | | // We must check the maximum number of subscriptions. | 477 | 0 | let max_subscriptions = | 478 | 0 | usize::try_from(self.inner.max_active_subscriptions).unwrap_or(usize::MAX); | 479 | 0 | debug_assert!(self.inner.active_subscriptions.len() <= max_subscriptions); | 480 | 0 | if self.inner.active_subscriptions.len() >= max_subscriptions { | 481 | 0 | let response = parse::build_error_response( | 482 | 0 | request_id, | 483 | 0 | ErrorResponse::ServerError(-32000, "Too many active subscriptions"), | 484 | 0 | None, | 485 | | ); | 486 | 0 | let mut responses_queue = | 487 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 488 | 0 | let pos = responses_queue | 489 | 0 | .pending_serialized_responses | 490 | 0 | .insert((response, true)); | 491 | 0 | responses_queue | 492 | 0 | .pending_serialized_responses_queue | 493 | 0 | .push_back(pos); | 494 | 0 | self.inner | 495 | 0 | .serialized_io | 496 | 0 | .on_response_pushed_or_task_destroyed | 497 | 0 | .notify(usize::MAX); | 498 | 0 | continue; | 499 | 0 | } | 500 | | | 501 | | // Allocate the new subscription ID. | 502 | 0 | let subscription_id = self.allocate_subscription_id(); | 503 | 0 | debug_assert!( | 504 | 0 | !self | 505 | 0 | .inner | 506 | 0 | .active_subscriptions | 507 | 0 | .contains_key(&subscription_id) | 508 | | ); | 509 | | | 510 | | // Insert an "kill channel" in the local state. This kill channel is shared | 511 | | // with the subscription object and is used to notify when a subscription | 512 | | // should be killed. 
| 513 | 0 | let kill_channel = Arc::new(SubscriptionKillChannel { | 514 | 0 | dead: AtomicBool::new(false), | 515 | 0 | on_dead_changed: event_listener::Event::new(), | 516 | 0 | }); | 517 | 0 | self.inner.active_subscriptions.insert( | 518 | 0 | subscription_id.clone(), | 519 | 0 | InnerSubscription { | 520 | 0 | kill_channel: kill_channel.clone(), | 521 | 0 | unsubscribe_response: None, | 522 | 0 | }, | 523 | | ); | 524 | | | 525 | 0 | return Event::HandleSubscriptionStart { | 526 | 0 | subscription_start: SubscriptionStartProcess { | 527 | 0 | responses_notifications_queue: self | 528 | 0 | .inner | 529 | 0 | .responses_notifications_queue | 530 | 0 | .clone(), | 531 | 0 | request: new_request, | 532 | 0 | kill_channel, | 533 | 0 | subscription_id, | 534 | 0 | has_sent_response: false, | 535 | 0 | }, | 536 | 0 | task: self, | 537 | 0 | }; | 538 | | } | 539 | | | 540 | 0 | methods::MethodCall::author_unwatchExtrinsic { subscription, .. } | 541 | 0 | | methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. } | 542 | 0 | | methods::MethodCall::state_unsubscribeStorage { subscription, .. } | 543 | | | methods::MethodCall::transaction_v1_stop { | 544 | 0 | operation_id: subscription, | 545 | | } | 546 | 0 | | methods::MethodCall::transactionWatch_v1_unwatch { subscription, .. } | 547 | 0 | | methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. } | 548 | | | methods::MethodCall::chainHead_v1_unfollow { | 549 | 0 | follow_subscription: subscription, | 550 | | .. | 551 | | } => { | 552 | | // TODO: must check whether type of subscription matches | 553 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { | 554 | | Some(InnerSubscription { | 555 | 0 | kill_channel, | 556 | 0 | unsubscribe_response, | 557 | 0 | }) if unsubscribe_response.is_none() => { | 558 | 0 | *unsubscribe_response = Some( | 559 | 0 | match parsed_request { | 560 | | methods::MethodCall::author_unwatchExtrinsic { .. 
} => { | 561 | 0 | methods::Response::author_unwatchExtrinsic(true) | 562 | | } | 563 | | methods::MethodCall::state_unsubscribeRuntimeVersion { | 564 | | .. | 565 | 0 | } => methods::Response::state_unsubscribeRuntimeVersion(true), | 566 | | methods::MethodCall::state_unsubscribeStorage { .. } => { | 567 | 0 | methods::Response::state_unsubscribeStorage(true) | 568 | | } | 569 | | methods::MethodCall::transaction_v1_stop { .. } => { | 570 | 0 | methods::Response::transaction_v1_stop(()) | 571 | | } | 572 | | methods::MethodCall::transactionWatch_v1_unwatch { .. } => { | 573 | 0 | methods::Response::transactionWatch_v1_unwatch(()) | 574 | | } | 575 | | methods::MethodCall::sudo_network_unstable_unwatch { | 576 | | .. | 577 | 0 | } => methods::Response::sudo_network_unstable_unwatch(()), | 578 | | methods::MethodCall::chainHead_v1_unfollow { .. } => { | 579 | 0 | methods::Response::chainHead_v1_unfollow(()) | 580 | | } | 581 | 0 | _ => unreachable!(), | 582 | | } | 583 | 0 | .to_json_response(request_id), | 584 | | ); | 585 | | | 586 | 0 | kill_channel.dead.store(true, Ordering::Release); | 587 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); | 588 | | } | 589 | | _ => { | 590 | 0 | let response = match parsed_request { | 591 | | methods::MethodCall::author_unwatchExtrinsic { .. } => { | 592 | 0 | methods::Response::author_unwatchExtrinsic(false) | 593 | 0 | .to_json_response(request_id) | 594 | | } | 595 | | methods::MethodCall::state_unsubscribeRuntimeVersion { .. } => { | 596 | 0 | methods::Response::state_unsubscribeRuntimeVersion(false) | 597 | 0 | .to_json_response(request_id) | 598 | | } | 599 | | methods::MethodCall::state_unsubscribeStorage { .. 
} => { | 600 | 0 | methods::Response::state_unsubscribeStorage(false) | 601 | 0 | .to_json_response(request_id) | 602 | | } | 603 | 0 | _ => parse::build_error_response( | 604 | 0 | request_id, | 605 | 0 | ErrorResponse::InvalidParams, | 606 | 0 | None, | 607 | | ), | 608 | | }; | 609 | | | 610 | 0 | let mut responses_queue = | 611 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 612 | 0 | let pos = responses_queue | 613 | 0 | .pending_serialized_responses | 614 | 0 | .insert((response, true)); | 615 | 0 | responses_queue | 616 | 0 | .pending_serialized_responses_queue | 617 | 0 | .push_back(pos); | 618 | 0 | self.inner | 619 | 0 | .serialized_io | 620 | 0 | .on_response_pushed_or_task_destroyed | 621 | 0 | .notify(usize::MAX); | 622 | | } | 623 | | } | 624 | | } | 625 | 0 | methods::MethodCall::chain_unsubscribeAllHeads { subscription, .. } | 626 | 0 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { subscription, .. } | 627 | 0 | | methods::MethodCall::chain_unsubscribeNewHeads { subscription, .. } => { | 628 | | // TODO: DRY with above | 629 | | // TODO: must check whether type of subscription matches | 630 | 0 | match self.inner.active_subscriptions.get_mut(&**subscription) { | 631 | | Some(InnerSubscription { | 632 | 0 | unsubscribe_response, | 633 | 0 | kill_channel, | 634 | 0 | }) if unsubscribe_response.is_none() => { | 635 | 0 | *unsubscribe_response = Some(match parsed_request { | 636 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { | 637 | 0 | methods::Response::chain_unsubscribeAllHeads(true) | 638 | 0 | .to_json_response(request_id) | 639 | | } | 640 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { | 641 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(true) | 642 | 0 | .to_json_response(request_id) | 643 | | } | 644 | | methods::MethodCall::chain_unsubscribeNewHeads { .. 
} => { | 645 | 0 | methods::Response::chain_unsubscribeNewHeads(true) | 646 | 0 | .to_json_response(request_id) | 647 | | } | 648 | 0 | _ => unreachable!(), | 649 | | }); | 650 | | | 651 | 0 | kill_channel.dead.store(true, Ordering::Release); | 652 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); | 653 | | } | 654 | | _ => { | 655 | 0 | let response = match parsed_request { | 656 | | methods::MethodCall::chain_unsubscribeAllHeads { .. } => { | 657 | 0 | methods::Response::chain_unsubscribeAllHeads(false) | 658 | 0 | .to_json_response(request_id) | 659 | | } | 660 | | methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => { | 661 | 0 | methods::Response::chain_unsubscribeFinalizedHeads(false) | 662 | 0 | .to_json_response(request_id) | 663 | | } | 664 | | methods::MethodCall::chain_unsubscribeNewHeads { .. } => { | 665 | 0 | methods::Response::chain_unsubscribeNewHeads(false) | 666 | 0 | .to_json_response(request_id) | 667 | | } | 668 | 0 | _ => unreachable!(), | 669 | | }; | 670 | | | 671 | 0 | let mut responses_queue = | 672 | 0 | self.inner.serialized_io.responses_queue.lock().await; | 673 | 0 | let pos = responses_queue | 674 | 0 | .pending_serialized_responses | 675 | 0 | .insert((response, true)); | 676 | 0 | responses_queue | 677 | 0 | .pending_serialized_responses_queue | 678 | 0 | .push_back(pos); | 679 | 0 | self.inner | 680 | 0 | .serialized_io | 681 | 0 | .on_response_pushed_or_task_destroyed | 682 | 0 | .notify(usize::MAX); | 683 | | } | 684 | | } | 685 | | } | 686 | | } | 687 | | } | 688 | 44 | } |
Unexecuted instantiation: _RNCNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0Ba_ |
689 | | |
690 | 0 | fn allocate_subscription_id(&mut self) -> String { |
691 | 0 | let subscription_id = self.inner.next_subscription_id.to_string(); |
692 | 0 | self.inner.next_subscription_id += 1; |
693 | 0 | subscription_id |
694 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask24allocate_subscription_id Unexecuted instantiation: _RNvMNtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask24allocate_subscription_id |
695 | | } |
696 | | |
697 | | impl fmt::Debug for ClientMainTask { |
698 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
699 | 0 | f.debug_tuple("ClientMainTask").finish() |
700 | 0 | } Unexecuted instantiation: _RNvXs_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTaskNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTaskNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt |
701 | | } |
702 | | |
703 | | impl Drop for ClientMainTask { |
704 | 21 | fn drop(&mut self) { |
705 | | // Notify the `SerializedRequestsIo`. |
706 | 21 | self.inner |
707 | 21 | .serialized_io |
708 | 21 | .on_response_pushed_or_task_destroyed |
709 | 21 | .notify(usize::MAX); |
710 | 21 | self.inner |
711 | 21 | .serialized_io |
712 | 21 | .on_request_pulled_or_task_destroyed |
713 | 21 | .notify(usize::MAX); |
714 | | |
715 | | // Mark all active subscriptions as dead. |
716 | 21 | for (_, InnerSubscription { kill_channel0 , .. }) in self.inner.active_subscriptions.drain() { |
717 | 0 | kill_channel.dead.store(true, Ordering::Release); |
718 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); |
719 | 0 | } |
720 | 21 | } Unexecuted instantiation: _RNvXs0_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14ClientMainTaskNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop _RNvXs0_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14ClientMainTaskNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop Line | Count | Source | 704 | 21 | fn drop(&mut self) { | 705 | | // Notify the `SerializedRequestsIo`. | 706 | 21 | self.inner | 707 | 21 | .serialized_io | 708 | 21 | .on_response_pushed_or_task_destroyed | 709 | 21 | .notify(usize::MAX); | 710 | 21 | self.inner | 711 | 21 | .serialized_io | 712 | 21 | .on_request_pulled_or_task_destroyed | 713 | 21 | .notify(usize::MAX); | 714 | | | 715 | | // Mark all active subscriptions as dead. | 716 | 21 | for (_, InnerSubscription { kill_channel0 , .. }) in self.inner.active_subscriptions.drain() { | 717 | 0 | kill_channel.dead.store(true, Ordering::Release); | 718 | 0 | kill_channel.on_dead_changed.notify(usize::MAX); | 719 | 0 | } | 720 | 21 | } |
|
721 | | } |
722 | | |
723 | | /// Outcome of the processing of [`ClientMainTask::run_until_event`]. |
724 | | #[derive(Debug)] |
725 | | pub enum Event { |
726 | | /// JSON-RPC client has sent a plain request (i.e. one that isn't related to subscriptions). |
727 | | HandleRequest { |
728 | | /// The task that generated the event. |
729 | | task: ClientMainTask, |
730 | | /// Object connected to the [`ClientMainTask`] and containing the information about the |
731 | | /// request to process. |
732 | | request_process: RequestProcess, |
733 | | }, |
734 | | |
735 | | /// JSON-RPC client wants to start a new subscription. |
736 | | /// |
737 | | /// Note that the [`ClientMainTask`] automatically enforces a limit to the maximum number of |
738 | | /// subscriptions. If this event is generated, this check has already passed. |
739 | | HandleSubscriptionStart { |
740 | | /// The task that generated the event. |
741 | | task: ClientMainTask, |
742 | | /// Object connected to the [`ClientMainTask`] and containing the information about the |
743 | | /// request to process. |
744 | | subscription_start: SubscriptionStartProcess, |
745 | | }, |
746 | | |
747 | | /// A [`SubscriptionStartProcess`] object or a [`Subscription`] object has been destroyed. |
748 | | SubscriptionDestroyed { |
749 | | /// The task that generated the event. |
750 | | task: ClientMainTask, |
751 | | /// Id of the subscription that was destroyed. Equal to the value that |
752 | | /// [`Subscription::subscription_id`] would have returned for the now-dead subscription. |
753 | | subscription_id: String, |
754 | | }, |
755 | | |
756 | | /// The [`SerializedRequestsIo`] has been dropped. The [`ClientMainTask`] has been destroyed. |
757 | | SerializedRequestsIoClosed, |
758 | | } |
759 | | |
760 | | /// Object connected to the [`ClientMainTask`] that allows sending requests to the task and |
761 | | /// receiving responses. |
762 | | pub struct SerializedRequestsIo { |
763 | | serialized_io: Weak<SerializedIo>, |
764 | | |
765 | | /// Event notified after the [`SerializedRequestsIo`] is destroyed. |
766 | | on_serialized_requests_io_destroyed: event_listener::Event, |
767 | | } |
768 | | |
769 | | impl SerializedRequestsIo { |
770 | | /// Waits for a response or a notification to send to the JSON-RPC client to be available, |
771 | | /// and returns it. |
772 | | /// |
773 | | /// Returns [`WaitNextResponseError::ClientMainTaskDestroyed`] if the [`ClientMainTask`] has been destroyed. |
774 | | /// |
775 | | /// > **Note**: It is important to run [`ClientMainTask::run_until_event`] concurrently to |
776 | | /// > this function, otherwise it might never return. |
777 | 25 | pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> { Unexecuted instantiation: _RNvMs1_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo18wait_next_response _RNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo18wait_next_response Line | Count | Source | 777 | 25 | pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> { |
|
778 | 25 | let mut wait = None; |
779 | | |
780 | | loop { |
781 | 67 | let Some(queue) = self.serialized_io.upgrade() else { |
782 | 0 | return Err(WaitNextResponseError::ClientMainTaskDestroyed); |
783 | | }; |
784 | | |
785 | | // Lock the responses queue. |
786 | | { |
787 | 67 | let mut responses_queue = queue.responses_queue.lock().await; |
788 | | |
789 | 67 | if let Some(response_index25 ) = responses_queue |
790 | 67 | .pending_serialized_responses_queue |
791 | 67 | .pop_front() |
792 | | { |
793 | 25 | let (response_or_notif, is_response) = responses_queue |
794 | 25 | .pending_serialized_responses |
795 | 25 | .remove(response_index); |
796 | | |
797 | 25 | if is_response { |
798 | 25 | let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release); |
799 | 25 | debug_assert_ne!(_prev_val, u32::MAX); // Check underflows. |
800 | 0 | } |
801 | | |
802 | | // Shrink containers if necessary in order to reduce memory usage after a |
803 | | // burst of requests. |
804 | 25 | if responses_queue.pending_serialized_responses.capacity() |
805 | 25 | > responses_queue |
806 | 25 | .pending_serialized_responses |
807 | 25 | .len() |
808 | 25 | .saturating_mul(4) |
809 | 25 | { |
810 | 25 | responses_queue.pending_serialized_responses.shrink_to_fit(); |
811 | 25 | }0 |
812 | 25 | if responses_queue |
813 | 25 | .pending_serialized_responses_queue |
814 | 25 | .capacity() |
815 | 25 | > responses_queue |
816 | 25 | .pending_serialized_responses_queue |
817 | 25 | .len() |
818 | 25 | .saturating_mul(4) |
819 | 25 | { |
820 | 25 | responses_queue |
821 | 25 | .pending_serialized_responses_queue |
822 | 25 | .shrink_to_fit(); |
823 | 25 | }0 |
824 | | |
825 | 25 | return Ok(response_or_notif); |
826 | 42 | } |
827 | | } |
828 | | |
829 | 42 | if let Some(wait21 ) = wait.take() { |
830 | 21 | wait.await |
831 | 21 | } else { |
832 | 21 | wait = Some(queue.on_response_pushed_or_task_destroyed.listen()); |
833 | 21 | } |
834 | | } |
835 | 25 | } Unexecuted instantiation: _RNCNvMs1_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0Bd_ Unexecuted instantiation: _RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CscoAnRPySggw_6author Unexecuted instantiation: _RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsfFWJyR6nd6r_17smoldot_full_node Unexecuted instantiation: _RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0Bd_ _RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsjyNE3yDMkgA_14json_rpc_basic Line | Count | Source | 777 | 2 | pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> { | 778 | 2 | let mut wait = None; | 779 | | | 780 | | loop { | 781 | 6 | let Some(queue) = self.serialized_io.upgrade() else { | 782 | 0 | return Err(WaitNextResponseError::ClientMainTaskDestroyed); | 783 | | }; | 784 | | | 785 | | // Lock the responses queue. | 786 | | { | 787 | 6 | let mut responses_queue = queue.responses_queue.lock().await; | 788 | | | 789 | 6 | if let Some(response_index2 ) = responses_queue | 790 | 6 | .pending_serialized_responses_queue | 791 | 6 | .pop_front() | 792 | | { | 793 | 2 | let (response_or_notif, is_response) = responses_queue | 794 | 2 | .pending_serialized_responses | 795 | 2 | .remove(response_index); | 796 | | | 797 | 2 | if is_response { | 798 | 2 | let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release); | 799 | 2 | debug_assert_ne!(_prev_val, u32::MAX); // Check underflows. | 800 | 0 | } | 801 | | | 802 | | // Shrink containers if necessary in order to reduce memory usage after a | 803 | | // burst of requests. 
| 804 | 2 | if responses_queue.pending_serialized_responses.capacity() | 805 | 2 | > responses_queue | 806 | 2 | .pending_serialized_responses | 807 | 2 | .len() | 808 | 2 | .saturating_mul(4) | 809 | 2 | { | 810 | 2 | responses_queue.pending_serialized_responses.shrink_to_fit(); | 811 | 2 | }0 | 812 | 2 | if responses_queue | 813 | 2 | .pending_serialized_responses_queue | 814 | 2 | .capacity() | 815 | 2 | > responses_queue | 816 | 2 | .pending_serialized_responses_queue | 817 | 2 | .len() | 818 | 2 | .saturating_mul(4) | 819 | 2 | { | 820 | 2 | responses_queue | 821 | 2 | .pending_serialized_responses_queue | 822 | 2 | .shrink_to_fit(); | 823 | 2 | }0 | 824 | | | 825 | 2 | return Ok(response_or_notif); | 826 | 4 | } | 827 | | } | 828 | | | 829 | 4 | if let Some(wait2 ) = wait.take() { | 830 | 2 | wait.await | 831 | 2 | } else { | 832 | 2 | wait = Some(queue.on_response_pushed_or_task_destroyed.listen()); | 833 | 2 | } | 834 | | } | 835 | 2 | } |
_RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0Cs4VrkfB1pvQ3_25json_rpc_general_requests Line | Count | Source | 777 | 23 | pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> { | 778 | 23 | let mut wait = None; | 779 | | | 780 | | loop { | 781 | 61 | let Some(queue) = self.serialized_io.upgrade() else { | 782 | 0 | return Err(WaitNextResponseError::ClientMainTaskDestroyed); | 783 | | }; | 784 | | | 785 | | // Lock the responses queue. | 786 | | { | 787 | 61 | let mut responses_queue = queue.responses_queue.lock().await; | 788 | | | 789 | 61 | if let Some(response_index23 ) = responses_queue | 790 | 61 | .pending_serialized_responses_queue | 791 | 61 | .pop_front() | 792 | | { | 793 | 23 | let (response_or_notif, is_response) = responses_queue | 794 | 23 | .pending_serialized_responses | 795 | 23 | .remove(response_index); | 796 | | | 797 | 23 | if is_response { | 798 | 23 | let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release); | 799 | 23 | debug_assert_ne!(_prev_val, u32::MAX); // Check underflows. | 800 | 0 | } | 801 | | | 802 | | // Shrink containers if necessary in order to reduce memory usage after a | 803 | | // burst of requests. 
| 804 | 23 | if responses_queue.pending_serialized_responses.capacity() | 805 | 23 | > responses_queue | 806 | 23 | .pending_serialized_responses | 807 | 23 | .len() | 808 | 23 | .saturating_mul(4) | 809 | 23 | { | 810 | 23 | responses_queue.pending_serialized_responses.shrink_to_fit(); | 811 | 23 | }0 | 812 | 23 | if responses_queue | 813 | 23 | .pending_serialized_responses_queue | 814 | 23 | .capacity() | 815 | 23 | > responses_queue | 816 | 23 | .pending_serialized_responses_queue | 817 | 23 | .len() | 818 | 23 | .saturating_mul(4) | 819 | 23 | { | 820 | 23 | responses_queue | 821 | 23 | .pending_serialized_responses_queue | 822 | 23 | .shrink_to_fit(); | 823 | 23 | }0 | 824 | | | 825 | 23 | return Ok(response_or_notif); | 826 | 38 | } | 827 | | } | 828 | | | 829 | 38 | if let Some(wait19 ) = wait.take() { | 830 | 19 | wait.await | 831 | 19 | } else { | 832 | 19 | wait = Some(queue.on_response_pushed_or_task_destroyed.listen()); | 833 | 19 | } | 834 | | } | 835 | 23 | } |
|
836 | | |
837 | | /// Adds a JSON-RPC request to the queue of requests of the [`ClientMainTask`]. Waits if the |
838 | | /// queue is full. |
839 | | /// |
840 | | /// This might cause a call to [`ClientMainTask::run_until_event`] to return |
841 | | /// [`Event::HandleRequest`] or [`Event::HandleSubscriptionStart`]. |
842 | 0 | pub async fn send_request(&self, request: String) -> Result<(), SendRequestError> { Unexecuted instantiation: _RNvMs1_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo12send_request Unexecuted instantiation: _RNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo12send_request |
843 | | // Wait until it is possible to increment `num_requests_in_fly`. |
844 | 0 | let mut wait = None; |
845 | 0 | let queue = loop { |
846 | 0 | let Some(queue) = self.serialized_io.upgrade() else { |
847 | 0 | return Err(SendRequestError { |
848 | 0 | request, |
849 | 0 | cause: SendRequestErrorCause::ClientMainTaskDestroyed, |
850 | 0 | }); |
851 | | }; |
852 | | |
853 | 0 | if queue |
854 | 0 | .num_requests_in_fly |
855 | 0 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { |
856 | 0 | if old_value < queue.max_requests_in_fly.get() { |
857 | | // Considering that `old_value < max`, and `max` fits in a `u32` by |
858 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. |
859 | | // There's no risk of overflow. |
860 | 0 | Some(old_value + 1) |
861 | | } else { |
862 | 0 | None |
863 | | } |
864 | 0 | }) Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00Bf_ Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00CsfFWJyR6nd6r_17smoldot_full_node Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00Bf_ |
865 | 0 | .is_ok() |
866 | | { |
867 | 0 | break queue; |
868 | 0 | } |
869 | | |
870 | 0 | if let Some(wait) = wait.take() { |
871 | 0 | wait.await; |
872 | 0 | } else { |
873 | 0 | wait = Some(queue.on_request_pulled_or_task_destroyed.listen()); |
874 | 0 | } |
875 | | }; |
876 | | |
877 | | // Everything successful. |
878 | 0 | queue.requests_queue.push(request); |
879 | 0 | queue.on_request_pushed.notify(usize::MAX); |
880 | 0 | Ok(()) |
881 | 0 | } Unexecuted instantiation: _RNCNvMs1_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0Bd_ Unexecuted instantiation: _RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0CsfFWJyR6nd6r_17smoldot_full_node Unexecuted instantiation: _RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0Bd_ |
882 | | |
883 | | /// Tries to add a JSON-RPC request to the queue of requests of the [`ClientMainTask`]. |
884 | | /// |
885 | | /// This might cause a call to [`ClientMainTask::run_until_event`] to return |
886 | | /// [`Event::HandleRequest`] or [`Event::HandleSubscriptionStart`]. |
887 | 25 | pub fn try_send_request(&self, request: String) -> Result<(), TrySendRequestError> { |
888 | 25 | let Some(queue) = self.serialized_io.upgrade() else { |
889 | 0 | return Err(TrySendRequestError { |
890 | 0 | request, |
891 | 0 | cause: TrySendRequestErrorCause::ClientMainTaskDestroyed, |
892 | 0 | }); |
893 | | }; |
894 | | |
895 | | // Try to increment `num_requests_in_fly`. Return an error if it is past the maximum. |
896 | 25 | if queue |
897 | 25 | .num_requests_in_fly |
898 | 25 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { |
899 | 25 | if old_value < queue.max_requests_in_fly.get() { |
900 | | // Considering that `old_value < max`, and `max` fits in a `u32` by |
901 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. |
902 | | // There's no risk of overflow. |
903 | 25 | Some(old_value + 1) |
904 | | } else { |
905 | 0 | None |
906 | | } |
907 | 25 | }) Unexecuted instantiation: _RNCNvMs1_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo16try_send_request0Bd_ _RNCNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo16try_send_request0Bd_ Line | Count | Source | 898 | 25 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { | 899 | 25 | if old_value < queue.max_requests_in_fly.get() { | 900 | | // Considering that `old_value < max`, and `max` fits in a `u32` by | 901 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. | 902 | | // There's no risk of overflow. | 903 | 25 | Some(old_value + 1) | 904 | | } else { | 905 | 0 | None | 906 | | } | 907 | 25 | }) |
|
908 | 25 | .is_err() |
909 | | { |
910 | 0 | return Err(TrySendRequestError { |
911 | 0 | request, |
912 | 0 | cause: TrySendRequestErrorCause::TooManyPendingRequests, |
913 | 0 | }); |
914 | 25 | } |
915 | | |
916 | | // Everything successful. |
917 | 25 | queue.requests_queue.push(request); |
918 | 25 | queue.on_request_pushed.notify(usize::MAX); |
919 | 25 | Ok(()) |
920 | 25 | } Unexecuted instantiation: _RNvMs1_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo16try_send_request _RNvMs1_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo16try_send_request Line | Count | Source | 887 | 25 | pub fn try_send_request(&self, request: String) -> Result<(), TrySendRequestError> { | 888 | 25 | let Some(queue) = self.serialized_io.upgrade() else { | 889 | 0 | return Err(TrySendRequestError { | 890 | 0 | request, | 891 | 0 | cause: TrySendRequestErrorCause::ClientMainTaskDestroyed, | 892 | 0 | }); | 893 | | }; | 894 | | | 895 | | // Try to increment `num_requests_in_fly`. Return an error if it is past the maximum. | 896 | 25 | if queue | 897 | 25 | .num_requests_in_fly | 898 | 25 | .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| { | 899 | | if old_value < queue.max_requests_in_fly.get() { | 900 | | // Considering that `old_value < max`, and `max` fits in a `u32` by | 901 | | // definition, then `old_value + 1` also always fits in a `u32`. QED. | 902 | | // There's no risk of overflow. | 903 | | Some(old_value + 1) | 904 | | } else { | 905 | | None | 906 | | } | 907 | | }) | 908 | 25 | .is_err() | 909 | | { | 910 | 0 | return Err(TrySendRequestError { | 911 | 0 | request, | 912 | 0 | cause: TrySendRequestErrorCause::TooManyPendingRequests, | 913 | 0 | }); | 914 | 25 | } | 915 | | | 916 | | // Everything successful. | 917 | 25 | queue.requests_queue.push(request); | 918 | 25 | queue.on_request_pushed.notify(usize::MAX); | 919 | 25 | Ok(()) | 920 | 25 | } |
|
921 | | } |
922 | | |
923 | | impl fmt::Debug for SerializedRequestsIo { |
924 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
925 | 0 | f.debug_tuple("SerializedRequestsIo").finish() |
926 | 0 | } Unexecuted instantiation: _RNvXs2_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs2_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt |
927 | | } |
928 | | |
929 | | impl Drop for SerializedRequestsIo { |
930 | 21 | fn drop(&mut self) { |
931 | 21 | self.on_serialized_requests_io_destroyed.notify(usize::MAX); |
932 | 21 | } Unexecuted instantiation: _RNvXs3_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop _RNvXs3_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop Line | Count | Source | 930 | 21 | fn drop(&mut self) { | 931 | 21 | self.on_serialized_requests_io_destroyed.notify(usize::MAX); | 932 | 21 | } |
|
933 | | } |
934 | | |
935 | | /// See [`SerializedRequestsIo::wait_next_response`]. |
936 | | #[derive(Debug, Clone, derive_more::Display, derive_more::Error)] |
937 | | pub enum WaitNextResponseError { |
938 | | /// The attached [`ClientMainTask`] has been destroyed. |
939 | | ClientMainTaskDestroyed, |
940 | | } |
941 | | |
942 | | /// Error returned by [`SerializedRequestsIo::send_request`]. |
943 | | #[derive(Debug, derive_more::Display, derive_more::Error)] |
944 | | #[display("{cause}")] |
945 | | pub struct SendRequestError { |
946 | | /// The JSON-RPC request that was passed as parameter. |
947 | | pub request: String, |
948 | | /// Reason for the error. |
949 | | #[error(source)] |
950 | | pub cause: SendRequestErrorCause, |
951 | | } |
952 | | |
953 | | /// See [`SendRequestError::cause`]. |
954 | | #[derive(Debug, derive_more::Display, derive_more::Error)] |
955 | | pub enum SendRequestErrorCause { |
956 | | /// The attached [`ClientMainTask`] has been destroyed. |
957 | | ClientMainTaskDestroyed, |
958 | | } |
959 | | |
960 | | /// Error returned by [`SerializedRequestsIo::try_send_request`]. |
961 | | #[derive(Debug, derive_more::Display, derive_more::Error)] |
962 | | #[display("{cause}")] |
963 | | pub struct TrySendRequestError { |
964 | | /// The JSON-RPC request that was passed as parameter. |
965 | | pub request: String, |
966 | | /// Reason for the error. |
967 | | #[error(source)] |
968 | | pub cause: TrySendRequestErrorCause, |
969 | | } |
970 | | |
971 | | /// See [`TrySendRequestError::cause`]. |
972 | | #[derive(Debug, derive_more::Display, derive_more::Error)] |
973 | | pub enum TrySendRequestErrorCause { |
974 | | /// Limit to the maximum number of pending requests that was passed as |
975 | | /// [`Config::max_pending_requests`] has been reached. No more requests can be sent before |
976 | | /// some responses have been pulled. |
977 | | TooManyPendingRequests, |
978 | | /// The attached [`ClientMainTask`] has been destroyed. |
979 | | ClientMainTaskDestroyed, |
980 | | } |
981 | | |
982 | | /// Object connected to the [`ClientMainTask`] and containing a request expecting an answer. |
983 | | /// |
984 | | /// If this object is dropped before the request has been answered, an automatic "internal error" |
985 | | /// error response is automatically sent back. |
986 | | pub struct RequestProcess { |
987 | | /// Queue where responses and subscriptions push responses/notifications. |
988 | | responses_notifications_queue: Arc<ResponsesNotificationsQueue>, |
989 | | /// Request in JSON form. Guaranteed to decode successfully. |
990 | | request: String, |
991 | | /// `true` if a response has already been sent. |
992 | | has_sent_response: bool, |
993 | | } |
994 | | |
995 | | impl RequestProcess { |
996 | | /// Returns the request which must be processed. |
997 | | /// |
998 | | /// The request is guaranteed to not be related to subscriptions in any way. |
999 | | // TODO: with stronger typing users wouldn't have to worry about the type of request |
1000 | 46 | pub fn request(&self) -> methods::MethodCall { |
1001 | 46 | methods::parse_jsonrpc_client_to_server(&self.request) |
1002 | 46 | .unwrap() |
1003 | 46 | .1 |
1004 | 46 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7request _RNvMs4_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7request Line | Count | Source | 1000 | 46 | pub fn request(&self) -> methods::MethodCall { | 1001 | 46 | methods::parse_jsonrpc_client_to_server(&self.request) | 1002 | 46 | .unwrap() | 1003 | 46 | .1 | 1004 | 46 | } |
|
1005 | | |
1006 | | /// Indicate the response to the request to the [`ClientMainTask`]. |
1007 | | /// |
1008 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1009 | 19 | pub fn respond(mut self, response: methods::Response) { |
1010 | 19 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1011 | 19 | .unwrap() |
1012 | 19 | .0; |
1013 | 19 | let serialized = response.to_json_response(request_id); |
1014 | 19 | self.responses_notifications_queue |
1015 | 19 | .queue |
1016 | 19 | .push(ToMainTask::RequestResponse(serialized)); |
1017 | 19 | self.responses_notifications_queue |
1018 | 19 | .on_pushed |
1019 | 19 | .notify(usize::MAX); |
1020 | 19 | self.has_sent_response = true; |
1021 | 19 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7respond _RNvMs4_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7respond Line | Count | Source | 1009 | 19 | pub fn respond(mut self, response: methods::Response) { | 1010 | 19 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1011 | 19 | .unwrap() | 1012 | 19 | .0; | 1013 | 19 | let serialized = response.to_json_response(request_id); | 1014 | 19 | self.responses_notifications_queue | 1015 | 19 | .queue | 1016 | 19 | .push(ToMainTask::RequestResponse(serialized)); | 1017 | 19 | self.responses_notifications_queue | 1018 | 19 | .on_pushed | 1019 | 19 | .notify(usize::MAX); | 1020 | 19 | self.has_sent_response = true; | 1021 | 19 | } |
|
1022 | | |
1023 | | /// Indicate to the [`ClientMainTask`] that the response to the request is `null`. |
1024 | | /// |
1025 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1026 | | // TODO: the necessity for this function is basically a hack |
1027 | 2 | pub fn respond_null(mut self) { |
1028 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1029 | 2 | .unwrap() |
1030 | 2 | .0; |
1031 | 2 | let serialized = parse::build_success_response(request_id, "null"); |
1032 | 2 | self.responses_notifications_queue |
1033 | 2 | .queue |
1034 | 2 | .push(ToMainTask::RequestResponse(serialized)); |
1035 | 2 | self.responses_notifications_queue |
1036 | 2 | .on_pushed |
1037 | 2 | .notify(usize::MAX); |
1038 | 2 | self.has_sent_response = true; |
1039 | 2 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess12respond_null _RNvMs4_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess12respond_null Line | Count | Source | 1027 | 2 | pub fn respond_null(mut self) { | 1028 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1029 | 2 | .unwrap() | 1030 | 2 | .0; | 1031 | 2 | let serialized = parse::build_success_response(request_id, "null"); | 1032 | 2 | self.responses_notifications_queue | 1033 | 2 | .queue | 1034 | 2 | .push(ToMainTask::RequestResponse(serialized)); | 1035 | 2 | self.responses_notifications_queue | 1036 | 2 | .on_pushed | 1037 | 2 | .notify(usize::MAX); | 1038 | 2 | self.has_sent_response = true; | 1039 | 2 | } |
|
1040 | | |
1041 | | /// Indicate to the [`ClientMainTask`] that the request should return an error. |
1042 | | /// |
1043 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1044 | 2 | pub fn fail(mut self, error: ErrorResponse) { |
1045 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1046 | 2 | .unwrap() |
1047 | 2 | .0; |
1048 | 2 | let serialized = parse::build_error_response(request_id, error, None); |
1049 | 2 | self.responses_notifications_queue |
1050 | 2 | .queue |
1051 | 2 | .push(ToMainTask::RequestResponse(serialized)); |
1052 | 2 | self.responses_notifications_queue |
1053 | 2 | .on_pushed |
1054 | 2 | .notify(usize::MAX); |
1055 | 2 | self.has_sent_response = true; |
1056 | 2 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess4fail _RNvMs4_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess4fail Line | Count | Source | 1044 | 2 | pub fn fail(mut self, error: ErrorResponse) { | 1045 | 2 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1046 | 2 | .unwrap() | 1047 | 2 | .0; | 1048 | 2 | let serialized = parse::build_error_response(request_id, error, None); | 1049 | 2 | self.responses_notifications_queue | 1050 | 2 | .queue | 1051 | 2 | .push(ToMainTask::RequestResponse(serialized)); | 1052 | 2 | self.responses_notifications_queue | 1053 | 2 | .on_pushed | 1054 | 2 | .notify(usize::MAX); | 1055 | 2 | self.has_sent_response = true; | 1056 | 2 | } |
|
1057 | | |
1058 | | /// Indicate to the [`ClientMainTask`] that the request should return an error. |
1059 | | /// |
1060 | | /// This function is similar to [`RequestProcess`], except that an additional JSON payload is |
1061 | | /// attached to the error. |
1062 | | /// |
1063 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1064 | 0 | pub fn fail_with_attached_json(mut self, error: ErrorResponse, json: &str) { |
1065 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1066 | 0 | .unwrap() |
1067 | 0 | .0; |
1068 | 0 | let serialized = parse::build_error_response(request_id, error, Some(json)); |
1069 | 0 | self.responses_notifications_queue |
1070 | 0 | .queue |
1071 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1072 | 0 | self.responses_notifications_queue |
1073 | 0 | .on_pushed |
1074 | 0 | .notify(usize::MAX); |
1075 | 0 | self.has_sent_response = true; |
1076 | 0 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess23fail_with_attached_json Unexecuted instantiation: _RNvMs4_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess23fail_with_attached_json |
1077 | | } |
1078 | | |
1079 | | impl fmt::Debug for RequestProcess { |
1080 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1081 | 0 | fmt::Debug::fmt(&self.request, f) |
1082 | 0 | } Unexecuted instantiation: _RNvXs5_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs5_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt |
1083 | | } |
1084 | | |
1085 | | impl Drop for RequestProcess { |
1086 | 23 | fn drop(&mut self) { |
1087 | 23 | if !self.has_sent_response { |
1088 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1089 | 0 | .unwrap() |
1090 | 0 | .0; |
1091 | 0 | let serialized = |
1092 | 0 | parse::build_error_response(request_id, ErrorResponse::InternalError, None); |
1093 | 0 | self.responses_notifications_queue |
1094 | 0 | .queue |
1095 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1096 | 0 | self.responses_notifications_queue |
1097 | 0 | .on_pushed |
1098 | 0 | .notify(usize::MAX); |
1099 | 23 | } |
1100 | 23 | } Unexecuted instantiation: _RNvXs6_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop _RNvXs6_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop Line | Count | Source | 1086 | 23 | fn drop(&mut self) { | 1087 | 23 | if !self.has_sent_response { | 1088 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) | 1089 | 0 | .unwrap() | 1090 | 0 | .0; | 1091 | 0 | let serialized = | 1092 | 0 | parse::build_error_response(request_id, ErrorResponse::InternalError, None); | 1093 | 0 | self.responses_notifications_queue | 1094 | 0 | .queue | 1095 | 0 | .push(ToMainTask::RequestResponse(serialized)); | 1096 | 0 | self.responses_notifications_queue | 1097 | 0 | .on_pushed | 1098 | 0 | .notify(usize::MAX); | 1099 | 23 | } | 1100 | 23 | } |
|
1101 | | } |
1102 | | |
1103 | | /// Object connected to the [`ClientMainTask`] and containing a request that leads to the creation |
1104 | | /// of a subscription. |
1105 | | /// |
1106 | | /// If this object is dropped before the request has been answered, an automatic "internal error" |
1107 | | /// error response is automatically sent back. |
1108 | | pub struct SubscriptionStartProcess { |
1109 | | /// Queue where responses and subscriptions push responses/notifications. |
1110 | | responses_notifications_queue: Arc<ResponsesNotificationsQueue>, |
1111 | | /// `Arc` shared with the client main task and that is used to notify that the subscription |
1112 | | /// should be killed. |
1113 | | kill_channel: Arc<SubscriptionKillChannel>, |
1114 | | /// Request in JSON form. Guaranteed to decode successfully. |
1115 | | request: String, |
1116 | | /// Identifier of the subscription. Assigned by the client task. |
1117 | | subscription_id: String, |
1118 | | /// `true` if a response has already been sent. |
1119 | | has_sent_response: bool, |
1120 | | } |
1121 | | |
1122 | | impl SubscriptionStartProcess { |
1123 | | /// Returns the request which must be processed. |
1124 | | /// |
1125 | | /// The request is guaranteed to be a request that starts a subscription. |
1126 | | // TODO: with stronger typing users wouldn't have to worry about the type of request |
1127 | 0 | pub fn request(&self) -> methods::MethodCall { |
1128 | 0 | methods::parse_jsonrpc_client_to_server(&self.request) |
1129 | 0 | .unwrap() |
1130 | 0 | .1 |
1131 | 0 | } Unexecuted instantiation: _RNvMs7_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess7request Unexecuted instantiation: _RNvMs7_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess7request |
1132 | | |
1133 | | /// Indicate to the [`ClientMainTask`] that the subscription is accepted. |
1134 | | /// |
1135 | | /// The [`ClientMainTask`] will send the confirmation to the JSON-RPC client. |
1136 | | /// |
1137 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1138 | 0 | pub fn accept(mut self) -> Subscription { |
1139 | 0 | let (request_id, parsed_request) = |
1140 | 0 | methods::parse_jsonrpc_client_to_server(&self.request).unwrap(); |
1141 | | |
1142 | 0 | let serialized_response = match parsed_request { |
1143 | | methods::MethodCall::author_submitAndWatchExtrinsic { .. } => { |
1144 | 0 | methods::Response::author_submitAndWatchExtrinsic(Cow::Borrowed( |
1145 | 0 | &self.subscription_id, |
1146 | 0 | )) |
1147 | | } |
1148 | | methods::MethodCall::chain_subscribeAllHeads { .. } => { |
1149 | 0 | methods::Response::chain_subscribeAllHeads(Cow::Borrowed(&self.subscription_id)) |
1150 | | } |
1151 | | methods::MethodCall::chain_subscribeFinalizedHeads { .. } => { |
1152 | 0 | methods::Response::chain_subscribeFinalizedHeads(Cow::Borrowed( |
1153 | 0 | &self.subscription_id, |
1154 | 0 | )) |
1155 | | } |
1156 | | methods::MethodCall::chain_subscribeNewHeads { .. } => { |
1157 | 0 | methods::Response::chain_subscribeNewHeads(Cow::Borrowed(&self.subscription_id)) |
1158 | | } |
1159 | | methods::MethodCall::state_subscribeRuntimeVersion { .. } => { |
1160 | 0 | methods::Response::state_subscribeRuntimeVersion(Cow::Borrowed( |
1161 | 0 | &self.subscription_id, |
1162 | 0 | )) |
1163 | | } |
1164 | | methods::MethodCall::state_subscribeStorage { .. } => { |
1165 | 0 | methods::Response::state_subscribeStorage(Cow::Borrowed(&self.subscription_id)) |
1166 | | } |
1167 | | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. } => { |
1168 | 0 | methods::Response::transactionWatch_v1_submitAndWatch(Cow::Borrowed( |
1169 | 0 | &self.subscription_id, |
1170 | 0 | )) |
1171 | | } |
1172 | | methods::MethodCall::sudo_network_unstable_watch { .. } => { |
1173 | 0 | methods::Response::sudo_network_unstable_watch(Cow::Borrowed(&self.subscription_id)) |
1174 | | } |
1175 | | methods::MethodCall::chainHead_v1_follow { .. } => { |
1176 | 0 | methods::Response::chainHead_v1_follow(Cow::Borrowed(&self.subscription_id)) |
1177 | | } |
1178 | 0 | _ => unreachable!(), |
1179 | | } |
1180 | 0 | .to_json_response(request_id); |
1181 | | |
1182 | 0 | self.responses_notifications_queue |
1183 | 0 | .queue |
1184 | 0 | .push(ToMainTask::RequestResponse(serialized_response)); |
1185 | 0 | self.responses_notifications_queue |
1186 | 0 | .on_pushed |
1187 | 0 | .notify(usize::MAX); |
1188 | 0 | self.has_sent_response = true; |
1189 | | |
1190 | 0 | Subscription { |
1191 | 0 | responses_notifications_queue: self.responses_notifications_queue.clone(), |
1192 | 0 | kill_channel: self.kill_channel.clone(), |
1193 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1194 | 0 | } |
1195 | 0 | } Unexecuted instantiation: _RNvMs7_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess6accept Unexecuted instantiation: _RNvMs7_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess6accept |
1196 | | |
1197 | | /// Indicate to the [`ClientMainTask`] that the subscription start request should return an |
1198 | | /// error. |
1199 | | /// |
1200 | | /// Has no effect if the [`ClientMainTask`] has been destroyed. |
1201 | 0 | pub fn fail(mut self, error: ErrorResponse) { |
1202 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1203 | 0 | .unwrap() |
1204 | 0 | .0; |
1205 | 0 | let serialized = parse::build_error_response(request_id, error, None); |
1206 | 0 | self.responses_notifications_queue |
1207 | 0 | .queue |
1208 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1209 | 0 | self.responses_notifications_queue |
1210 | 0 | .queue |
1211 | 0 | .push(ToMainTask::SubscriptionDestroyed { |
1212 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1213 | 0 | }); |
1214 | 0 | self.responses_notifications_queue |
1215 | 0 | .on_pushed |
1216 | 0 | .notify(usize::MAX); |
1217 | 0 | self.has_sent_response = true; |
1218 | 0 | } Unexecuted instantiation: _RNvMs7_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess4fail Unexecuted instantiation: _RNvMs7_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess4fail |
1219 | | } |
1220 | | |
1221 | | impl fmt::Debug for SubscriptionStartProcess { |
1222 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1223 | 0 | fmt::Debug::fmt(&self.request, f) |
1224 | 0 | } Unexecuted instantiation: _RNvXs8_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs8_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt |
1225 | | } |
1226 | | |
1227 | | impl Drop for SubscriptionStartProcess { |
1228 | 0 | fn drop(&mut self) { |
1229 | 0 | if !self.has_sent_response { |
1230 | 0 | let request_id = methods::parse_jsonrpc_client_to_server(&self.request) |
1231 | 0 | .unwrap() |
1232 | 0 | .0; |
1233 | 0 | let serialized = |
1234 | 0 | parse::build_error_response(request_id, ErrorResponse::InternalError, None); |
1235 | 0 | self.responses_notifications_queue |
1236 | 0 | .queue |
1237 | 0 | .push(ToMainTask::RequestResponse(serialized)); |
1238 | 0 | self.responses_notifications_queue |
1239 | 0 | .queue |
1240 | 0 | .push(ToMainTask::SubscriptionDestroyed { |
1241 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1242 | 0 | }); |
1243 | 0 | self.responses_notifications_queue |
1244 | 0 | .on_pushed |
1245 | 0 | .notify(usize::MAX); |
1246 | 0 | } |
1247 | 0 | } Unexecuted instantiation: _RNvXs9_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop Unexecuted instantiation: _RNvXs9_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop |
1248 | | } |
1249 | | |
/// Object connected to the [`ClientMainTask`] representing an active subscription.
///
/// Notifications are sent with [`Subscription::send_notification`]. Dropping this object
/// queues a `SubscriptionDestroyed` message carrying the subscription identifier.
pub struct Subscription {
    /// Queue where responses and subscriptions push responses/notifications.
    responses_notifications_queue: Arc<ResponsesNotificationsQueue>,
    /// `Arc` shared with the client main task and that is used to notify that the subscription
    /// should be killed. Its `dead` flag is what [`Subscription::is_stale`] reports.
    kill_channel: Arc<SubscriptionKillChannel>,
    /// Identifier of the subscription. Assigned by the client task. Left empty once it has
    /// been moved into the `SubscriptionDestroyed` message on drop.
    subscription_id: String,
}
1260 | | |
/// Flag and wake-up event used to tell a subscription that it must stop.
///
/// See [`Subscription::kill_channel`].
struct SubscriptionKillChannel {
    /// `true` if this subscription should be destroyed as soon as possible.
    dead: AtomicBool,
    /// Notified whenever [`SubscriptionKillChannel::dead`] is modified. Listened to by the
    /// subscription while it waits for queue space or for its own termination.
    on_dead_changed: event_listener::Event,
}
1268 | | |
1269 | | impl Subscription { |
1270 | | /// Return the identifier of this subscription. Necessary in order to generate answers. |
1271 | 0 | pub fn subscription_id(&self) -> &str { |
1272 | 0 | &self.subscription_id |
1273 | 0 | } Unexecuted instantiation: _RNvMsa_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription15subscription_id Unexecuted instantiation: _RNvMsa_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription15subscription_id |
1274 | | |
1275 | | /// Send a notification the [`ClientMainTask`]. |
1276 | | /// |
1277 | | /// Has no effect if [`Subscription::is_stale`] would return `true`. |
1278 | | /// |
1279 | | /// This notification might end up being discarded if the queue of responses to send back to |
1280 | | /// the JSON-RPC client is full and/or if the notification is redundant with another |
1281 | | /// notification sent earlier. |
1282 | | /// |
1283 | | /// While this function is asynchronous, it is expected to not take very long provided that |
1284 | | /// [`ClientMainTask::run_until_event`] is called in parallel. |
1285 | | /// |
1286 | | /// > **Note**: It is important to run [`ClientMainTask::run_until_event`] concurrently to |
1287 | | /// > this function, otherwise it might never return. |
1288 | | // TODO: with stronger typing we could automatically fill the subscription_id |
1289 | 0 | pub async fn send_notification(&mut self, notification: methods::ServerToClient<'_>) { Unexecuted instantiation: _RNvMsa_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription17send_notification Unexecuted instantiation: _RNvMsa_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription17send_notification |
1290 | 0 | let serialized = notification.to_json_request_object_parameters(None); |
1291 | | |
1292 | | // Wait until there is space in the queue or that the subscription is dead. |
1293 | | // Note that this is intentionally racy. |
1294 | | { |
1295 | 0 | let mut wait = None; |
1296 | | loop { |
1297 | | // If the subscription is dead, simply do nothing. This is purely an optimization. |
1298 | 0 | if self.kill_channel.dead.load(Ordering::Relaxed) { |
1299 | 0 | return; |
1300 | 0 | } |
1301 | | |
1302 | | // If there is space, break out of the loop in order to send. |
1303 | 0 | if self.responses_notifications_queue.queue.len() |
1304 | 0 | < self.responses_notifications_queue.max_len |
1305 | | { |
1306 | 0 | break; |
1307 | 0 | } |
1308 | | |
1309 | 0 | if let Some(wait) = wait.take() { |
1310 | 0 | wait.await |
1311 | 0 | } else { |
1312 | 0 | wait = Some( |
1313 | 0 | self.responses_notifications_queue |
1314 | 0 | .on_popped |
1315 | 0 | .listen() |
1316 | 0 | .or(self.kill_channel.on_dead_changed.listen()), |
1317 | 0 | ); |
1318 | 0 | } |
1319 | | } |
1320 | | } |
1321 | | |
1322 | | // Actually push the element. |
1323 | 0 | self.responses_notifications_queue |
1324 | 0 | .queue |
1325 | 0 | .push(ToMainTask::Notification(serialized)); |
1326 | 0 | self.responses_notifications_queue |
1327 | 0 | .on_pushed |
1328 | 0 | .notify(usize::MAX); |
1329 | 0 | } Unexecuted instantiation: _RNCNvMsa_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0Bd_ Unexecuted instantiation: _RNCNvMsa_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0CsfFWJyR6nd6r_17smoldot_full_node Unexecuted instantiation: _RNCNvMsa_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0Bd_ |
1330 | | |
1331 | | /// Returns `true` if the JSON-RPC client has unsubscribed, or the [`ClientMainTask`] has been |
1332 | | /// destroyed, or the queue of responses to send to the JSON-RPC client is clogged and the |
1333 | | /// logic of the subscription requires that it stops altogether in that situation. |
1334 | | /// |
1335 | | /// Due to the racy nature of this function, a value of `false` can at any moment switch to |
1336 | | /// `true` and thus should be interpreted as "maybe". A value of `true`, however, actually |
1337 | | /// means "yes", as it can't ever switch back to `false`. |
1338 | 0 | pub fn is_stale(&self) -> bool { |
1339 | 0 | self.kill_channel.dead.load(Ordering::Relaxed) |
1340 | 0 | } Unexecuted instantiation: _RNvMsa_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription8is_stale Unexecuted instantiation: _RNvMsa_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription8is_stale |
1341 | | |
1342 | | /// Run indefinitely until [`Subscription::is_stale`] returns `true`. |
1343 | 0 | pub async fn wait_until_stale(&mut self) { Unexecuted instantiation: _RNvMsa_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription16wait_until_stale Unexecuted instantiation: _RNvMsa_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription16wait_until_stale |
1344 | | // The control flow of this function is a bit magic, but simple enough that it should be |
1345 | | // easy to understand. |
1346 | 0 | let mut wait = None; |
1347 | | loop { |
1348 | 0 | if self.kill_channel.dead.load(Ordering::Acquire) { |
1349 | 0 | return; |
1350 | 0 | } |
1351 | | |
1352 | 0 | if let Some(wait) = wait.take() { |
1353 | 0 | wait.await; |
1354 | 0 | } else { |
1355 | 0 | wait = Some(self.kill_channel.on_dead_changed.listen()); |
1356 | 0 | } |
1357 | | } |
1358 | 0 | } Unexecuted instantiation: _RNCNvMsa_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription16wait_until_stale0Bd_ Unexecuted instantiation: _RNCNvMsa_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription16wait_until_stale0Bd_ |
1359 | | } |
1360 | | |
1361 | | impl fmt::Debug for Subscription { |
1362 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1363 | 0 | f.debug_tuple("Subscription") |
1364 | 0 | .field(&self.subscription_id) |
1365 | 0 | .finish() |
1366 | 0 | } Unexecuted instantiation: _RNvXsb_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXsb_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtCs1p5UDGgVI4d_4core3fmt5Debug3fmt |
1367 | | } |
1368 | | |
1369 | | impl Drop for Subscription { |
1370 | 0 | fn drop(&mut self) { |
1371 | 0 | self.responses_notifications_queue |
1372 | 0 | .queue |
1373 | 0 | .push(ToMainTask::SubscriptionDestroyed { |
1374 | 0 | subscription_id: mem::take(&mut self.subscription_id), |
1375 | 0 | }); |
1376 | 0 | self.responses_notifications_queue |
1377 | 0 | .on_pushed |
1378 | 0 | .notify(usize::MAX); |
1379 | 0 | } Unexecuted instantiation: _RNvXsc_NtNtNtCsjlkOsLH0Zfj_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop Unexecuted instantiation: _RNvXsc_NtNtNtCsc1ywvx6YAnK_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtNtCs1p5UDGgVI4d_4core3ops4drop4Drop4drop |
1380 | | } |