Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/json_rpc/service/client_main_task.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2023  Pierre Krieger
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
// TODO: doc
19
20
use crate::json_rpc::{methods, parse};
21
use alloc::{
22
    borrow::Cow,
23
    boxed::Box,
24
    collections::VecDeque,
25
    string::{String, ToString as _},
26
    sync::{Arc, Weak},
27
};
28
use async_lock::Mutex;
29
use core::{
30
    cmp, fmt, mem,
31
    num::NonZeroU32,
32
    sync::atomic::{AtomicBool, AtomicU32, Ordering},
33
};
34
use futures_lite::FutureExt as _;
35
use slab::Slab;
36
37
pub use crate::json_rpc::parse::{ErrorResponse, ParseError};
38
39
/// See [module-level-documentation](..).
40
pub struct ClientMainTask {
41
    /// Because we move the task around a lot, all the fields are actually within a `Box`.
42
    inner: Box<Inner>,
43
}
44
45
struct Inner {
46
    /// Identifier to allocate to the new subscription requested by the user.
47
    // TODO: better strategy than just integers?
48
    next_subscription_id: u64,
49
50
    /// List of all active subscriptions. Keys are subscription IDs.
51
    ///
52
    /// Given that the subscription IDs are allocated locally, there is no harm in using a
53
    /// non-HashDoS-resilient hash function.
54
    ///
55
    /// Entries are removed only when the [`SubscriptionStartProcess`] or [`Subscription`] object
56
    /// is destroyed. This is necessary given that the maximum number of subscriptions exists in
57
    /// order to avoid spam attacks, and that resources are freed only when the
58
    /// [`SubscriptionStartProcess`] or [`Subscription`] is destroyed.
59
    active_subscriptions: hashbrown::HashMap<String, InnerSubscription, fnv::FnvBuildHasher>,
60
    /// Maximum size that [`Inner::active_subscriptions`] is allowed to reach. Beyond this,
61
    /// subscription start requests are automatically denied.
62
    max_active_subscriptions: u32,
63
64
    /// Structure shared with the [`SerializedRequestsIo`].
65
    serialized_io: Arc<SerializedIo>,
66
67
    /// Queue where responses and subscriptions push responses/notifications.
68
    responses_notifications_queue: Arc<ResponsesNotificationsQueue>,
69
70
    /// Event notified after the [`SerializedRequestsIo`] is destroyed.
71
    on_serialized_requests_io_destroyed: event_listener::EventListener,
72
}
73
74
struct InnerSubscription {
75
    /// Shared with the subscription. Used to notify the subscription that it should be killed.
76
    kill_channel: Arc<SubscriptionKillChannel>,
77
    /// Response to an unsubscribe request that must be sent out once the subscription is killed.
78
    unsubscribe_response: Option<String>,
79
}
80
81
struct SerializedIo {
82
    /// Queue of requests. The requests are guaranteed to be a valid request JSON, but not
83
    /// necessarily to use a known method.
84
    requests_queue: crossbeam_queue::SegQueue<String>,
85
86
    /// Event notified after an element has been pushed to [`SerializedIo::requests_queue`].
87
    on_request_pushed: event_listener::Event,
88
89
    /// Event notified after an element from [`SerializedIo::requests_queue`] has been pulled.
90
    on_request_pulled_or_task_destroyed: event_listener::Event,
91
92
    /// Number of requests that have been received from the client but whose answer hasn't
93
    /// been pulled out from [`SerializedIo::requests_queue`] yet.
94
    num_requests_in_fly: AtomicU32,
95
96
    /// Maximum value that [`SerializedIo::num_requests_in_fly`] is allowed to reach.
97
    /// Beyond this, no more request should be added to [`SerializedIo::requests_queue`].
98
    max_requests_in_fly: NonZeroU32,
99
100
    /// Queue of responses.
101
    responses_queue: Mutex<SerializedIoResponses>,
102
103
    /// Event notified after an element has been pushed to [`SerializedIo::responses_queue`], or
104
    /// when the [`ClientMainTask`] has been destroyed.
105
    on_response_pushed_or_task_destroyed: event_listener::Event,
106
}
107
108
struct SerializedIoResponses {
109
    /// Unordered list of responses and notifications to send back to the client.
110
    ///
111
    /// Each entry contains the response/notification, and a boolean equal to `true` if this is
112
    /// a request response or `false` if this is a notification.
113
    pending_serialized_responses: Slab<(String, bool)>,
114
115
    /// Ordered list of responses and notifications to send back to the client, as indices within
116
    /// [`SerializedIoResponses::pending_serialized_responses`].
117
    pending_serialized_responses_queue: VecDeque<usize>,
118
}
119
120
/// Queue where responses and subscriptions push responses/notifications.
121
struct ResponsesNotificationsQueue {
122
    /// The actual queue.
123
    queue: crossbeam_queue::SegQueue<ToMainTask>,
124
    /// Maximum size that [`ResponsesNotificationsQueue::queue`] should reach.
125
    /// This is however not a hard limit. Pushing a response to a request and pushing a
126
    /// subscription destroyed event ignore this maximum (as doing so must always be lock-free),
127
    /// and pushing a notification checks against this limit in a racy way. For this reason, in
128
    /// the worst case scenario the queue can reach up to
129
    /// `max_requests_in_fly + max_active_subscriptions` elements. What matters, however, is that
130
    /// the queue is bounded one way or another, more so than what the exact bound is.
131
    max_len: usize,
132
    /// Event notified after an element has been pushed to [`ResponsesNotificationsQueue::queue`].
133
    on_pushed: event_listener::Event,
134
    /// Event notified after an element from [`ResponsesNotificationsQueue::queue`] has been popped.
135
    on_popped: event_listener::Event,
136
}
137
138
// TODO: weird enum
139
enum ToMainTask {
140
    RequestResponse(String),
141
    Notification(String),
142
    SubscriptionDestroyed { subscription_id: String },
143
}
144
145
/// Configuration for [`client_main_task`].
146
pub struct Config {
147
    /// Maximum number of requests that have been sent by the [`SerializedRequestsIo`] but whose
148
    /// response hasn't been pulled through the [`SerializedRequestsIo`] yet.
149
    ///
150
    /// If this limit is reached, it is not possible to send further requests without pulling
151
    /// responses first.
152
    pub max_pending_requests: NonZeroU32,
153
154
    /// Maximum number of simultaneous subscriptions allowed. Trying to create a subscription will
155
    /// be automatically rejected if this limit is reached.
156
    pub max_active_subscriptions: u32,
157
}
158
159
/// Creates a new [`ClientMainTask`] and a [`SerializedRequestsIo`] connected to it.
160
21
pub fn client_main_task(config: Config) -> (ClientMainTask, SerializedRequestsIo) {
161
21
    let buffers_capacity = usize::try_from(config.max_pending_requests.get())
162
21
        .unwrap_or(usize::MAX)
163
21
        .saturating_add(usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX));
164
21
165
21
    let on_serialized_requests_io_destroyed = event_listener::Event::new();
166
21
167
21
    let task = ClientMainTask {
168
21
        inner: Box::new(Inner {
169
21
            next_subscription_id: 1,
170
21
            active_subscriptions: hashbrown::HashMap::with_capacity_and_hasher(
171
21
                cmp::min(
172
21
                    usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX),
173
21
                    32,
174
21
                ),
175
21
                Default::default(),
176
21
            ),
177
21
            max_active_subscriptions: config.max_active_subscriptions,
178
21
            serialized_io: Arc::new(SerializedIo {
179
21
                requests_queue: crossbeam_queue::SegQueue::new(),
180
21
                on_request_pushed: event_listener::Event::new(),
181
21
                on_request_pulled_or_task_destroyed: event_listener::Event::new(),
182
21
                num_requests_in_fly: AtomicU32::new(0),
183
21
                max_requests_in_fly: config.max_pending_requests,
184
21
                responses_queue: Mutex::new(SerializedIoResponses {
185
21
                    pending_serialized_responses_queue: VecDeque::with_capacity(cmp::min(
186
21
                        64,
187
21
                        buffers_capacity,
188
21
                    )),
189
21
                    pending_serialized_responses: Slab::with_capacity(cmp::min(
190
21
                        64,
191
21
                        buffers_capacity,
192
21
                    )),
193
21
                }),
194
21
                on_response_pushed_or_task_destroyed: event_listener::Event::new(),
195
21
            }),
196
21
            responses_notifications_queue: Arc::new(ResponsesNotificationsQueue {
197
21
                queue: crossbeam_queue::SegQueue::new(),
198
21
                max_len: buffers_capacity,
199
21
                on_pushed: event_listener::Event::new(),
200
21
                on_popped: event_listener::Event::new(),
201
21
            }),
202
21
            on_serialized_requests_io_destroyed: on_serialized_requests_io_destroyed.listen(),
203
21
        }),
204
21
    };
205
21
206
21
    let serialized_requests_io = SerializedRequestsIo {
207
21
        serialized_io: Arc::downgrade(&task.inner.serialized_io),
208
21
        on_serialized_requests_io_destroyed,
209
21
    };
210
21
211
21
    (task, serialized_requests_io)
212
21
}
Unexecuted instantiation: _RNvNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_task16client_main_task
_RNvNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_task16client_main_task
Line
Count
Source
160
21
pub fn client_main_task(config: Config) -> (ClientMainTask, SerializedRequestsIo) {
161
21
    let buffers_capacity = usize::try_from(config.max_pending_requests.get())
162
21
        .unwrap_or(usize::MAX)
163
21
        .saturating_add(usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX));
164
21
165
21
    let on_serialized_requests_io_destroyed = event_listener::Event::new();
166
21
167
21
    let task = ClientMainTask {
168
21
        inner: Box::new(Inner {
169
21
            next_subscription_id: 1,
170
21
            active_subscriptions: hashbrown::HashMap::with_capacity_and_hasher(
171
21
                cmp::min(
172
21
                    usize::try_from(config.max_active_subscriptions).unwrap_or(usize::MAX),
173
21
                    32,
174
21
                ),
175
21
                Default::default(),
176
21
            ),
177
21
            max_active_subscriptions: config.max_active_subscriptions,
178
21
            serialized_io: Arc::new(SerializedIo {
179
21
                requests_queue: crossbeam_queue::SegQueue::new(),
180
21
                on_request_pushed: event_listener::Event::new(),
181
21
                on_request_pulled_or_task_destroyed: event_listener::Event::new(),
182
21
                num_requests_in_fly: AtomicU32::new(0),
183
21
                max_requests_in_fly: config.max_pending_requests,
184
21
                responses_queue: Mutex::new(SerializedIoResponses {
185
21
                    pending_serialized_responses_queue: VecDeque::with_capacity(cmp::min(
186
21
                        64,
187
21
                        buffers_capacity,
188
21
                    )),
189
21
                    pending_serialized_responses: Slab::with_capacity(cmp::min(
190
21
                        64,
191
21
                        buffers_capacity,
192
21
                    )),
193
21
                }),
194
21
                on_response_pushed_or_task_destroyed: event_listener::Event::new(),
195
21
            }),
196
21
            responses_notifications_queue: Arc::new(ResponsesNotificationsQueue {
197
21
                queue: crossbeam_queue::SegQueue::new(),
198
21
                max_len: buffers_capacity,
199
21
                on_pushed: event_listener::Event::new(),
200
21
                on_popped: event_listener::Event::new(),
201
21
            }),
202
21
            on_serialized_requests_io_destroyed: on_serialized_requests_io_destroyed.listen(),
203
21
        }),
204
21
    };
205
21
206
21
    let serialized_requests_io = SerializedRequestsIo {
207
21
        serialized_io: Arc::downgrade(&task.inner.serialized_io),
208
21
        on_serialized_requests_io_destroyed,
209
21
    };
210
21
211
21
    (task, serialized_requests_io)
212
21
}
213
214
impl ClientMainTask {
215
    /// Processes the task's internals and waits until something noteworthy happens.
216
44
    
pub async fn run_until_event(mut self) -> Event 0
{
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask15run_until_event
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask15run_until_event
217
        loop {
218
            enum WakeUpReason {
219
                NewRequest(String),
220
                Message(ToMainTask),
221
            }
222
223
48
            let wake_up_reason = {
224
69
                let serialized_requests_io_destroyed = async {
225
69
                    (&mut self.inner.on_serialized_requests_io_destroyed).
await65
;
226
21
                    Err(())
227
21
                };
Unexecuted instantiation: _RNCNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00Bc_
Unexecuted instantiation: _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00Bc_
_RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event00CsiUjFBJteJ7x_17smoldot_full_node
Line
Count
Source
224
69
                let serialized_requests_io_destroyed = async {
225
69
                    (&mut self.inner.on_serialized_requests_io_destroyed).
await65
;
226
21
                    Err(())
227
21
                };
228
229
69
                let next_serialized_request = async {
230
69
                    let mut wait = None;
231
                    loop {
232
155
                        if let Some(
elem25
) = self.inner.serialized_io.requests_queue.pop() {
233
25
                            self.inner
234
25
                                .serialized_io
235
25
                                .on_request_pulled_or_task_destroyed
236
25
                                .notify(usize::MAX);
237
25
                            break Ok(WakeUpReason::NewRequest(elem));
238
130
                        }
239
130
                        if let Some(
wait65
) = wait.take() {
240
65
                            wait.
await44
241
65
                        } else {
242
65
                            wait = Some(self.inner.serialized_io.on_request_pushed.listen());
243
65
                        }
244
                    }
245
25
                };
Unexecuted instantiation: _RNCNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0Bc_
Unexecuted instantiation: _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0Bc_
_RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s_0CsiUjFBJteJ7x_17smoldot_full_node
Line
Count
Source
229
69
                let next_serialized_request = async {
230
69
                    let mut wait = None;
231
                    loop {
232
155
                        if let Some(
elem25
) = self.inner.serialized_io.requests_queue.pop() {
233
25
                            self.inner
234
25
                                .serialized_io
235
25
                                .on_request_pulled_or_task_destroyed
236
25
                                .notify(usize::MAX);
237
25
                            break Ok(WakeUpReason::NewRequest(elem));
238
130
                        }
239
130
                        if let Some(
wait65
) = wait.take() {
240
65
                            wait.
await44
241
65
                        } else {
242
65
                            wait = Some(self.inner.serialized_io.on_request_pushed.listen());
243
65
                        }
244
                    }
245
25
                };
246
247
69
                let response_notif = async {
248
65
                    let mut wait = None;
249
                    loop {
250
153
                        if let Some(
elem23
) = self.inner.responses_notifications_queue.queue.pop() {
251
23
                            break Ok(WakeUpReason::Message(elem));
252
130
                        }
253
130
                        if let Some(
wait65
) = wait.take() {
254
65
                            wait.
await23
255
65
                        } else {
256
65
                            wait =
257
65
                                Some(self.inner.responses_notifications_queue.on_pushed.listen());
258
65
                        }
259
                    }
260
23
                };
Unexecuted instantiation: _RNCNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0Bc_
Unexecuted instantiation: _RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0Bc_
_RNCNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB6_14ClientMainTask15run_until_event0s0_0CsiUjFBJteJ7x_17smoldot_full_node
Line
Count
Source
247
65
                let response_notif = async {
248
65
                    let mut wait = None;
249
                    loop {
250
153
                        if let Some(
elem23
) = self.inner.responses_notifications_queue.queue.pop() {
251
23
                            break Ok(WakeUpReason::Message(elem));
252
130
                        }
253
130
                        if let Some(
wait65
) = wait.take() {
254
65
                            wait.
await23
255
65
                        } else {
256
65
                            wait =
257
65
                                Some(self.inner.responses_notifications_queue.on_pushed.listen());
258
65
                        }
259
                    }
260
23
                };
261
262
69
                match serialized_requests_io_destroyed
263
69
                    .or(next_serialized_request)
264
69
                    .or(response_notif)
265
65
                    .await
266
                {
267
48
                    Ok(wake_up_reason) => wake_up_reason,
268
21
                    Err(()) => return Event::SerializedRequestsIoClosed,
269
                }
270
            };
271
272
            // Immediately handle every event apart from `NewRequest`.
273
25
            let new_request = match 
wake_up_reason23
{
274
25
                WakeUpReason::NewRequest(request) => request,
275
0
                WakeUpReason::Message(ToMainTask::SubscriptionDestroyed { subscription_id }) => {
276
0
                    let InnerSubscription {
277
0
                        unsubscribe_response,
278
0
                        ..
279
0
                    } = self
280
0
                        .inner
281
0
                        .active_subscriptions
282
0
                        .remove(&subscription_id)
283
0
                        .unwrap();
284
                    // TODO: post a `stop`/`error` event for chainhead subscriptions
285
0
                    if let Some(unsubscribe_response) = unsubscribe_response {
286
0
                        let mut responses_queue =
287
0
                            self.inner.serialized_io.responses_queue.lock().await;
288
0
                        let pos = responses_queue
289
0
                            .pending_serialized_responses
290
0
                            .insert((unsubscribe_response, true));
291
0
                        responses_queue
292
0
                            .pending_serialized_responses_queue
293
0
                            .push_back(pos);
294
0
                        self.inner
295
0
                            .serialized_io
296
0
                            .on_response_pushed_or_task_destroyed
297
0
                            .notify(usize::MAX);
298
0
                    }
299
300
                    // Shrink the list of active subscriptions if necessary.
301
0
                    if self.inner.active_subscriptions.capacity()
302
0
                        >= 2 * self.inner.active_subscriptions.len() + 16
303
0
                    {
304
0
                        self.inner.active_subscriptions.shrink_to_fit();
305
0
                    }
306
307
0
                    return Event::SubscriptionDestroyed {
308
0
                        task: self,
309
0
                        subscription_id,
310
0
                    };
311
                }
312
23
                WakeUpReason::Message(ToMainTask::RequestResponse(response)) => {
313
23
                    let mut responses_queue = self.inner.serialized_io.responses_queue.lock().
await0
;
314
23
                    let pos = responses_queue
315
23
                        .pending_serialized_responses
316
23
                        .insert((response, true));
317
23
                    responses_queue
318
23
                        .pending_serialized_responses_queue
319
23
                        .push_back(pos);
320
23
                    self.inner
321
23
                        .serialized_io
322
23
                        .on_response_pushed_or_task_destroyed
323
23
                        .notify(usize::MAX);
324
23
                    continue;
325
                }
326
0
                WakeUpReason::Message(ToMainTask::Notification(notification)) => {
327
                    // TODO: filter out redundant notifications, as it's the entire point of this module
328
0
                    let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await;
329
0
                    let pos = responses_queue
330
0
                        .pending_serialized_responses
331
0
                        .insert((notification, false));
332
0
                    responses_queue
333
0
                        .pending_serialized_responses_queue
334
0
                        .push_back(pos);
335
0
                    self.inner
336
0
                        .serialized_io
337
0
                        .on_response_pushed_or_task_destroyed
338
0
                        .notify(usize::MAX);
339
0
                    continue;
340
                }
341
            };
342
343
23
            let (request_id, parsed_request) =
344
25
                match methods::parse_jsonrpc_client_to_server(&new_request) {
345
23
                    Ok((request_id, method)) => (request_id, method),
346
1
                    Err(methods::ParseClientToServerError::Method { request_id, error }) => {
347
1
                        let response = error.to_json_error(request_id);
348
1
                        let mut responses_queue =
349
1
                            self.inner.serialized_io.responses_queue.lock().
await0
;
350
1
                        let pos = responses_queue
351
1
                            .pending_serialized_responses
352
1
                            .insert((response, true));
353
1
                        responses_queue
354
1
                            .pending_serialized_responses_queue
355
1
                            .push_back(pos);
356
1
                        self.inner
357
1
                            .serialized_io
358
1
                            .on_response_pushed_or_task_destroyed
359
1
                            .notify(usize::MAX);
360
1
                        continue;
361
                    }
362
0
                    Err(methods::ParseClientToServerError::UnknownNotification(_)) => continue,
363
                    Err(methods::ParseClientToServerError::JsonRpcParse(_)) => {
364
1
                        let response = parse::build_parse_error_response();
365
1
                        let mut responses_queue =
366
1
                            self.inner.serialized_io.responses_queue.lock().
await0
;
367
1
                        let pos = responses_queue
368
1
                            .pending_serialized_responses
369
1
                            .insert((response, true));
370
1
                        responses_queue
371
1
                            .pending_serialized_responses_queue
372
1
                            .push_back(pos);
373
1
                        self.inner
374
1
                            .serialized_io
375
1
                            .on_response_pushed_or_task_destroyed
376
1
                            .notify(usize::MAX);
377
1
                        continue;
378
                    }
379
                };
380
381
            // There exist three types of requests:
382
            //
383
            // - Requests that follow a simple one-request-one-response schema.
384
            // - Requests that, if accepted, start a subscription.
385
            // - Requests that unsubscribe from a subscription.
386
            //
387
23
            match &parsed_request {
388
                methods::MethodCall::account_nextIndex { .. }
389
                | methods::MethodCall::author_hasKey { .. }
390
                | methods::MethodCall::author_hasSessionKeys { .. }
391
                | methods::MethodCall::author_insertKey { .. }
392
                | methods::MethodCall::author_pendingExtrinsics { .. }
393
                | methods::MethodCall::author_removeExtrinsic { .. }
394
                | methods::MethodCall::author_rotateKeys { .. }
395
                | methods::MethodCall::author_submitExtrinsic { .. }
396
                | methods::MethodCall::babe_epochAuthorship { .. }
397
                | methods::MethodCall::chain_getBlock { .. }
398
                | methods::MethodCall::chain_getBlockHash { .. }
399
                | methods::MethodCall::chain_getFinalizedHead { .. }
400
                | methods::MethodCall::chain_getHeader { .. }
401
                | methods::MethodCall::childstate_getKeys { .. }
402
                | methods::MethodCall::childstate_getStorage { .. }
403
                | methods::MethodCall::childstate_getStorageHash { .. }
404
                | methods::MethodCall::childstate_getStorageSize { .. }
405
                | methods::MethodCall::grandpa_roundState { .. }
406
                | methods::MethodCall::offchain_localStorageGet { .. }
407
                | methods::MethodCall::offchain_localStorageSet { .. }
408
                | methods::MethodCall::payment_queryInfo { .. }
409
                | methods::MethodCall::state_call { .. }
410
                | methods::MethodCall::state_getKeys { .. }
411
                | methods::MethodCall::state_getKeysPaged { .. }
412
                | methods::MethodCall::state_getMetadata { .. }
413
                | methods::MethodCall::state_getPairs { .. }
414
                | methods::MethodCall::state_getReadProof { .. }
415
                | methods::MethodCall::state_getRuntimeVersion { .. }
416
                | methods::MethodCall::state_getStorage { .. }
417
                | methods::MethodCall::state_getStorageHash { .. }
418
                | methods::MethodCall::state_getStorageSize { .. }
419
                | methods::MethodCall::state_queryStorage { .. }
420
                | methods::MethodCall::state_queryStorageAt { .. }
421
                | methods::MethodCall::system_accountNextIndex { .. }
422
                | methods::MethodCall::system_addReservedPeer { .. }
423
                | methods::MethodCall::system_chain { .. }
424
                | methods::MethodCall::system_chainType { .. }
425
                | methods::MethodCall::system_dryRun { .. }
426
                | methods::MethodCall::system_health { .. }
427
                | methods::MethodCall::system_localListenAddresses { .. }
428
                | methods::MethodCall::system_localPeerId { .. }
429
                | methods::MethodCall::system_name { .. }
430
                | methods::MethodCall::system_networkState { .. }
431
                | methods::MethodCall::system_nodeRoles { .. }
432
                | methods::MethodCall::system_peers { .. }
433
                | methods::MethodCall::system_properties { .. }
434
                | methods::MethodCall::system_removeReservedPeer { .. }
435
                | methods::MethodCall::system_version { .. }
436
                | methods::MethodCall::chainSpec_v1_chainName { .. }
437
                | methods::MethodCall::chainSpec_v1_genesisHash { .. }
438
                | methods::MethodCall::chainSpec_v1_properties { .. }
439
                | methods::MethodCall::rpc_methods { .. }
440
                | methods::MethodCall::sudo_unstable_p2pDiscover { .. }
441
                | methods::MethodCall::sudo_unstable_version { .. }
442
                | methods::MethodCall::chainHead_v1_body { .. }
443
                | methods::MethodCall::chainHead_v1_call { .. }
444
                | methods::MethodCall::chainHead_v1_continue { .. }
445
                | methods::MethodCall::chainHead_unstable_finalizedDatabase { .. }
446
                | methods::MethodCall::chainHead_v1_header { .. }
447
                | methods::MethodCall::chainHead_v1_stopOperation { .. }
448
                | methods::MethodCall::chainHead_v1_storage { .. }
449
                | methods::MethodCall::chainHead_v1_unpin { .. } => {
450
                    // Simple one-request-one-response.
451
23
                    return Event::HandleRequest {
452
23
                        request_process: RequestProcess {
453
23
                            responses_notifications_queue: self
454
23
                                .inner
455
23
                                .responses_notifications_queue
456
23
                                .clone(),
457
23
                            request: new_request,
458
23
                            has_sent_response: false,
459
23
                        },
460
23
                        task: self,
461
23
                    };
462
                }
463
464
                methods::MethodCall::author_submitAndWatchExtrinsic { .. }
465
                | methods::MethodCall::chain_subscribeAllHeads { .. }
466
                | methods::MethodCall::chain_subscribeFinalizedHeads { .. }
467
                | methods::MethodCall::chain_subscribeNewHeads { .. }
468
                | methods::MethodCall::state_subscribeRuntimeVersion { .. }
469
                | methods::MethodCall::state_subscribeStorage { .. }
470
                | methods::MethodCall::transaction_v1_broadcast { .. }
471
                | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. }
472
                | methods::MethodCall::sudo_network_unstable_watch { .. }
473
                | methods::MethodCall::chainHead_v1_follow { .. } => {
474
                    // Subscription starting requests.
475
476
                    // We must check the maximum number of subscriptions.
477
0
                    let max_subscriptions =
478
0
                        usize::try_from(self.inner.max_active_subscriptions).unwrap_or(usize::MAX);
479
0
                    debug_assert!(self.inner.active_subscriptions.len() <= max_subscriptions);
480
0
                    if self.inner.active_subscriptions.len() >= max_subscriptions {
481
0
                        let response = parse::build_error_response(
482
0
                            request_id,
483
0
                            ErrorResponse::ServerError(-32000, "Too many active subscriptions"),
484
0
                            None,
485
0
                        );
486
0
                        let mut responses_queue =
487
0
                            self.inner.serialized_io.responses_queue.lock().await;
488
0
                        let pos = responses_queue
489
0
                            .pending_serialized_responses
490
0
                            .insert((response, true));
491
0
                        responses_queue
492
0
                            .pending_serialized_responses_queue
493
0
                            .push_back(pos);
494
0
                        self.inner
495
0
                            .serialized_io
496
0
                            .on_response_pushed_or_task_destroyed
497
0
                            .notify(usize::MAX);
498
0
                        continue;
499
0
                    }
500
0
501
0
                    // Allocate the new subscription ID.
502
0
                    let subscription_id = self.allocate_subscription_id();
503
0
                    debug_assert!(!self
504
0
                        .inner
505
0
                        .active_subscriptions
506
0
                        .contains_key(&subscription_id));
507
508
                    // Insert a "kill channel" in the local state. This kill channel is shared
509
                    // with the subscription object and is used to notify when a subscription
510
                    // should be killed.
511
0
                    let kill_channel = Arc::new(SubscriptionKillChannel {
512
0
                        dead: AtomicBool::new(false),
513
0
                        on_dead_changed: event_listener::Event::new(),
514
0
                    });
515
0
                    self.inner.active_subscriptions.insert(
516
0
                        subscription_id.clone(),
517
0
                        InnerSubscription {
518
0
                            kill_channel: kill_channel.clone(),
519
0
                            unsubscribe_response: None,
520
0
                        },
521
0
                    );
522
0
523
0
                    return Event::HandleSubscriptionStart {
524
0
                        subscription_start: SubscriptionStartProcess {
525
0
                            responses_notifications_queue: self
526
0
                                .inner
527
0
                                .responses_notifications_queue
528
0
                                .clone(),
529
0
                            request: new_request,
530
0
                            kill_channel,
531
0
                            subscription_id,
532
0
                            has_sent_response: false,
533
0
                        },
534
0
                        task: self,
535
0
                    };
536
                }
537
538
0
                methods::MethodCall::author_unwatchExtrinsic { subscription, .. }
539
0
                | methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. }
540
0
                | methods::MethodCall::state_unsubscribeStorage { subscription, .. }
541
                | methods::MethodCall::transaction_v1_stop {
542
0
                    operation_id: subscription,
543
                }
544
0
                | methods::MethodCall::transactionWatch_v1_unwatch { subscription, .. }
545
0
                | methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. }
546
                | methods::MethodCall::chainHead_v1_unfollow {
547
0
                    follow_subscription: subscription,
548
                    ..
549
                } => {
550
                    // TODO: must check whether type of subscription matches
551
0
                    match self.inner.active_subscriptions.get_mut(&**subscription) {
552
                        Some(InnerSubscription {
553
0
                            kill_channel,
554
0
                            unsubscribe_response,
555
0
                        }) if unsubscribe_response.is_none() => {
556
0
                            *unsubscribe_response = Some(
557
0
                                match parsed_request {
558
                                    methods::MethodCall::author_unwatchExtrinsic { .. } => {
559
0
                                        methods::Response::author_unwatchExtrinsic(true)
560
                                    }
561
                                    methods::MethodCall::state_unsubscribeRuntimeVersion {
562
                                        ..
563
0
                                    } => methods::Response::state_unsubscribeRuntimeVersion(true),
564
                                    methods::MethodCall::state_unsubscribeStorage { .. } => {
565
0
                                        methods::Response::state_unsubscribeStorage(true)
566
                                    }
567
                                    methods::MethodCall::transaction_v1_stop { .. } => {
568
0
                                        methods::Response::transaction_v1_stop(())
569
                                    }
570
                                    methods::MethodCall::transactionWatch_v1_unwatch { .. } => {
571
0
                                        methods::Response::transactionWatch_v1_unwatch(())
572
                                    }
573
                                    methods::MethodCall::sudo_network_unstable_unwatch {
574
                                        ..
575
0
                                    } => methods::Response::sudo_network_unstable_unwatch(()),
576
                                    methods::MethodCall::chainHead_v1_unfollow { .. } => {
577
0
                                        methods::Response::chainHead_v1_unfollow(())
578
                                    }
579
0
                                    _ => unreachable!(),
580
                                }
581
0
                                .to_json_response(request_id),
582
0
                            );
583
0
584
0
                            kill_channel.dead.store(true, Ordering::Release);
585
0
                            kill_channel.on_dead_changed.notify(usize::MAX);
586
                        }
587
                        _ => {
588
0
                            let response = match parsed_request {
589
                                methods::MethodCall::author_unwatchExtrinsic { .. } => {
590
0
                                    methods::Response::author_unwatchExtrinsic(false)
591
0
                                        .to_json_response(request_id)
592
                                }
593
                                methods::MethodCall::state_unsubscribeRuntimeVersion { .. } => {
594
0
                                    methods::Response::state_unsubscribeRuntimeVersion(false)
595
0
                                        .to_json_response(request_id)
596
                                }
597
                                methods::MethodCall::state_unsubscribeStorage { .. } => {
598
0
                                    methods::Response::state_unsubscribeStorage(false)
599
0
                                        .to_json_response(request_id)
600
                                }
601
0
                                _ => parse::build_error_response(
602
0
                                    request_id,
603
0
                                    ErrorResponse::InvalidParams,
604
0
                                    None,
605
0
                                ),
606
                            };
607
608
0
                            let mut responses_queue =
609
0
                                self.inner.serialized_io.responses_queue.lock().await;
610
0
                            let pos = responses_queue
611
0
                                .pending_serialized_responses
612
0
                                .insert((response, true));
613
0
                            responses_queue
614
0
                                .pending_serialized_responses_queue
615
0
                                .push_back(pos);
616
0
                            self.inner
617
0
                                .serialized_io
618
0
                                .on_response_pushed_or_task_destroyed
619
0
                                .notify(usize::MAX);
620
                        }
621
                    }
622
                }
623
0
                methods::MethodCall::chain_unsubscribeAllHeads { subscription, .. }
624
0
                | methods::MethodCall::chain_unsubscribeFinalizedHeads { subscription, .. }
625
0
                | methods::MethodCall::chain_unsubscribeNewHeads { subscription, .. } => {
626
                    // TODO: DRY with above
627
                    // TODO: must check whether type of subscription matches
628
0
                    match self.inner.active_subscriptions.get_mut(&**subscription) {
629
                        Some(InnerSubscription {
630
0
                            unsubscribe_response,
631
0
                            kill_channel,
632
0
                        }) if unsubscribe_response.is_none() => {
633
0
                            *unsubscribe_response = Some(match parsed_request {
634
                                methods::MethodCall::chain_unsubscribeAllHeads { .. } => {
635
0
                                    methods::Response::chain_unsubscribeAllHeads(true)
636
0
                                        .to_json_response(request_id)
637
                                }
638
                                methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => {
639
0
                                    methods::Response::chain_unsubscribeFinalizedHeads(true)
640
0
                                        .to_json_response(request_id)
641
                                }
642
                                methods::MethodCall::chain_unsubscribeNewHeads { .. } => {
643
0
                                    methods::Response::chain_unsubscribeNewHeads(true)
644
0
                                        .to_json_response(request_id)
645
                                }
646
0
                                _ => unreachable!(),
647
                            });
648
649
0
                            kill_channel.dead.store(true, Ordering::Release);
650
0
                            kill_channel.on_dead_changed.notify(usize::MAX);
651
                        }
652
                        _ => {
653
0
                            let response = match parsed_request {
654
                                methods::MethodCall::chain_unsubscribeAllHeads { .. } => {
655
0
                                    methods::Response::chain_unsubscribeAllHeads(false)
656
0
                                        .to_json_response(request_id)
657
                                }
658
                                methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => {
659
0
                                    methods::Response::chain_unsubscribeFinalizedHeads(false)
660
0
                                        .to_json_response(request_id)
661
                                }
662
                                methods::MethodCall::chain_unsubscribeNewHeads { .. } => {
663
0
                                    methods::Response::chain_unsubscribeNewHeads(false)
664
0
                                        .to_json_response(request_id)
665
                                }
666
0
                                _ => unreachable!(),
667
                            };
668
669
0
                            let mut responses_queue =
670
0
                                self.inner.serialized_io.responses_queue.lock().await;
671
0
                            let pos = responses_queue
672
0
                                .pending_serialized_responses
673
0
                                .insert((response, true));
674
0
                            responses_queue
675
0
                                .pending_serialized_responses_queue
676
0
                                .push_back(pos);
677
0
                            self.inner
678
0
                                .serialized_io
679
0
                                .on_response_pushed_or_task_destroyed
680
0
                                .notify(usize::MAX);
681
                        }
682
                    }
683
                }
684
            }
685
        }
686
44
    }
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0Ba_
_RNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTask15run_until_event0CsiUjFBJteJ7x_17smoldot_full_node
Line
Count
Source
216
44
    pub async fn run_until_event(mut self) -> Event {
217
        loop {
218
            enum WakeUpReason {
219
                NewRequest(String),
220
                Message(ToMainTask),
221
            }
222
223
48
            let wake_up_reason = {
224
69
                let serialized_requests_io_destroyed = async {
225
                    (&mut self.inner.on_serialized_requests_io_destroyed).await;
226
                    Err(())
227
                };
228
229
69
                let next_serialized_request = async {
230
                    let mut wait = None;
231
                    loop {
232
                        if let Some(elem) = self.inner.serialized_io.requests_queue.pop() {
233
                            self.inner
234
                                .serialized_io
235
                                .on_request_pulled_or_task_destroyed
236
                                .notify(usize::MAX);
237
                            break Ok(WakeUpReason::NewRequest(elem));
238
                        }
239
                        if let Some(wait) = wait.take() {
240
                            wait.await
241
                        } else {
242
                            wait = Some(self.inner.serialized_io.on_request_pushed.listen());
243
                        }
244
                    }
245
                };
246
247
69
                let response_notif = async {
248
                    let mut wait = None;
249
                    loop {
250
                        if let Some(elem) = self.inner.responses_notifications_queue.queue.pop() {
251
                            break Ok(WakeUpReason::Message(elem));
252
                        }
253
                        if let Some(wait) = wait.take() {
254
                            wait.await
255
                        } else {
256
                            wait =
257
                                Some(self.inner.responses_notifications_queue.on_pushed.listen());
258
                        }
259
                    }
260
                };
261
262
69
                match serialized_requests_io_destroyed
263
69
                    .or(next_serialized_request)
264
69
                    .or(response_notif)
265
65
                    .await
266
                {
267
48
                    Ok(wake_up_reason) => wake_up_reason,
268
21
                    Err(()) => return Event::SerializedRequestsIoClosed,
269
                }
270
            };
271
272
            // Immediately handle every event apart from `NewRequest`.
273
25
            let new_request = match 
wake_up_reason23
{
274
25
                WakeUpReason::NewRequest(request) => request,
275
0
                WakeUpReason::Message(ToMainTask::SubscriptionDestroyed { subscription_id }) => {
276
0
                    let InnerSubscription {
277
0
                        unsubscribe_response,
278
0
                        ..
279
0
                    } = self
280
0
                        .inner
281
0
                        .active_subscriptions
282
0
                        .remove(&subscription_id)
283
0
                        .unwrap();
284
                    // TODO: post a `stop`/`error` event for chainhead subscriptions
285
0
                    if let Some(unsubscribe_response) = unsubscribe_response {
286
0
                        let mut responses_queue =
287
0
                            self.inner.serialized_io.responses_queue.lock().await;
288
0
                        let pos = responses_queue
289
0
                            .pending_serialized_responses
290
0
                            .insert((unsubscribe_response, true));
291
0
                        responses_queue
292
0
                            .pending_serialized_responses_queue
293
0
                            .push_back(pos);
294
0
                        self.inner
295
0
                            .serialized_io
296
0
                            .on_response_pushed_or_task_destroyed
297
0
                            .notify(usize::MAX);
298
0
                    }
299
300
                    // Shrink the list of active subscriptions if necessary.
301
0
                    if self.inner.active_subscriptions.capacity()
302
0
                        >= 2 * self.inner.active_subscriptions.len() + 16
303
0
                    {
304
0
                        self.inner.active_subscriptions.shrink_to_fit();
305
0
                    }
306
307
0
                    return Event::SubscriptionDestroyed {
308
0
                        task: self,
309
0
                        subscription_id,
310
0
                    };
311
                }
312
23
                WakeUpReason::Message(ToMainTask::RequestResponse(response)) => {
313
23
                    let mut responses_queue = self.inner.serialized_io.responses_queue.lock().
await0
;
314
23
                    let pos = responses_queue
315
23
                        .pending_serialized_responses
316
23
                        .insert((response, true));
317
23
                    responses_queue
318
23
                        .pending_serialized_responses_queue
319
23
                        .push_back(pos);
320
23
                    self.inner
321
23
                        .serialized_io
322
23
                        .on_response_pushed_or_task_destroyed
323
23
                        .notify(usize::MAX);
324
23
                    continue;
325
                }
326
0
                WakeUpReason::Message(ToMainTask::Notification(notification)) => {
327
                    // TODO: filter out redundant notifications, as it's the entire point of this module
328
0
                    let mut responses_queue = self.inner.serialized_io.responses_queue.lock().await;
329
0
                    let pos = responses_queue
330
0
                        .pending_serialized_responses
331
0
                        .insert((notification, false));
332
0
                    responses_queue
333
0
                        .pending_serialized_responses_queue
334
0
                        .push_back(pos);
335
0
                    self.inner
336
0
                        .serialized_io
337
0
                        .on_response_pushed_or_task_destroyed
338
0
                        .notify(usize::MAX);
339
0
                    continue;
340
                }
341
            };
342
343
23
            let (request_id, parsed_request) =
344
25
                match methods::parse_jsonrpc_client_to_server(&new_request) {
345
23
                    Ok((request_id, method)) => (request_id, method),
346
1
                    Err(methods::ParseClientToServerError::Method { request_id, error }) => {
347
1
                        let response = error.to_json_error(request_id);
348
1
                        let mut responses_queue =
349
1
                            self.inner.serialized_io.responses_queue.lock().
await0
;
350
1
                        let pos = responses_queue
351
1
                            .pending_serialized_responses
352
1
                            .insert((response, true));
353
1
                        responses_queue
354
1
                            .pending_serialized_responses_queue
355
1
                            .push_back(pos);
356
1
                        self.inner
357
1
                            .serialized_io
358
1
                            .on_response_pushed_or_task_destroyed
359
1
                            .notify(usize::MAX);
360
1
                        continue;
361
                    }
362
0
                    Err(methods::ParseClientToServerError::UnknownNotification(_)) => continue,
363
                    Err(methods::ParseClientToServerError::JsonRpcParse(_)) => {
364
1
                        let response = parse::build_parse_error_response();
365
1
                        let mut responses_queue =
366
1
                            self.inner.serialized_io.responses_queue.lock().
await0
;
367
1
                        let pos = responses_queue
368
1
                            .pending_serialized_responses
369
1
                            .insert((response, true));
370
1
                        responses_queue
371
1
                            .pending_serialized_responses_queue
372
1
                            .push_back(pos);
373
1
                        self.inner
374
1
                            .serialized_io
375
1
                            .on_response_pushed_or_task_destroyed
376
1
                            .notify(usize::MAX);
377
1
                        continue;
378
                    }
379
                };
380
381
            // There exist three types of requests:
382
            //
383
            // - Requests that follow a simple one-request-one-response schema.
384
            // - Requests that, if accepted, start a subscription.
385
            // - Requests that unsubscribe from a subscription.
386
            //
387
23
            match &parsed_request {
388
                methods::MethodCall::account_nextIndex { .. }
389
                | methods::MethodCall::author_hasKey { .. }
390
                | methods::MethodCall::author_hasSessionKeys { .. }
391
                | methods::MethodCall::author_insertKey { .. }
392
                | methods::MethodCall::author_pendingExtrinsics { .. }
393
                | methods::MethodCall::author_removeExtrinsic { .. }
394
                | methods::MethodCall::author_rotateKeys { .. }
395
                | methods::MethodCall::author_submitExtrinsic { .. }
396
                | methods::MethodCall::babe_epochAuthorship { .. }
397
                | methods::MethodCall::chain_getBlock { .. }
398
                | methods::MethodCall::chain_getBlockHash { .. }
399
                | methods::MethodCall::chain_getFinalizedHead { .. }
400
                | methods::MethodCall::chain_getHeader { .. }
401
                | methods::MethodCall::childstate_getKeys { .. }
402
                | methods::MethodCall::childstate_getStorage { .. }
403
                | methods::MethodCall::childstate_getStorageHash { .. }
404
                | methods::MethodCall::childstate_getStorageSize { .. }
405
                | methods::MethodCall::grandpa_roundState { .. }
406
                | methods::MethodCall::offchain_localStorageGet { .. }
407
                | methods::MethodCall::offchain_localStorageSet { .. }
408
                | methods::MethodCall::payment_queryInfo { .. }
409
                | methods::MethodCall::state_call { .. }
410
                | methods::MethodCall::state_getKeys { .. }
411
                | methods::MethodCall::state_getKeysPaged { .. }
412
                | methods::MethodCall::state_getMetadata { .. }
413
                | methods::MethodCall::state_getPairs { .. }
414
                | methods::MethodCall::state_getReadProof { .. }
415
                | methods::MethodCall::state_getRuntimeVersion { .. }
416
                | methods::MethodCall::state_getStorage { .. }
417
                | methods::MethodCall::state_getStorageHash { .. }
418
                | methods::MethodCall::state_getStorageSize { .. }
419
                | methods::MethodCall::state_queryStorage { .. }
420
                | methods::MethodCall::state_queryStorageAt { .. }
421
                | methods::MethodCall::system_accountNextIndex { .. }
422
                | methods::MethodCall::system_addReservedPeer { .. }
423
                | methods::MethodCall::system_chain { .. }
424
                | methods::MethodCall::system_chainType { .. }
425
                | methods::MethodCall::system_dryRun { .. }
426
                | methods::MethodCall::system_health { .. }
427
                | methods::MethodCall::system_localListenAddresses { .. }
428
                | methods::MethodCall::system_localPeerId { .. }
429
                | methods::MethodCall::system_name { .. }
430
                | methods::MethodCall::system_networkState { .. }
431
                | methods::MethodCall::system_nodeRoles { .. }
432
                | methods::MethodCall::system_peers { .. }
433
                | methods::MethodCall::system_properties { .. }
434
                | methods::MethodCall::system_removeReservedPeer { .. }
435
                | methods::MethodCall::system_version { .. }
436
                | methods::MethodCall::chainSpec_v1_chainName { .. }
437
                | methods::MethodCall::chainSpec_v1_genesisHash { .. }
438
                | methods::MethodCall::chainSpec_v1_properties { .. }
439
                | methods::MethodCall::rpc_methods { .. }
440
                | methods::MethodCall::sudo_unstable_p2pDiscover { .. }
441
                | methods::MethodCall::sudo_unstable_version { .. }
442
                | methods::MethodCall::chainHead_v1_body { .. }
443
                | methods::MethodCall::chainHead_v1_call { .. }
444
                | methods::MethodCall::chainHead_v1_continue { .. }
445
                | methods::MethodCall::chainHead_unstable_finalizedDatabase { .. }
446
                | methods::MethodCall::chainHead_v1_header { .. }
447
                | methods::MethodCall::chainHead_v1_stopOperation { .. }
448
                | methods::MethodCall::chainHead_v1_storage { .. }
449
                | methods::MethodCall::chainHead_v1_unpin { .. } => {
450
                    // Simple one-request-one-response.
451
23
                    return Event::HandleRequest {
452
23
                        request_process: RequestProcess {
453
23
                            responses_notifications_queue: self
454
23
                                .inner
455
23
                                .responses_notifications_queue
456
23
                                .clone(),
457
23
                            request: new_request,
458
23
                            has_sent_response: false,
459
23
                        },
460
23
                        task: self,
461
23
                    };
462
                }
463
464
                methods::MethodCall::author_submitAndWatchExtrinsic { .. }
465
                | methods::MethodCall::chain_subscribeAllHeads { .. }
466
                | methods::MethodCall::chain_subscribeFinalizedHeads { .. }
467
                | methods::MethodCall::chain_subscribeNewHeads { .. }
468
                | methods::MethodCall::state_subscribeRuntimeVersion { .. }
469
                | methods::MethodCall::state_subscribeStorage { .. }
470
                | methods::MethodCall::transaction_v1_broadcast { .. }
471
                | methods::MethodCall::transactionWatch_v1_submitAndWatch { .. }
472
                | methods::MethodCall::sudo_network_unstable_watch { .. }
473
                | methods::MethodCall::chainHead_v1_follow { .. } => {
474
                    // Subscription starting requests.
475
476
                    // We must check the maximum number of subscriptions.
477
0
                    let max_subscriptions =
478
0
                        usize::try_from(self.inner.max_active_subscriptions).unwrap_or(usize::MAX);
479
0
                    debug_assert!(self.inner.active_subscriptions.len() <= max_subscriptions);
480
0
                    if self.inner.active_subscriptions.len() >= max_subscriptions {
481
0
                        let response = parse::build_error_response(
482
0
                            request_id,
483
0
                            ErrorResponse::ServerError(-32000, "Too many active subscriptions"),
484
0
                            None,
485
0
                        );
486
0
                        let mut responses_queue =
487
0
                            self.inner.serialized_io.responses_queue.lock().await;
488
0
                        let pos = responses_queue
489
0
                            .pending_serialized_responses
490
0
                            .insert((response, true));
491
0
                        responses_queue
492
0
                            .pending_serialized_responses_queue
493
0
                            .push_back(pos);
494
0
                        self.inner
495
0
                            .serialized_io
496
0
                            .on_response_pushed_or_task_destroyed
497
0
                            .notify(usize::MAX);
498
0
                        continue;
499
0
                    }
500
0
501
0
                    // Allocate the new subscription ID.
502
0
                    let subscription_id = self.allocate_subscription_id();
503
0
                    debug_assert!(!self
504
0
                        .inner
505
0
                        .active_subscriptions
506
0
                        .contains_key(&subscription_id));
507
508
                    // Insert a "kill channel" in the local state. This kill channel is shared
509
                    // with the subscription object and is used to notify when a subscription
510
                    // should be killed.
511
0
                    let kill_channel = Arc::new(SubscriptionKillChannel {
512
0
                        dead: AtomicBool::new(false),
513
0
                        on_dead_changed: event_listener::Event::new(),
514
0
                    });
515
0
                    self.inner.active_subscriptions.insert(
516
0
                        subscription_id.clone(),
517
0
                        InnerSubscription {
518
0
                            kill_channel: kill_channel.clone(),
519
0
                            unsubscribe_response: None,
520
0
                        },
521
0
                    );
522
0
523
0
                    return Event::HandleSubscriptionStart {
524
0
                        subscription_start: SubscriptionStartProcess {
525
0
                            responses_notifications_queue: self
526
0
                                .inner
527
0
                                .responses_notifications_queue
528
0
                                .clone(),
529
0
                            request: new_request,
530
0
                            kill_channel,
531
0
                            subscription_id,
532
0
                            has_sent_response: false,
533
0
                        },
534
0
                        task: self,
535
0
                    };
536
                }
537
538
0
                methods::MethodCall::author_unwatchExtrinsic { subscription, .. }
539
0
                | methods::MethodCall::state_unsubscribeRuntimeVersion { subscription, .. }
540
0
                | methods::MethodCall::state_unsubscribeStorage { subscription, .. }
541
                | methods::MethodCall::transaction_v1_stop {
542
0
                    operation_id: subscription,
543
                }
544
0
                | methods::MethodCall::transactionWatch_v1_unwatch { subscription, .. }
545
0
                | methods::MethodCall::sudo_network_unstable_unwatch { subscription, .. }
546
                | methods::MethodCall::chainHead_v1_unfollow {
547
0
                    follow_subscription: subscription,
548
                    ..
549
                } => {
550
                    // TODO: must check whether type of subscription matches
551
0
                    match self.inner.active_subscriptions.get_mut(&**subscription) {
552
                        Some(InnerSubscription {
553
0
                            kill_channel,
554
0
                            unsubscribe_response,
555
0
                        }) if unsubscribe_response.is_none() => {
556
0
                            *unsubscribe_response = Some(
557
0
                                match parsed_request {
558
                                    methods::MethodCall::author_unwatchExtrinsic { .. } => {
559
0
                                        methods::Response::author_unwatchExtrinsic(true)
560
                                    }
561
                                    methods::MethodCall::state_unsubscribeRuntimeVersion {
562
                                        ..
563
0
                                    } => methods::Response::state_unsubscribeRuntimeVersion(true),
564
                                    methods::MethodCall::state_unsubscribeStorage { .. } => {
565
0
                                        methods::Response::state_unsubscribeStorage(true)
566
                                    }
567
                                    methods::MethodCall::transaction_v1_stop { .. } => {
568
0
                                        methods::Response::transaction_v1_stop(())
569
                                    }
570
                                    methods::MethodCall::transactionWatch_v1_unwatch { .. } => {
571
0
                                        methods::Response::transactionWatch_v1_unwatch(())
572
                                    }
573
                                    methods::MethodCall::sudo_network_unstable_unwatch {
574
                                        ..
575
0
                                    } => methods::Response::sudo_network_unstable_unwatch(()),
576
                                    methods::MethodCall::chainHead_v1_unfollow { .. } => {
577
0
                                        methods::Response::chainHead_v1_unfollow(())
578
                                    }
579
0
                                    _ => unreachable!(),
580
                                }
581
0
                                .to_json_response(request_id),
582
0
                            );
583
0
584
0
                            kill_channel.dead.store(true, Ordering::Release);
585
0
                            kill_channel.on_dead_changed.notify(usize::MAX);
586
                        }
587
                        _ => {
588
0
                            let response = match parsed_request {
589
                                methods::MethodCall::author_unwatchExtrinsic { .. } => {
590
0
                                    methods::Response::author_unwatchExtrinsic(false)
591
0
                                        .to_json_response(request_id)
592
                                }
593
                                methods::MethodCall::state_unsubscribeRuntimeVersion { .. } => {
594
0
                                    methods::Response::state_unsubscribeRuntimeVersion(false)
595
0
                                        .to_json_response(request_id)
596
                                }
597
                                methods::MethodCall::state_unsubscribeStorage { .. } => {
598
0
                                    methods::Response::state_unsubscribeStorage(false)
599
0
                                        .to_json_response(request_id)
600
                                }
601
0
                                _ => parse::build_error_response(
602
0
                                    request_id,
603
0
                                    ErrorResponse::InvalidParams,
604
0
                                    None,
605
0
                                ),
606
                            };
607
608
0
                            let mut responses_queue =
609
0
                                self.inner.serialized_io.responses_queue.lock().await;
610
0
                            let pos = responses_queue
611
0
                                .pending_serialized_responses
612
0
                                .insert((response, true));
613
0
                            responses_queue
614
0
                                .pending_serialized_responses_queue
615
0
                                .push_back(pos);
616
0
                            self.inner
617
0
                                .serialized_io
618
0
                                .on_response_pushed_or_task_destroyed
619
0
                                .notify(usize::MAX);
620
                        }
621
                    }
622
                }
623
0
                methods::MethodCall::chain_unsubscribeAllHeads { subscription, .. }
624
0
                | methods::MethodCall::chain_unsubscribeFinalizedHeads { subscription, .. }
625
0
                | methods::MethodCall::chain_unsubscribeNewHeads { subscription, .. } => {
626
                    // TODO: DRY with above
627
                    // TODO: must check whether type of subscription matches
628
0
                    match self.inner.active_subscriptions.get_mut(&**subscription) {
629
                        Some(InnerSubscription {
630
0
                            unsubscribe_response,
631
0
                            kill_channel,
632
0
                        }) if unsubscribe_response.is_none() => {
633
0
                            *unsubscribe_response = Some(match parsed_request {
634
                                methods::MethodCall::chain_unsubscribeAllHeads { .. } => {
635
0
                                    methods::Response::chain_unsubscribeAllHeads(true)
636
0
                                        .to_json_response(request_id)
637
                                }
638
                                methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => {
639
0
                                    methods::Response::chain_unsubscribeFinalizedHeads(true)
640
0
                                        .to_json_response(request_id)
641
                                }
642
                                methods::MethodCall::chain_unsubscribeNewHeads { .. } => {
643
0
                                    methods::Response::chain_unsubscribeNewHeads(true)
644
0
                                        .to_json_response(request_id)
645
                                }
646
0
                                _ => unreachable!(),
647
                            });
648
649
0
                            kill_channel.dead.store(true, Ordering::Release);
650
0
                            kill_channel.on_dead_changed.notify(usize::MAX);
651
                        }
652
                        _ => {
653
0
                            let response = match parsed_request {
654
                                methods::MethodCall::chain_unsubscribeAllHeads { .. } => {
655
0
                                    methods::Response::chain_unsubscribeAllHeads(false)
656
0
                                        .to_json_response(request_id)
657
                                }
658
                                methods::MethodCall::chain_unsubscribeFinalizedHeads { .. } => {
659
0
                                    methods::Response::chain_unsubscribeFinalizedHeads(false)
660
0
                                        .to_json_response(request_id)
661
                                }
662
                                methods::MethodCall::chain_unsubscribeNewHeads { .. } => {
663
0
                                    methods::Response::chain_unsubscribeNewHeads(false)
664
0
                                        .to_json_response(request_id)
665
                                }
666
0
                                _ => unreachable!(),
667
                            };
668
669
0
                            let mut responses_queue =
670
0
                                self.inner.serialized_io.responses_queue.lock().await;
671
0
                            let pos = responses_queue
672
0
                                .pending_serialized_responses
673
0
                                .insert((response, true));
674
0
                            responses_queue
675
0
                                .pending_serialized_responses_queue
676
0
                                .push_back(pos);
677
0
                            self.inner
678
0
                                .serialized_io
679
0
                                .on_response_pushed_or_task_destroyed
680
0
                                .notify(usize::MAX);
681
                        }
682
                    }
683
                }
684
            }
685
        }
686
44
    }
687
688
0
    fn allocate_subscription_id(&mut self) -> String {
689
0
        let subscription_id = self.inner.next_subscription_id.to_string();
690
0
        self.inner.next_subscription_id += 1;
691
0
        subscription_id
692
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask24allocate_subscription_id
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB2_14ClientMainTask24allocate_subscription_id
693
}
694
695
impl fmt::Debug for ClientMainTask {
696
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
697
0
        f.debug_tuple("ClientMainTask").finish()
698
0
    }
Unexecuted instantiation: _RNvXs_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTaskNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
Unexecuted instantiation: _RNvXs_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB4_14ClientMainTaskNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
699
}
700
701
impl Drop for ClientMainTask {
702
21
    fn drop(&mut self) {
703
21
        // Notify the `SerializedRequestsIo`.
704
21
        self.inner
705
21
            .serialized_io
706
21
            .on_response_pushed_or_task_destroyed
707
21
            .notify(usize::MAX);
708
21
        self.inner
709
21
            .serialized_io
710
21
            .on_request_pulled_or_task_destroyed
711
21
            .notify(usize::MAX);
712
713
        // Mark all active subscriptions as dead.
714
21
        for (_, InnerSubscription { 
kill_channel0
, .. }) in self.inner.active_subscriptions.drain() {
715
0
            kill_channel.dead.store(true, Ordering::Release);
716
0
            kill_channel.on_dead_changed.notify(usize::MAX);
717
0
        }
718
21
    }
Unexecuted instantiation: _RNvXs0_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14ClientMainTaskNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
_RNvXs0_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14ClientMainTaskNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
Line
Count
Source
702
21
    fn drop(&mut self) {
703
21
        // Notify the `SerializedRequestsIo`.
704
21
        self.inner
705
21
            .serialized_io
706
21
            .on_response_pushed_or_task_destroyed
707
21
            .notify(usize::MAX);
708
21
        self.inner
709
21
            .serialized_io
710
21
            .on_request_pulled_or_task_destroyed
711
21
            .notify(usize::MAX);
712
713
        // Mark all active subscriptions as dead.
714
21
        for (_, InnerSubscription { 
kill_channel0
, .. }) in self.inner.active_subscriptions.drain() {
715
0
            kill_channel.dead.store(true, Ordering::Release);
716
0
            kill_channel.on_dead_changed.notify(usize::MAX);
717
0
        }
718
21
    }
719
}
720
721
/// Outcome of the processing of [`ClientMainTask::run_until_event`].
722
#[derive(Debug)]
723
pub enum Event {
724
    /// JSON-RPC client has sent a plain request (i.e. that isn't related to subscriptions).
725
    HandleRequest {
726
        /// The task that generated the event.
727
        task: ClientMainTask,
728
        /// Object connected to the [`ClientMainTask`] and containing the information about the
729
        /// request to process.
730
        request_process: RequestProcess,
731
    },
732
733
    /// JSON-RPC client desires starting a new subscription.
734
    ///
735
    /// Note that the [`ClientMainTask`] automatically enforces a limit to the maximum number of
736
    /// subscriptions. If this event is generated, this check has already passed.
737
    HandleSubscriptionStart {
738
        /// The task that generated the event.
739
        task: ClientMainTask,
740
        /// Object connected to the [`ClientMainTask`] and containing the information about the
741
        /// request to process.
742
        subscription_start: SubscriptionStartProcess,
743
    },
744
745
    /// A [`SubscriptionStartProcess`] object or a [`Subscription`] object has been destroyed.
746
    SubscriptionDestroyed {
747
        /// The task that generated the event.
748
        task: ClientMainTask,
749
        /// Id of the subscription that was destroyed. Equals to the value that
750
        /// [`Subscription::subscription_id`] would have returned for the now-dead subscription.
751
        subscription_id: String,
752
    },
753
754
    /// The [`SerializedRequestsIo`] has been dropped. The [`ClientMainTask`] has been destroyed.
755
    SerializedRequestsIoClosed,
756
}
757
758
/// Object connected to the [`ClientMainTask`] that allows sending requests to the task and
759
/// receiving responses.
760
pub struct SerializedRequestsIo {
761
    serialized_io: Weak<SerializedIo>,
762
763
    /// Event notified after the [`SerializedRequestsIo`] is destroyed.
764
    on_serialized_requests_io_destroyed: event_listener::Event,
765
}
766
767
impl SerializedRequestsIo {
768
    /// Waits for a response or a notification to send to the JSON-RPC client to be available,
769
    /// and returns it.
770
    ///
771
    /// Returns `None` if the [`ClientMainTask`] has been destroyed.
772
    ///
773
    /// > **Note**: It is important to run [`ClientMainTask::run_until_event`] concurrently to
774
    /// >           this function, otherwise it might never return.
775
25
    
pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> 0
{
Unexecuted instantiation: _RNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo18wait_next_response
Unexecuted instantiation: _RNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo18wait_next_response
776
25
        let mut wait = None;
777
778
        loop {
779
53
            let Some(queue) = self.serialized_io.upgrade() else {
780
0
                return Err(WaitNextResponseError::ClientMainTaskDestroyed);
781
            };
782
783
            // Lock the responses queue.
784
            {
785
53
                let mut responses_queue = queue.responses_queue.lock().
await0
;
786
787
53
                if let Some(
response_index25
) = responses_queue
788
53
                    .pending_serialized_responses_queue
789
53
                    .pop_front()
790
                {
791
25
                    let (response_or_notif, is_response) = responses_queue
792
25
                        .pending_serialized_responses
793
25
                        .remove(response_index);
794
25
795
25
                    if is_response {
796
25
                        let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release);
797
25
                        debug_assert_ne!(_prev_val, u32::MAX); // Check underflows.
798
0
                    }
799
800
                    // Shrink containers if necessary in order to reduce memory usage after a
801
                    // burst of requests.
802
25
                    if responses_queue.pending_serialized_responses.capacity()
803
25
                        > responses_queue
804
25
                            .pending_serialized_responses
805
25
                            .len()
806
25
                            .saturating_mul(4)
807
25
                    {
808
25
                        responses_queue.pending_serialized_responses.shrink_to_fit();
809
25
                    }
0
810
25
                    if responses_queue
811
25
                        .pending_serialized_responses_queue
812
25
                        .capacity()
813
25
                        > responses_queue
814
25
                            .pending_serialized_responses_queue
815
25
                            .len()
816
25
                            .saturating_mul(4)
817
25
                    {
818
25
                        responses_queue
819
25
                            .pending_serialized_responses_queue
820
25
                            .shrink_to_fit();
821
25
                    }
0
822
823
25
                    return Ok(response_or_notif);
824
28
                }
825
            }
826
827
28
            if let Some(
wait14
) = wait.take() {
828
14
                wait.await
829
14
            } else {
830
14
                wait = Some(queue.on_response_pushed_or_task_destroyed.listen());
831
14
            }
832
        }
833
25
    }
Unexecuted instantiation: _RNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0Bd_
Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0Bd_
_RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
775
2
    pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> {
776
2
        let mut wait = None;
777
778
        loop {
779
4
            let Some(queue) = self.serialized_io.upgrade() else {
780
0
                return Err(WaitNextResponseError::ClientMainTaskDestroyed);
781
            };
782
783
            // Lock the responses queue.
784
            {
785
4
                let mut responses_queue = queue.responses_queue.lock().
await0
;
786
787
4
                if let Some(
response_index2
) = responses_queue
788
4
                    .pending_serialized_responses_queue
789
4
                    .pop_front()
790
                {
791
2
                    let (response_or_notif, is_response) = responses_queue
792
2
                        .pending_serialized_responses
793
2
                        .remove(response_index);
794
2
795
2
                    if is_response {
796
2
                        let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release);
797
2
                        debug_assert_ne!(_prev_val, u32::MAX); // Check underflows.
798
0
                    }
799
800
                    // Shrink containers if necessary in order to reduce memory usage after a
801
                    // burst of requests.
802
2
                    if responses_queue.pending_serialized_responses.capacity()
803
2
                        > responses_queue
804
2
                            .pending_serialized_responses
805
2
                            .len()
806
2
                            .saturating_mul(4)
807
2
                    {
808
2
                        responses_queue.pending_serialized_responses.shrink_to_fit();
809
2
                    }
0
810
2
                    if responses_queue
811
2
                        .pending_serialized_responses_queue
812
2
                        .capacity()
813
2
                        > responses_queue
814
2
                            .pending_serialized_responses_queue
815
2
                            .len()
816
2
                            .saturating_mul(4)
817
2
                    {
818
2
                        responses_queue
819
2
                            .pending_serialized_responses_queue
820
2
                            .shrink_to_fit();
821
2
                    }
0
822
823
2
                    return Ok(response_or_notif);
824
2
                }
825
            }
826
827
2
            if let Some(
wait1
) = wait.take() {
828
1
                wait.await
829
1
            } else {
830
1
                wait = Some(queue.on_response_pushed_or_task_destroyed.listen());
831
1
            }
832
        }
833
2
    }
Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CscDgN54JpMGG_6author
_RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo18wait_next_response0CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
775
23
    pub async fn wait_next_response(&self) -> Result<String, WaitNextResponseError> {
776
23
        let mut wait = None;
777
778
        loop {
779
49
            let Some(queue) = self.serialized_io.upgrade() else {
780
0
                return Err(WaitNextResponseError::ClientMainTaskDestroyed);
781
            };
782
783
            // Lock the responses queue.
784
            {
785
49
                let mut responses_queue = queue.responses_queue.lock().
await0
;
786
787
49
                if let Some(
response_index23
) = responses_queue
788
49
                    .pending_serialized_responses_queue
789
49
                    .pop_front()
790
                {
791
23
                    let (response_or_notif, is_response) = responses_queue
792
23
                        .pending_serialized_responses
793
23
                        .remove(response_index);
794
23
795
23
                    if is_response {
796
23
                        let _prev_val = queue.num_requests_in_fly.fetch_sub(1, Ordering::Release);
797
23
                        debug_assert_ne!(_prev_val, u32::MAX); // Check underflows.
798
0
                    }
799
800
                    // Shrink containers if necessary in order to reduce memory usage after a
801
                    // burst of requests.
802
23
                    if responses_queue.pending_serialized_responses.capacity()
803
23
                        > responses_queue
804
23
                            .pending_serialized_responses
805
23
                            .len()
806
23
                            .saturating_mul(4)
807
23
                    {
808
23
                        responses_queue.pending_serialized_responses.shrink_to_fit();
809
23
                    }
0
810
23
                    if responses_queue
811
23
                        .pending_serialized_responses_queue
812
23
                        .capacity()
813
23
                        > responses_queue
814
23
                            .pending_serialized_responses_queue
815
23
                            .len()
816
23
                            .saturating_mul(4)
817
23
                    {
818
23
                        responses_queue
819
23
                            .pending_serialized_responses_queue
820
23
                            .shrink_to_fit();
821
23
                    }
0
822
823
23
                    return Ok(response_or_notif);
824
26
                }
825
            }
826
827
26
            if let Some(
wait13
) = wait.take() {
828
13
                wait.await
829
13
            } else {
830
13
                wait = Some(queue.on_response_pushed_or_task_destroyed.listen());
831
13
            }
832
        }
833
23
    }
834
835
    /// Adds a JSON-RPC request to the queue of requests of the [`ClientMainTask`]. Waits if the
836
    /// queue is full.
837
    ///
838
    /// This might cause a call to [`ClientMainTask::run_until_event`] to return
839
    /// [`Event::HandleRequest`] or [`Event::HandleSubscriptionStart`].
840
0
    pub async fn send_request(&self, request: String) -> Result<(), SendRequestError> {
Unexecuted instantiation: _RNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo12send_request
Unexecuted instantiation: _RNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo12send_request
841
0
        // Wait until it is possible to increment `num_requests_in_fly`.
842
0
        let mut wait = None;
843
0
        let queue = loop {
844
0
            let Some(queue) = self.serialized_io.upgrade() else {
845
0
                return Err(SendRequestError {
846
0
                    request,
847
0
                    cause: SendRequestErrorCause::ClientMainTaskDestroyed,
848
0
                });
849
            };
850
851
0
            if queue
852
0
                .num_requests_in_fly
853
0
                .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| {
854
0
                    if old_value < queue.max_requests_in_fly.get() {
855
                        // Considering that `old_value < max`, and `max` fits in a `u32` by
856
                        // definition, then `old_value + 1` also always fits in a `u32`. QED.
857
                        // There's no risk of overflow.
858
0
                        Some(old_value + 1)
859
                    } else {
860
0
                        None
861
                    }
862
0
                })
Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00Bf_
Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00Bf_
Unexecuted instantiation: _RNCNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB9_20SerializedRequestsIo12send_request00CsiUjFBJteJ7x_17smoldot_full_node
863
0
                .is_ok()
864
            {
865
0
                break queue;
866
0
            }
867
868
0
            if let Some(wait) = wait.take() {
869
0
                wait.await;
870
0
            } else {
871
0
                wait = Some(queue.on_request_pulled_or_task_destroyed.listen());
872
0
            }
873
        };
874
875
        // Everything successful.
876
0
        queue.requests_queue.push(request);
877
0
        queue.on_request_pushed.notify(usize::MAX);
878
0
        Ok(())
879
0
    }
Unexecuted instantiation: _RNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0Bd_
Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0Bd_
Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo12send_request0CsiUjFBJteJ7x_17smoldot_full_node
880
881
    /// Tries to add a JSON-RPC request to the queue of requests of the [`ClientMainTask`].
882
    ///
883
    /// This might cause a call to [`ClientMainTask::run_until_event`] to return
884
    /// [`Event::HandleRequest`] or [`Event::HandleSubscriptionStart`].
885
25
    pub fn try_send_request(&self, request: String) -> Result<(), TrySendRequestError> {
886
25
        let Some(queue) = self.serialized_io.upgrade() else {
887
0
            return Err(TrySendRequestError {
888
0
                request,
889
0
                cause: TrySendRequestErrorCause::ClientMainTaskDestroyed,
890
0
            });
891
        };
892
893
        // Try to increment `num_requests_in_fly`. Return an error if it is past the maximum.
894
25
        if queue
895
25
            .num_requests_in_fly
896
25
            .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| {
897
25
                if old_value < queue.max_requests_in_fly.get() {
898
                    // Considering that `old_value < max`, and `max` fits in a `u32` by
899
                    // definition, then `old_value + 1` also always fits in a `u32`. QED.
900
                    // There's no risk of overflow.
901
25
                    Some(old_value + 1)
902
                } else {
903
0
                    None
904
                }
905
25
            })
Unexecuted instantiation: _RNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo16try_send_request0Bd_
_RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_20SerializedRequestsIo16try_send_request0Bd_
Line
Count
Source
896
25
            .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| {
897
25
                if old_value < queue.max_requests_in_fly.get() {
898
                    // Considering that `old_value < max`, and `max` fits in a `u32` by
899
                    // definition, then `old_value + 1` also always fits in a `u32`. QED.
900
                    // There's no risk of overflow.
901
25
                    Some(old_value + 1)
902
                } else {
903
0
                    None
904
                }
905
25
            })
906
25
            .is_err()
907
        {
908
0
            return Err(TrySendRequestError {
909
0
                request,
910
0
                cause: TrySendRequestErrorCause::TooManyPendingRequests,
911
0
            });
912
25
        }
913
25
914
25
        // Everything successful.
915
25
        queue.requests_queue.push(request);
916
25
        queue.on_request_pushed.notify(usize::MAX);
917
25
        Ok(())
918
25
    }
Unexecuted instantiation: _RNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo16try_send_request
_RNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIo16try_send_request
Line
Count
Source
885
25
    pub fn try_send_request(&self, request: String) -> Result<(), TrySendRequestError> {
886
25
        let Some(queue) = self.serialized_io.upgrade() else {
887
0
            return Err(TrySendRequestError {
888
0
                request,
889
0
                cause: TrySendRequestErrorCause::ClientMainTaskDestroyed,
890
0
            });
891
        };
892
893
        // Try to increment `num_requests_in_fly`. Return an error if it is past the maximum.
894
25
        if queue
895
25
            .num_requests_in_fly
896
25
            .fetch_update(Ordering::SeqCst, Ordering::Relaxed, |old_value| {
897
                if old_value < queue.max_requests_in_fly.get() {
898
                    // Considering that `old_value < max`, and `max` fits in a `u32` by
899
                    // definition, then `old_value + 1` also always fits in a `u32`. QED.
900
                    // There's no risk of overflow.
901
                    Some(old_value + 1)
902
                } else {
903
                    None
904
                }
905
25
            })
906
25
            .is_err()
907
        {
908
0
            return Err(TrySendRequestError {
909
0
                request,
910
0
                cause: TrySendRequestErrorCause::TooManyPendingRequests,
911
0
            });
912
25
        }
913
25
914
25
        // Everything successful.
915
25
        queue.requests_queue.push(request);
916
25
        queue.on_request_pushed.notify(usize::MAX);
917
25
        Ok(())
918
25
    }
919
}
920
921
impl fmt::Debug for SerializedRequestsIo {
922
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
923
0
        f.debug_tuple("SerializedRequestsIo").finish()
924
0
    }
Unexecuted instantiation: _RNvXs2_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
Unexecuted instantiation: _RNvXs2_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
925
}
926
927
impl Drop for SerializedRequestsIo {
928
21
    fn drop(&mut self) {
929
21
        self.on_serialized_requests_io_destroyed.notify(usize::MAX);
930
21
    }
Unexecuted instantiation: _RNvXs3_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
_RNvXs3_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_20SerializedRequestsIoNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
Line
Count
Source
928
21
    fn drop(&mut self) {
929
21
        self.on_serialized_requests_io_destroyed.notify(usize::MAX);
930
21
    }
931
}
932
933
/// See [`SerializedRequestsIo::wait_next_response`].
934
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXsg_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_21WaitNextResponseErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsg_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_21WaitNextResponseErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
935
pub enum WaitNextResponseError {
936
    /// The attached [`ClientMainTask`] has been destroyed.
937
    ClientMainTaskDestroyed,
938
}
939
940
/// Error returned by [`SerializedRequestsIo::send_request`].
941
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsi_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_16SendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsi_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_16SendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
942
#[display(fmt = "{cause}")]
943
pub struct SendRequestError {
944
    /// The JSON-RPC request that was passed as parameter.
945
    pub request: String,
946
    /// Reason for the error.
947
    pub cause: SendRequestErrorCause,
948
}
949
950
/// See [`SendRequestError::cause`].
951
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsk_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_21SendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsk_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_21SendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
952
pub enum SendRequestErrorCause {
953
    /// The attached [`ClientMainTask`] has been destroyed.
954
    ClientMainTaskDestroyed,
955
}
956
957
/// Error returned by [`SerializedRequestsIo::try_send_request`].
958
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsm_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_19TrySendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsm_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_19TrySendRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
959
#[display(fmt = "{cause}")]
960
pub struct TrySendRequestError {
961
    /// The JSON-RPC request that was passed as parameter.
962
    pub request: String,
963
    /// Reason for the error.
964
    pub cause: TrySendRequestErrorCause,
965
}
966
967
/// See [`TrySendRequestError::cause`].
968
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXso_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24TrySendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXso_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24TrySendRequestErrorCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
969
pub enum TrySendRequestErrorCause {
970
    /// Limit to the maximum number of pending requests that was passed as
971
    /// [`Config::max_pending_requests`] has been reached. No more requests can be sent before
972
    /// some responses have been pulled.
973
    TooManyPendingRequests,
974
    /// The attached [`ClientMainTask`] has been destroyed.
975
    ClientMainTaskDestroyed,
976
}
977
978
/// Object connected to the [`ClientMainTask`] and containing a request expecting an answer.
979
///
980
/// If this object is dropped before the request has been answered, an automatic "internal error"
981
/// error response is automatically sent back.
982
pub struct RequestProcess {
983
    /// Queue where responses and subscriptions push responses/notifications.
984
    responses_notifications_queue: Arc<ResponsesNotificationsQueue>,
985
    /// Request in JSON form. Guaranteed to decode successfully.
986
    request: String,
987
    /// `true` if a response has already been sent.
988
    has_sent_response: bool,
989
}
990
991
impl RequestProcess {
992
    /// Returns the request which must be processed.
993
    ///
994
    /// The request is guaranteed to not be related to subscriptions in any way.
995
    // TODO: with stronger typing users wouldn't have to worry about the type of request
996
46
    pub fn request(&self) -> methods::MethodCall {
997
46
        methods::parse_jsonrpc_client_to_server(&self.request)
998
46
            .unwrap()
999
46
            .1
1000
46
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7request
_RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7request
Line
Count
Source
996
46
    pub fn request(&self) -> methods::MethodCall {
997
46
        methods::parse_jsonrpc_client_to_server(&self.request)
998
46
            .unwrap()
999
46
            .1
1000
46
    }
1001
1002
    /// Indicate the response to the request to the [`ClientMainTask`].
1003
    ///
1004
    /// Has no effect if the [`ClientMainTask`] has been destroyed.
1005
19
    pub fn respond(mut self, response: methods::Response<'_>) {
1006
19
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1007
19
            .unwrap()
1008
19
            .0;
1009
19
        let serialized = response.to_json_response(request_id);
1010
19
        self.responses_notifications_queue
1011
19
            .queue
1012
19
            .push(ToMainTask::RequestResponse(serialized));
1013
19
        self.responses_notifications_queue
1014
19
            .on_pushed
1015
19
            .notify(usize::MAX);
1016
19
        self.has_sent_response = true;
1017
19
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7respond
_RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess7respond
Line
Count
Source
1005
19
    pub fn respond(mut self, response: methods::Response<'_>) {
1006
19
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1007
19
            .unwrap()
1008
19
            .0;
1009
19
        let serialized = response.to_json_response(request_id);
1010
19
        self.responses_notifications_queue
1011
19
            .queue
1012
19
            .push(ToMainTask::RequestResponse(serialized));
1013
19
        self.responses_notifications_queue
1014
19
            .on_pushed
1015
19
            .notify(usize::MAX);
1016
19
        self.has_sent_response = true;
1017
19
    }
1018
1019
    /// Indicate to the [`ClientMainTask`] that the response to the request is `null`.
1020
    ///
1021
    /// Has no effect if the [`ClientMainTask`] has been destroyed.
1022
    // TODO: the necessity for this function is basically a hack
1023
2
    pub fn respond_null(mut self) {
1024
2
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1025
2
            .unwrap()
1026
2
            .0;
1027
2
        let serialized = parse::build_success_response(request_id, "null");
1028
2
        self.responses_notifications_queue
1029
2
            .queue
1030
2
            .push(ToMainTask::RequestResponse(serialized));
1031
2
        self.responses_notifications_queue
1032
2
            .on_pushed
1033
2
            .notify(usize::MAX);
1034
2
        self.has_sent_response = true;
1035
2
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess12respond_null
_RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess12respond_null
Line
Count
Source
1023
2
    pub fn respond_null(mut self) {
1024
2
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1025
2
            .unwrap()
1026
2
            .0;
1027
2
        let serialized = parse::build_success_response(request_id, "null");
1028
2
        self.responses_notifications_queue
1029
2
            .queue
1030
2
            .push(ToMainTask::RequestResponse(serialized));
1031
2
        self.responses_notifications_queue
1032
2
            .on_pushed
1033
2
            .notify(usize::MAX);
1034
2
        self.has_sent_response = true;
1035
2
    }
1036
1037
    /// Indicate to the [`ClientMainTask`] that the request should return an error.
1038
    ///
1039
    /// Has no effect if the [`ClientMainTask`] has been destroyed.
1040
2
    pub fn fail(mut self, error: ErrorResponse) {
1041
2
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1042
2
            .unwrap()
1043
2
            .0;
1044
2
        let serialized = parse::build_error_response(request_id, error, None);
1045
2
        self.responses_notifications_queue
1046
2
            .queue
1047
2
            .push(ToMainTask::RequestResponse(serialized));
1048
2
        self.responses_notifications_queue
1049
2
            .on_pushed
1050
2
            .notify(usize::MAX);
1051
2
        self.has_sent_response = true;
1052
2
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess4fail
_RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess4fail
Line
Count
Source
1040
2
    pub fn fail(mut self, error: ErrorResponse) {
1041
2
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1042
2
            .unwrap()
1043
2
            .0;
1044
2
        let serialized = parse::build_error_response(request_id, error, None);
1045
2
        self.responses_notifications_queue
1046
2
            .queue
1047
2
            .push(ToMainTask::RequestResponse(serialized));
1048
2
        self.responses_notifications_queue
1049
2
            .on_pushed
1050
2
            .notify(usize::MAX);
1051
2
        self.has_sent_response = true;
1052
2
    }
1053
1054
    /// Indicate to the [`ClientMainTask`] that the request should return an error.
1055
    ///
1056
    /// This function is similar to [`RequestProcess`], except that an additional JSON payload is
1057
    /// attached to the error.
1058
    ///
1059
    /// Has no effect if the [`ClientMainTask`] has been destroyed.
1060
0
    pub fn fail_with_attached_json(mut self, error: ErrorResponse, json: &str) {
1061
0
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1062
0
            .unwrap()
1063
0
            .0;
1064
0
        let serialized = parse::build_error_response(request_id, error, Some(json));
1065
0
        self.responses_notifications_queue
1066
0
            .queue
1067
0
            .push(ToMainTask::RequestResponse(serialized));
1068
0
        self.responses_notifications_queue
1069
0
            .on_pushed
1070
0
            .notify(usize::MAX);
1071
0
        self.has_sent_response = true;
1072
0
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess23fail_with_attached_json
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcess23fail_with_attached_json
1073
}
1074
1075
impl fmt::Debug for RequestProcess {
1076
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1077
0
        fmt::Debug::fmt(&self.request, f)
1078
0
    }
Unexecuted instantiation: _RNvXs5_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
Unexecuted instantiation: _RNvXs5_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
1079
}
1080
1081
impl Drop for RequestProcess {
1082
23
    fn drop(&mut self) {
1083
23
        if !self.has_sent_response {
1084
0
            let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1085
0
                .unwrap()
1086
0
                .0;
1087
0
            let serialized =
1088
0
                parse::build_error_response(request_id, ErrorResponse::InternalError, None);
1089
0
            self.responses_notifications_queue
1090
0
                .queue
1091
0
                .push(ToMainTask::RequestResponse(serialized));
1092
0
            self.responses_notifications_queue
1093
0
                .on_pushed
1094
0
                .notify(usize::MAX);
1095
23
        }
1096
23
    }
Unexecuted instantiation: _RNvXs6_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
_RNvXs6_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_14RequestProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
Line
Count
Source
1082
23
    fn drop(&mut self) {
1083
23
        if !self.has_sent_response {
1084
0
            let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1085
0
                .unwrap()
1086
0
                .0;
1087
0
            let serialized =
1088
0
                parse::build_error_response(request_id, ErrorResponse::InternalError, None);
1089
0
            self.responses_notifications_queue
1090
0
                .queue
1091
0
                .push(ToMainTask::RequestResponse(serialized));
1092
0
            self.responses_notifications_queue
1093
0
                .on_pushed
1094
0
                .notify(usize::MAX);
1095
23
        }
1096
23
    }
1097
}
1098
1099
/// Object connected to the [`ClientMainTask`] and containing a request that leads to the creation
1100
/// of a subscription.
1101
///
1102
/// If this object is dropped before the request has been answered, an automatic "internal error"
1103
/// error response is automatically sent back.
1104
pub struct SubscriptionStartProcess {
1105
    /// Queue where responses and subscriptions push responses/notifications.
1106
    responses_notifications_queue: Arc<ResponsesNotificationsQueue>,
1107
    /// `Arc` shared with the client main task and that is used to notify that the subscription
1108
    /// should be killed.
1109
    kill_channel: Arc<SubscriptionKillChannel>,
1110
    /// Request in JSON form. Guaranteed to decode successfully.
1111
    request: String,
1112
    /// Identifier of the subscription. Assigned by the client task.
1113
    subscription_id: String,
1114
    /// `true` if a response has already been sent.
1115
    has_sent_response: bool,
1116
}
1117
1118
impl SubscriptionStartProcess {
1119
    /// Returns the request which must be processed.
1120
    ///
1121
    /// The request is guaranteed to be a request that starts a subscription.
1122
    // TODO: with stronger typing users wouldn't have to worry about the type of request
1123
0
    pub fn request(&self) -> methods::MethodCall {
1124
0
        methods::parse_jsonrpc_client_to_server(&self.request)
1125
0
            .unwrap()
1126
0
            .1
1127
0
    }
Unexecuted instantiation: _RNvMs7_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess7request
Unexecuted instantiation: _RNvMs7_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess7request
1128
1129
    /// Indicate to the [`ClientMainTask`] that the subscription is accepted.
1130
    ///
1131
    /// The [`ClientMainTask`] will send the confirmation to the JSON-RPC client.
1132
    ///
1133
    /// Has no effect if the [`ClientMainTask`] has been destroyed.
1134
0
    pub fn accept(mut self) -> Subscription {
1135
0
        let (request_id, parsed_request) =
1136
0
            methods::parse_jsonrpc_client_to_server(&self.request).unwrap();
1137
1138
0
        let serialized_response = match parsed_request {
1139
            methods::MethodCall::author_submitAndWatchExtrinsic { .. } => {
1140
0
                methods::Response::author_submitAndWatchExtrinsic(Cow::Borrowed(
1141
0
                    &self.subscription_id,
1142
0
                ))
1143
            }
1144
            methods::MethodCall::chain_subscribeAllHeads { .. } => {
1145
0
                methods::Response::chain_subscribeAllHeads(Cow::Borrowed(&self.subscription_id))
1146
            }
1147
            methods::MethodCall::chain_subscribeFinalizedHeads { .. } => {
1148
0
                methods::Response::chain_subscribeFinalizedHeads(Cow::Borrowed(
1149
0
                    &self.subscription_id,
1150
0
                ))
1151
            }
1152
            methods::MethodCall::chain_subscribeNewHeads { .. } => {
1153
0
                methods::Response::chain_subscribeNewHeads(Cow::Borrowed(&self.subscription_id))
1154
            }
1155
            methods::MethodCall::state_subscribeRuntimeVersion { .. } => {
1156
0
                methods::Response::state_subscribeRuntimeVersion(Cow::Borrowed(
1157
0
                    &self.subscription_id,
1158
0
                ))
1159
            }
1160
            methods::MethodCall::state_subscribeStorage { .. } => {
1161
0
                methods::Response::state_subscribeStorage(Cow::Borrowed(&self.subscription_id))
1162
            }
1163
            methods::MethodCall::transactionWatch_v1_submitAndWatch { .. } => {
1164
0
                methods::Response::transactionWatch_v1_submitAndWatch(Cow::Borrowed(
1165
0
                    &self.subscription_id,
1166
0
                ))
1167
            }
1168
            methods::MethodCall::sudo_network_unstable_watch { .. } => {
1169
0
                methods::Response::sudo_network_unstable_watch(Cow::Borrowed(&self.subscription_id))
1170
            }
1171
            methods::MethodCall::chainHead_v1_follow { .. } => {
1172
0
                methods::Response::chainHead_v1_follow(Cow::Borrowed(&self.subscription_id))
1173
            }
1174
0
            _ => unreachable!(),
1175
        }
1176
0
        .to_json_response(request_id);
1177
0
1178
0
        self.responses_notifications_queue
1179
0
            .queue
1180
0
            .push(ToMainTask::RequestResponse(serialized_response));
1181
0
        self.responses_notifications_queue
1182
0
            .on_pushed
1183
0
            .notify(usize::MAX);
1184
0
        self.has_sent_response = true;
1185
0
1186
0
        Subscription {
1187
0
            responses_notifications_queue: self.responses_notifications_queue.clone(),
1188
0
            kill_channel: self.kill_channel.clone(),
1189
0
            subscription_id: mem::take(&mut self.subscription_id),
1190
0
        }
1191
0
    }
Unexecuted instantiation: _RNvMs7_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess6accept
Unexecuted instantiation: _RNvMs7_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess6accept
1192
1193
    /// Indicate to the [`ClientMainTask`] that the subscription start request should return an
1194
    /// error.
1195
    ///
1196
    /// Has no effect if the [`ClientMainTask`] has been destroyed.
1197
0
    pub fn fail(mut self, error: ErrorResponse) {
1198
0
        let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1199
0
            .unwrap()
1200
0
            .0;
1201
0
        let serialized = parse::build_error_response(request_id, error, None);
1202
0
        self.responses_notifications_queue
1203
0
            .queue
1204
0
            .push(ToMainTask::RequestResponse(serialized));
1205
0
        self.responses_notifications_queue
1206
0
            .queue
1207
0
            .push(ToMainTask::SubscriptionDestroyed {
1208
0
                subscription_id: mem::take(&mut self.subscription_id),
1209
0
            });
1210
0
        self.responses_notifications_queue
1211
0
            .on_pushed
1212
0
            .notify(usize::MAX);
1213
0
        self.has_sent_response = true;
1214
0
    }
Unexecuted instantiation: _RNvMs7_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess4fail
Unexecuted instantiation: _RNvMs7_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcess4fail
1215
}
1216
1217
impl fmt::Debug for SubscriptionStartProcess {
1218
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1219
0
        fmt::Debug::fmt(&self.request, f)
1220
0
    }
Unexecuted instantiation: _RNvXs8_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
Unexecuted instantiation: _RNvXs8_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
1221
}
1222
1223
impl Drop for SubscriptionStartProcess {
1224
0
    fn drop(&mut self) {
1225
0
        if !self.has_sent_response {
1226
0
            let request_id = methods::parse_jsonrpc_client_to_server(&self.request)
1227
0
                .unwrap()
1228
0
                .0;
1229
0
            let serialized =
1230
0
                parse::build_error_response(request_id, ErrorResponse::InternalError, None);
1231
0
            self.responses_notifications_queue
1232
0
                .queue
1233
0
                .push(ToMainTask::RequestResponse(serialized));
1234
0
            self.responses_notifications_queue
1235
0
                .queue
1236
0
                .push(ToMainTask::SubscriptionDestroyed {
1237
0
                    subscription_id: mem::take(&mut self.subscription_id),
1238
0
                });
1239
0
            self.responses_notifications_queue
1240
0
                .on_pushed
1241
0
                .notify(usize::MAX);
1242
0
        }
1243
0
    }
Unexecuted instantiation: _RNvXs9_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
Unexecuted instantiation: _RNvXs9_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_24SubscriptionStartProcessNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
1244
}
1245
1246
/// Object connected to the [`ClientMainTask`] representing an active subscription.
1247
pub struct Subscription {
1248
    /// Queue where responses and subscriptions push responses/notifications.
1249
    responses_notifications_queue: Arc<ResponsesNotificationsQueue>,
1250
    /// `Arc` shared with the client main task and that is used to notify that the subscription
1251
    /// should be killed.
1252
    kill_channel: Arc<SubscriptionKillChannel>,
1253
    /// Identifier of the subscription. Assigned by the client task.
1254
    subscription_id: String,
1255
}
1256
1257
/// See [`Subscription::kill_channel`].
1258
struct SubscriptionKillChannel {
1259
    /// `true` if this subscription should be destroyed as soon as possible.
1260
    dead: AtomicBool,
1261
    /// Notified whenever [`SubscriptionKillChannel::dead`] is modified.
1262
    on_dead_changed: event_listener::Event,
1263
}
1264
1265
impl Subscription {
1266
    /// Return the identifier of this subscription. Necessary in order to generate answers.
1267
0
    pub fn subscription_id(&self) -> &str {
1268
0
        &self.subscription_id
1269
0
    }
Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription15subscription_id
Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription15subscription_id
1270
1271
    /// Send a notification the [`ClientMainTask`].
1272
    ///
1273
    /// Has no effect if [`Subscription::is_stale`] would return `true`.
1274
    ///
1275
    /// This notification might end up being discarded if the queue of responses to send back to
1276
    /// the JSON-RPC client is full and/or if the notification is redundant with another
1277
    /// notification sent earlier.
1278
    ///
1279
    /// While this function is asynchronous, it is expected to not take very long provided that
1280
    /// [`ClientMainTask::run_until_event`] is called in parallel.
1281
    ///
1282
    /// > **Note**: It is important to run [`ClientMainTask::run_until_event`] concurrently to
1283
    /// >           this function, otherwise it might never return.
1284
    // TODO: with stronger typing we could automatically fill the subscription_id
1285
0
    pub async fn send_notification(&mut self, notification: methods::ServerToClient<'_>) {
Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription17send_notification
Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription17send_notification
1286
0
        let serialized = notification.to_json_request_object_parameters(None);
1287
0
1288
0
        // Wait until there is space in the queue or that the subscription is dead.
1289
0
        // Note that this is intentionally racy.
1290
0
        {
1291
0
            let mut wait = None;
1292
0
            loop {
1293
0
                // If the subscription is dead, simply do nothing. This is purely an optimization.
1294
0
                if self.kill_channel.dead.load(Ordering::Relaxed) {
1295
0
                    return;
1296
0
                }
1297
0
1298
0
                // If there is space, break out of the loop in order to send.
1299
0
                if self.responses_notifications_queue.queue.len()
1300
0
                    < self.responses_notifications_queue.max_len
1301
                {
1302
0
                    break;
1303
0
                }
1304
1305
0
                if let Some(wait) = wait.take() {
1306
0
                    wait.await
1307
0
                } else {
1308
0
                    wait = Some(
1309
0
                        self.responses_notifications_queue
1310
0
                            .on_popped
1311
0
                            .listen()
1312
0
                            .or(self.kill_channel.on_dead_changed.listen()),
1313
0
                    );
1314
0
                }
1315
            }
1316
        }
1317
1318
        // Actually push the element.
1319
0
        self.responses_notifications_queue
1320
0
            .queue
1321
0
            .push(ToMainTask::Notification(serialized));
1322
0
        self.responses_notifications_queue
1323
0
            .on_pushed
1324
0
            .notify(usize::MAX);
1325
0
    }
Unexecuted instantiation: _RNCNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0Bd_
Unexecuted instantiation: _RNCNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0Bd_
Unexecuted instantiation: _RNCNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription17send_notification0CsiUjFBJteJ7x_17smoldot_full_node
1326
1327
    /// Returns `true` if the JSON-RPC client has unsubscribed, or the [`ClientMainTask`] has been
1328
    /// destroyed, or the queue of responses to send to the JSON-RPC client is clogged and the
1329
    /// logic of the subscription requires that it stops altogether in that situation.
1330
    ///
1331
    /// Due to the racy nature of this function, a value of `false` can at any moment switch to
1332
    /// `true` and thus should be interpreted as "maybe". A value of `true`, however, actually
1333
    /// means "yes", as it can't ever switch back to `false`.
1334
0
    pub fn is_stale(&self) -> bool {
1335
0
        self.kill_channel.dead.load(Ordering::Relaxed)
1336
0
    }
Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription8is_stale
Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription8is_stale
1337
1338
    /// Run indefinitely until [`Subscription::is_stale`] returns `true`.
1339
0
    pub async fn wait_until_stale(&mut self) {
Unexecuted instantiation: _RNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription16wait_until_stale
Unexecuted instantiation: _RNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12Subscription16wait_until_stale
1340
0
        // The control flow of this function is a bit magic, but simple enough that it should be
1341
0
        // easy to understand.
1342
0
        let mut wait = None;
1343
0
        loop {
1344
0
            if self.kill_channel.dead.load(Ordering::Acquire) {
1345
0
                return;
1346
0
            }
1347
1348
0
            if let Some(wait) = wait.take() {
1349
0
                wait.await;
1350
0
            } else {
1351
0
                wait = Some(self.kill_channel.on_dead_changed.listen());
1352
0
            }
1353
        }
1354
0
    }
Unexecuted instantiation: _RNCNvMsa_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription16wait_until_stale0Bd_
Unexecuted instantiation: _RNCNvMsa_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB7_12Subscription16wait_until_stale0Bd_
1355
}
1356
1357
impl fmt::Debug for Subscription {
1358
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1359
0
        f.debug_tuple("Subscription")
1360
0
            .field(&self.subscription_id)
1361
0
            .finish()
1362
0
    }
Unexecuted instantiation: _RNvXsb_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
Unexecuted instantiation: _RNvXsb_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
1363
}
1364
1365
impl Drop for Subscription {
1366
0
    fn drop(&mut self) {
1367
0
        self.responses_notifications_queue
1368
0
            .queue
1369
0
            .push(ToMainTask::SubscriptionDestroyed {
1370
0
                subscription_id: mem::take(&mut self.subscription_id),
1371
0
            });
1372
0
        self.responses_notifications_queue
1373
0
            .on_pushed
1374
0
            .notify(usize::MAX);
1375
0
    }
Unexecuted instantiation: _RNvXsc_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
Unexecuted instantiation: _RNvXsc_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service16client_main_taskNtB5_12SubscriptionNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
1376
}