Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/collection/single_stream.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use super::{
19
    super::{
20
        connection::{established, single_stream_handshake},
21
        read_write::ReadWrite,
22
    },
23
    ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection,
24
    CoordinatorToConnectionInner, NotificationsOutErr, ShutdownCause, SubstreamId,
25
};
26
27
use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
28
use core::{
29
    mem,
30
    ops::{Add, Sub},
31
    time::Duration,
32
};
33
34
pub(super) struct Config<TNow> {
35
    pub(super) randomness_seed: [u8; 32],
36
    pub(super) handshake: single_stream_handshake::HealthyHandshake,
37
    pub(super) handshake_timeout: TNow,
38
    pub(super) max_inbound_substreams: usize,
39
    pub(super) substreams_capacity: usize,
40
    pub(super) max_protocol_name_len: usize,
41
    pub(super) ping_protocol: Arc<str>,
42
}
43
44
/// State machine dedicated to a single single-stream connection.
45
pub struct SingleStreamConnectionTask<TNow> {
46
    /// State machine of the underlying connection.
47
    connection: SingleStreamConnectionTaskInner<TNow>,
48
49
    /// Buffer of messages destined to the coordinator.
50
    ///
51
    /// Never goes above a few elements.
52
    pending_messages: VecDeque<ConnectionToCoordinatorInner>,
53
}
54
55
enum SingleStreamConnectionTaskInner<TNow> {
56
    /// Connection is still in its handshake phase.
57
    Handshake {
58
        handshake: single_stream_handshake::HealthyHandshake,
59
60
        /// Seed that will be used to initialize randomness when building the
61
        /// [`established::SingleStream`].
62
        /// This seed is computed during the handshake in order to avoid having to access a shared
63
        /// state when the handshake is over. While it seems a bit dangerous to leave a randomness
64
        /// seed in plain memory, the randomness isn't used for anything critical or related to
65
        /// cryptography, but only for example to avoid hash collision attacks.
66
        randomness_seed: [u8; 32],
67
68
        /// When the handshake phase times out.
69
        timeout: TNow,
70
71
        /// See [`super::Config::max_inbound_substreams`].
72
        max_inbound_substreams: usize,
73
74
        /// See [`Config::substreams_capacity`].
75
        substreams_capacity: usize,
76
77
        max_protocol_name_len: usize,
78
79
        /// See [`super::Config::ping_protocol`].
80
        ping_protocol: Arc<str>,
81
    },
82
83
    /// Connection has been fully established.
84
    Established {
85
        established: established::SingleStream<TNow, Option<SubstreamId>>,
86
87
        /// Because outgoing substream ids are assigned by the coordinator, we maintain a mapping
88
        /// of the "outer ids" to "inner ids".
89
        outbound_substreams_map:
90
            hashbrown::HashMap<SubstreamId, established::SubstreamId, fnv::FnvBuildHasher>,
91
92
        /// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] or a
93
        /// [`ConnectionToCoordinatorInner::NotificationsInClose`] is emitted, an
94
        /// entry is added to this list. If the coordinator accepts or refuses a substream in this
95
        /// list, or closes a substream in this list, the acceptance/refusal/closing is dismissed.
96
        notifications_in_close_acknowledgments: VecDeque<established::SubstreamId>,
97
98
        /// After a `NotificationsInOpenCancel` is emitted by the connection, an
99
        /// entry is added to this list. If the coordinator accepts or refuses a substream in this
100
        /// list, the acceptance/refusal is dismissed.
101
        // TODO: this works only because SubstreamIds aren't reused
102
        inbound_negotiated_cancel_acknowledgments:
103
            hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,
104
    },
105
106
    /// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
107
    /// message has been sent and is waiting to be acknowledged.
108
    ShutdownWaitingAck {
109
        /// What has initiated the shutdown.
110
        initiator: ShutdownInitiator,
111
    },
112
113
    /// Connection has finished its shutdown and its shutdown has been acknowledged. There is
114
    /// nothing more to do except stop the connection task.
115
    ShutdownAcked {
116
        /// What has initiated the shutdown.
117
        initiator: ShutdownInitiator,
118
    },
119
120
    /// Temporary state used to satisfy the borrow checker during state transitions.
121
    Poisoned,
122
}
123
124
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
125
enum ShutdownInitiator {
126
    /// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
127
    Coordinator,
128
    /// [`SingleStreamConnectionTask::reset`] has been called.
129
    Api,
130
    /// The shutdown has been initiated due to a protocol error or because the connection has been
131
    /// shut down cleanly by the remote.
132
    Remote,
133
}
134
135
impl<TNow> SingleStreamConnectionTask<TNow>
136
where
137
    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
138
{
139
    // Note that the parameters of this function are a bit rough and undocumented, as this is
140
    // a function only called from the parent module.
141
0
    pub(super) fn new(config: Config<TNow>) -> Self {
142
0
        SingleStreamConnectionTask {
143
0
            connection: SingleStreamConnectionTaskInner::Handshake {
144
0
                handshake: config.handshake,
145
0
                randomness_seed: config.randomness_seed,
146
0
                timeout: config.handshake_timeout,
147
0
                max_inbound_substreams: config.max_inbound_substreams,
148
0
                substreams_capacity: config.substreams_capacity,
149
0
                max_protocol_name_len: config.max_protocol_name_len,
150
0
                ping_protocol: config.ping_protocol,
151
0
            },
152
0
            pending_messages: VecDeque::with_capacity({
153
0
                // We never buffer more than a few messages.
154
0
                4
155
0
            }),
156
0
        }
157
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE3newB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE3newCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE3newB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE3newCsiUjFBJteJ7x_17smoldot_full_node
158
159
    /// Pulls a message to send back to the coordinator.
160
    ///
161
    /// This function takes ownership of `self` and optionally yields it back. If the first
162
    /// option contains `None`, then no more message will be generated and the
163
    /// [`SingleStreamConnectionTask`] has vanished. This will happen after the connection has been
164
    /// shut down or reset.
165
    /// It is possible for `self` to not be yielded back even if the [`ReadWrite`] that was last
166
    /// passed to [`SingleStreamConnectionTask::read_write`] is still fully open, in which case the
167
    /// API user should abruptly reset the connection, for example by sending a TCP RST flag. This
168
    /// can happen for example if the connection seems unresponsive and that an attempt at closing
169
    /// the connection in a clean way is futile.
170
    ///
171
    /// If any message is returned, it is the responsibility of the API user to send it to the
172
    /// coordinator.
173
    /// Do not attempt to buffer the message being returned, as it would work against the
174
    /// back-pressure strategy used internally. As soon as a message is returned, it should be
175
    /// delivered. If the coordinator is busy at the moment a message should be delivered, then
176
    /// the entire thread of execution dedicated to this [`SingleStreamConnectionTask`] should be
177
    /// paused until the coordinator is ready and the message delivered.
178
    ///
179
    /// Messages aren't generated spontaneously. In other words, you don't need to periodically
180
    /// call this function just in case there's a new message. Messages are always generated after
181
    /// either [`SingleStreamConnectionTask::read_write`] or [`SingleStreamConnectionTask::reset`]
182
    /// has been called. Multiple messages can happen in a row.
183
    ///
184
    /// Because this function frees space in a buffer, calling
185
    /// [`SingleStreamConnectionTask::read_write`] again after it has returned might read/write
186
    /// more data and generate an event again. In other words, the API user should call
187
    /// [`SingleStreamConnectionTask::read_write`] and
188
    /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] repeatedly in a loop until no
189
    /// more message is generated.
190
0
    pub fn pull_message_to_coordinator(
191
0
        mut self,
192
0
    ) -> (Option<Self>, Option<ConnectionToCoordinator>) {
193
0
        // To be sure that there is no bug in the implementation, we make sure that the number of
194
0
        // buffered messages doesn't go above a certain small limit.
195
0
        debug_assert!(self.pending_messages.len() < 8);
196
197
0
        let message = self
198
0
            .pending_messages
199
0
            .pop_front()
200
0
            .map(|inner| ConnectionToCoordinator { inner });
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE27pull_message_to_coordinator0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE27pull_message_to_coordinator0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE27pull_message_to_coordinator0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE27pull_message_to_coordinator0CsiUjFBJteJ7x_17smoldot_full_node
201
202
        // The `ShutdownAcked` state causes the task to exit.
203
0
        let self_ret = if !matches!(
204
0
            self.connection,
205
            SingleStreamConnectionTaskInner::ShutdownAcked { .. }
206
        ) {
207
0
            Some(self)
208
        } else {
209
0
            None
210
        };
211
212
0
        (self_ret, message)
213
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE27pull_message_to_coordinatorB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE27pull_message_to_coordinatorCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE27pull_message_to_coordinatorB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE27pull_message_to_coordinatorCsiUjFBJteJ7x_17smoldot_full_node
214
215
    /// Injects a message that has been pulled from the coordinator.
216
    ///
217
    /// Calling this function might generate data to send to the connection. You should call
218
    /// [`SingleStreamConnectionTask::read_write`] after this function has returned (unless you
219
    /// have called [`SingleStreamConnectionTask::reset`] in the past).
220
0
    pub fn inject_coordinator_message(&mut self, now: &TNow, message: CoordinatorToConnection) {
221
0
        match (message.inner, &mut self.connection) {
222
            (
223
                CoordinatorToConnectionInner::AcceptInbound {
224
0
                    substream_id,
225
0
                    inbound_ty,
226
0
                },
227
0
                SingleStreamConnectionTaskInner::Established {
228
0
                    established,
229
0
                    inbound_negotiated_cancel_acknowledgments,
230
0
                    ..
231
0
                },
232
0
            ) => {
233
0
                if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
234
0
                    established.accept_inbound(substream_id, inbound_ty, None);
235
0
                } else {
236
0
                    self.pending_messages.push_back(
237
0
                        ConnectionToCoordinatorInner::InboundAcceptedCancel { id: substream_id },
238
0
                    )
239
                }
240
            }
241
            (
242
0
                CoordinatorToConnectionInner::RejectInbound { substream_id },
243
0
                SingleStreamConnectionTaskInner::Established {
244
0
                    established,
245
0
                    inbound_negotiated_cancel_acknowledgments,
246
0
                    ..
247
0
                },
248
0
            ) => {
249
0
                if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
250
0
                    established.reject_inbound(substream_id);
251
0
                }
252
            }
253
            (
254
0
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
255
0
                SingleStreamConnectionTaskInner::Handshake {
256
0
                    max_protocol_name_len,
257
0
                    ..
258
0
                },
259
0
            ) => {
260
0
                *max_protocol_name_len = new_max_length;
261
0
            }
262
            (
263
0
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
264
0
                SingleStreamConnectionTaskInner::Established { established, .. },
265
0
            ) => {
266
0
                established.set_max_protocol_name_len(new_max_length);
267
0
            }
268
            (
269
                CoordinatorToConnectionInner::StartRequest {
270
0
                    protocol_name,
271
0
                    request_data,
272
0
                    timeout,
273
0
                    max_response_size,
274
0
                    substream_id,
275
0
                },
276
0
                SingleStreamConnectionTaskInner::Established {
277
0
                    established,
278
0
                    outbound_substreams_map,
279
0
                    ..
280
0
                },
281
0
            ) => {
282
0
                let inner_substream_id = established.add_request(
283
0
                    protocol_name,
284
0
                    request_data,
285
0
                    now.clone() + timeout,
286
0
                    max_response_size,
287
0
                    Some(substream_id),
288
0
                );
289
0
290
0
                let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
291
0
                debug_assert!(_prev_value.is_none());
292
            }
293
            (
294
                CoordinatorToConnectionInner::OpenOutNotifications {
295
0
                    protocol_name,
296
0
                    max_handshake_size,
297
0
                    handshake,
298
0
                    handshake_timeout,
299
0
                    substream_id: outer_substream_id,
300
0
                },
301
0
                SingleStreamConnectionTaskInner::Established {
302
0
                    established,
303
0
                    outbound_substreams_map,
304
0
                    ..
305
0
                },
306
0
            ) => {
307
0
                let inner_substream_id = established.open_notifications_substream(
308
0
                    protocol_name,
309
0
                    handshake,
310
0
                    max_handshake_size,
311
0
                    now.clone() + handshake_timeout,
312
0
                    Some(outer_substream_id),
313
0
                );
314
0
315
0
                let _prev_value =
316
0
                    outbound_substreams_map.insert(outer_substream_id, inner_substream_id);
317
0
                debug_assert!(_prev_value.is_none());
318
            }
319
            (
320
0
                CoordinatorToConnectionInner::CloseOutNotifications { substream_id },
321
0
                SingleStreamConnectionTaskInner::Established {
322
0
                    established,
323
0
                    outbound_substreams_map,
324
                    ..
325
                },
326
            ) => {
327
                // It is possible that the remote has closed the outbound notification substream
328
                // while the `CloseOutNotifications` message was being delivered, or that the API
329
                // user close the substream before the message about the substream being closed
330
                // was delivered to the coordinator.
331
0
                if let Some(inner_substream_id) = outbound_substreams_map.remove(&substream_id) {
332
0
                    established.close_out_notifications_substream(inner_substream_id);
333
0
                }
334
            }
335
            (
336
                CoordinatorToConnectionInner::QueueNotification {
337
0
                    substream_id,
338
0
                    notification,
339
0
                },
340
0
                SingleStreamConnectionTaskInner::Established {
341
0
                    established,
342
0
                    outbound_substreams_map,
343
                    ..
344
                },
345
            ) => {
346
                // It is possible that the remote has closed the outbound notification substream
347
                // while a `QueueNotification` message was being delivered, or that the API user
348
                // queued a notification before the message about the substream being closed was
349
                // delivered to the coordinator.
350
                // If that happens, we intentionally silently discard the message, causing the
351
                // notification to not be sent. This is consistent with the guarantees about
352
                // notifications delivered that are documented in the public API.
353
0
                if let Some(inner_substream_id) = outbound_substreams_map.get(&substream_id) {
354
0
                    established.write_notification_unbounded(*inner_substream_id, notification);
355
0
                }
356
            }
357
            (
358
                CoordinatorToConnectionInner::AnswerRequest {
359
0
                    substream_id,
360
0
                    response,
361
0
                },
362
0
                SingleStreamConnectionTaskInner::Established { established, .. },
363
0
            ) => match established.respond_in_request(substream_id, response) {
364
0
                Ok(()) => {}
365
0
                Err(established::RespondInRequestError::SubstreamClosed) => {
366
0
                    // As documented, answering an obsolete request is simply ignored.
367
0
                }
368
            },
369
            (
370
                CoordinatorToConnectionInner::AcceptInNotifications {
371
0
                    substream_id,
372
0
                    handshake,
373
0
                    max_notification_size,
374
0
                },
375
0
                SingleStreamConnectionTaskInner::Established {
376
0
                    established,
377
0
                    notifications_in_close_acknowledgments,
378
                    ..
379
                },
380
            ) => {
381
0
                if let Some(idx) = notifications_in_close_acknowledgments
382
0
                    .iter()
383
0
                    .position(|s| *s == substream_id)
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE26inject_coordinator_message0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE26inject_coordinator_message0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE26inject_coordinator_message0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE26inject_coordinator_message0CsiUjFBJteJ7x_17smoldot_full_node
384
0
                {
385
0
                    notifications_in_close_acknowledgments.remove(idx);
386
0
                } else {
387
0
                    established.accept_in_notifications_substream(
388
0
                        substream_id,
389
0
                        handshake,
390
0
                        max_notification_size,
391
0
                    );
392
0
                }
393
            }
394
            (
395
0
                CoordinatorToConnectionInner::RejectInNotifications { substream_id },
396
0
                SingleStreamConnectionTaskInner::Established {
397
0
                    established,
398
0
                    notifications_in_close_acknowledgments,
399
                    ..
400
                },
401
            ) => {
402
0
                if let Some(idx) = notifications_in_close_acknowledgments
403
0
                    .iter()
404
0
                    .position(|s| *s == substream_id)
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE26inject_coordinator_messages_0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE26inject_coordinator_messages_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE26inject_coordinator_messages_0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE26inject_coordinator_messages_0CsiUjFBJteJ7x_17smoldot_full_node
405
0
                {
406
0
                    notifications_in_close_acknowledgments.remove(idx);
407
0
                } else {
408
0
                    established.reject_in_notifications_substream(substream_id);
409
0
                }
410
            }
411
            (
412
                CoordinatorToConnectionInner::CloseInNotifications {
413
0
                    substream_id,
414
0
                    timeout,
415
0
                },
416
0
                SingleStreamConnectionTaskInner::Established {
417
0
                    established,
418
0
                    notifications_in_close_acknowledgments,
419
                    ..
420
                },
421
            ) => {
422
0
                if let Some(idx) = notifications_in_close_acknowledgments
423
0
                    .iter()
424
0
                    .position(|s| *s == substream_id)
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE26inject_coordinator_messages0_0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE26inject_coordinator_messages0_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskpE26inject_coordinator_messages0_0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB4_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE26inject_coordinator_messages0_0CsiUjFBJteJ7x_17smoldot_full_node
425
0
                {
426
0
                    notifications_in_close_acknowledgments.remove(idx);
427
0
                } else {
428
0
                    established
429
0
                        .close_in_notifications_substream(substream_id, now.clone() + timeout);
430
0
                }
431
            }
432
            (
433
                CoordinatorToConnectionInner::StartShutdown { .. },
434
                SingleStreamConnectionTaskInner::Established { .. }
435
                | SingleStreamConnectionTaskInner::Handshake { .. },
436
0
            ) => {
437
0
                // TODO: implement proper shutdown
438
0
                self.pending_messages
439
0
                    .push_back(ConnectionToCoordinatorInner::StartShutdown(None));
440
0
                self.pending_messages
441
0
                    .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
442
0
                self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
443
0
                    initiator: ShutdownInitiator::Coordinator,
444
0
                };
445
0
            }
446
            (
447
0
                CoordinatorToConnectionInner::AcceptInbound { .. }
448
0
                | CoordinatorToConnectionInner::RejectInbound { .. }
449
0
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
450
0
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
451
0
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
452
0
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
453
0
                | CoordinatorToConnectionInner::StartRequest { .. }
454
0
                | CoordinatorToConnectionInner::AnswerRequest { .. }
455
0
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
456
0
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
457
0
                | CoordinatorToConnectionInner::QueueNotification { .. },
458
0
                SingleStreamConnectionTaskInner::Handshake { .. }
459
0
                | SingleStreamConnectionTaskInner::ShutdownAcked { .. },
460
0
            ) => unreachable!(),
461
            (
462
                CoordinatorToConnectionInner::AcceptInbound { .. }
463
                | CoordinatorToConnectionInner::RejectInbound { .. }
464
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
465
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
466
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
467
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
468
                | CoordinatorToConnectionInner::StartRequest { .. }
469
                | CoordinatorToConnectionInner::AnswerRequest { .. }
470
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
471
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
472
                | CoordinatorToConnectionInner::QueueNotification { .. },
473
                SingleStreamConnectionTaskInner::ShutdownWaitingAck { .. },
474
            )
475
            | (
476
                CoordinatorToConnectionInner::StartShutdown,
477
                SingleStreamConnectionTaskInner::ShutdownWaitingAck {
478
                    initiator: ShutdownInitiator::Api | ShutdownInitiator::Remote,
479
                },
480
0
            ) => {
481
0
                // There might still be some messages coming from the coordinator after the
482
0
                // connection task has sent a message indicating that it has shut down. This is
483
0
                // due to the concurrent nature of the API and doesn't indicate a bug. These
484
0
                // messages are simply ignored by the connection task.
485
0
            }
486
            (
487
                CoordinatorToConnectionInner::ShutdownFinishedAck,
488
0
                SingleStreamConnectionTaskInner::ShutdownWaitingAck { initiator },
489
0
            ) => {
490
0
                self.connection = SingleStreamConnectionTaskInner::ShutdownAcked {
491
0
                    initiator: *initiator,
492
0
                };
493
0
            }
494
            (
495
                CoordinatorToConnectionInner::StartShutdown,
496
                SingleStreamConnectionTaskInner::ShutdownWaitingAck {
497
                    initiator: ShutdownInitiator::Coordinator,
498
                    ..
499
                }
500
                | SingleStreamConnectionTaskInner::ShutdownAcked { .. },
501
0
            ) => unreachable!(),
502
0
            (CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
503
0
            (_, SingleStreamConnectionTaskInner::Poisoned) => unreachable!(),
504
        }
505
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE26inject_coordinator_messageB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE26inject_coordinator_messageCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE26inject_coordinator_messageB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE26inject_coordinator_messageCsiUjFBJteJ7x_17smoldot_full_node
506
507
    /// Sets the state of the connection to "reset".
508
    ///
509
    /// This should be called if the remote abruptly closes the connection, such as with a TCP/IP
510
    /// RST flag.
511
    ///
512
    /// After this function has been called, it is illegal to call
513
    /// [`SingleStreamConnectionTask::read_write`] or [`SingleStreamConnectionTask::reset`] again.
514
    ///
515
    /// Calling this function might have generated messages for the coordinator.
516
    /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards in
517
    /// order to process these messages.
518
    ///
519
    /// # Panic
520
    ///
521
    /// Panics if [`SingleStreamConnectionTask::reset`] has been called in the past.
522
    ///
523
0
    pub fn reset(&mut self) {
524
0
        match self.connection {
525
            SingleStreamConnectionTaskInner::ShutdownWaitingAck {
526
                initiator: ShutdownInitiator::Api,
527
            }
528
            | SingleStreamConnectionTaskInner::ShutdownAcked {
529
                initiator: ShutdownInitiator::Api,
530
            } => {
531
                // It is illegal to call `reset` a second time.
532
0
                panic!()
533
            }
534
0
            SingleStreamConnectionTaskInner::ShutdownWaitingAck { ref mut initiator }
535
0
            | SingleStreamConnectionTaskInner::ShutdownAcked { ref mut initiator } => {
536
0
                // Mark the initiator as being the API in order to track proper API usage.
537
0
                *initiator = ShutdownInitiator::Api;
538
0
            }
539
0
            _ => {
540
0
                self.pending_messages
541
0
                    .push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
542
0
                        ShutdownCause::RemoteReset,
543
0
                    )));
544
0
                self.pending_messages
545
0
                    .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
546
0
                self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
547
0
                    initiator: ShutdownInitiator::Api,
548
0
                };
549
0
            }
550
        }
551
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE5resetB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE5resetCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE5resetB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE5resetCsiUjFBJteJ7x_17smoldot_full_node
552
553
    /// Returns `true` if [`SingleStreamConnectionTask::reset`] has been called in the past.
554
0
    pub fn is_reset_called(&self) -> bool {
555
0
        matches!(
556
0
            self.connection,
557
            SingleStreamConnectionTaskInner::ShutdownWaitingAck {
558
                initiator: ShutdownInitiator::Api,
559
            } | SingleStreamConnectionTaskInner::ShutdownAcked {
560
                initiator: ShutdownInitiator::Api,
561
            }
562
        )
563
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE15is_reset_calledB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE15is_reset_calledCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE15is_reset_calledB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE15is_reset_calledCsiUjFBJteJ7x_17smoldot_full_node
564
565
    /// Reads data coming from the connection, updates the internal state machine, and writes data
566
    /// destined to the connection through the [`ReadWrite`].
567
    ///
568
    /// Calling this function might have generated messages for the coordinator.
569
    /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards in
570
    /// order to process these messages.
571
    ///
572
    /// # Panic
573
    ///
574
    /// Panics if [`SingleStreamConnectionTask::reset`] has been called in the past.
575
    ///
576
0
    pub fn read_write(&mut self, read_write: &'_ mut ReadWrite<TNow>) {
577
0
        // There is already at least one pending message. We back-pressure the connection by not
578
0
        // performing any reading or writing, as this might generate more messages and open the
579
0
        // door for a DoS attack by the remote. As documented, the API user is supposed to pull
580
0
        // messages after this function has returned, meaning that they will drain the messages.
581
0
        if !self.pending_messages.is_empty() {
582
0
            return;
583
0
        }
584
0
585
0
        match mem::replace(
586
0
            &mut self.connection,
587
0
            SingleStreamConnectionTaskInner::Poisoned,
588
        ) {
589
            SingleStreamConnectionTaskInner::Established {
590
0
                established,
591
0
                mut outbound_substreams_map,
592
0
                mut notifications_in_close_acknowledgments,
593
0
                mut inbound_negotiated_cancel_acknowledgments,
594
0
            } => match established.read_write(read_write) {
595
0
                Ok((connection, event)) => {
596
0
                    if read_write.is_dead() && read_write.wake_up_after.is_none() {
597
0
                        self.pending_messages.push_back(
598
0
                            ConnectionToCoordinatorInner::StartShutdown(Some(
599
0
                                ShutdownCause::CleanShutdown,
600
0
                            )),
601
0
                        );
602
0
                        self.pending_messages
603
0
                            .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
604
0
                        self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
605
0
                            initiator: ShutdownInitiator::Remote,
606
0
                        };
607
0
                        return;
608
0
                    }
609
610
0
                    match event {
611
                        Some(established::Event::NewOutboundSubstreamsForbidden) => {
612
                            // TODO: handle properly
613
0
                            self.pending_messages.push_back(
614
0
                                ConnectionToCoordinatorInner::StartShutdown(Some(
615
0
                                    ShutdownCause::CleanShutdown,
616
0
                                )),
617
0
                            );
618
0
                            self.pending_messages
619
0
                                .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
620
0
                            self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
621
0
                                initiator: ShutdownInitiator::Remote,
622
0
                            };
623
0
                            return;
624
                        }
625
0
                        Some(established::Event::InboundError(err)) => {
626
0
                            self.pending_messages
627
0
                                .push_back(ConnectionToCoordinatorInner::InboundError(err));
628
0
                        }
629
0
                        Some(established::Event::InboundNegotiated { id, protocol_name }) => {
630
0
                            self.pending_messages.push_back(
631
0
                                ConnectionToCoordinatorInner::InboundNegotiated {
632
0
                                    id,
633
0
                                    protocol_name,
634
0
                                },
635
0
                            );
636
0
                        }
637
0
                        Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
638
0
                            inbound_negotiated_cancel_acknowledgments.insert(id);
639
0
                        }
640
0
                        Some(established::Event::InboundAcceptedCancel { id, .. }) => {
641
0
                            self.pending_messages.push_back(
642
0
                                ConnectionToCoordinatorInner::InboundAcceptedCancel { id },
643
0
                            );
644
0
                        }
645
0
                        Some(established::Event::RequestIn { id, request, .. }) => {
646
0
                            self.pending_messages
647
0
                                .push_back(ConnectionToCoordinatorInner::RequestIn { id, request });
648
0
                        }
649
                        Some(established::Event::Response {
650
0
                            response,
651
0
                            user_data,
652
                            ..
653
0
                        }) => {
654
0
                            let Some(outer_substream_id) = user_data else {
655
0
                                panic!()
656
                            };
657
0
                            outbound_substreams_map.remove(&outer_substream_id).unwrap();
658
0
                            self.pending_messages.push_back(
659
0
                                ConnectionToCoordinatorInner::Response {
660
0
                                    response,
661
0
                                    id: outer_substream_id,
662
0
                                },
663
0
                            );
664
                        }
665
0
                        Some(established::Event::NotificationsInOpen { id, handshake, .. }) => {
666
0
                            self.pending_messages.push_back(
667
0
                                ConnectionToCoordinatorInner::NotificationsInOpen { id, handshake },
668
0
                            );
669
0
                        }
670
0
                        Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
671
0
                            notifications_in_close_acknowledgments.push_back(id);
672
0
                            self.pending_messages.push_back(
673
0
                                ConnectionToCoordinatorInner::NotificationsInOpenCancel { id },
674
0
                            );
675
0
                        }
676
0
                        Some(established::Event::NotificationIn { id, notification }) => {
677
0
                            self.pending_messages.push_back(
678
0
                                ConnectionToCoordinatorInner::NotificationIn { id, notification },
679
0
                            );
680
0
                        }
681
0
                        Some(established::Event::NotificationsInClose { id, outcome, .. }) => {
682
0
                            // TODO: only do this if not sent by API user
683
0
                            notifications_in_close_acknowledgments.push_back(id);
684
0
                            self.pending_messages.push_back(
685
0
                                ConnectionToCoordinatorInner::NotificationsInClose { id, outcome },
686
0
                            );
687
0
                        }
688
0
                        Some(established::Event::NotificationsOutResult { id, result }) => {
689
0
                            let (outer_substream_id, result) = match result {
690
0
                                Ok(r) => {
691
0
                                    let Some(outer_substream_id) = connection[id] else {
692
0
                                        panic!()
693
                                    };
694
0
                                    (outer_substream_id, Ok(r))
695
                                }
696
0
                                Err((err, ud)) => {
697
0
                                    let Some(outer_substream_id) = ud else {
698
0
                                        panic!()
699
                                    };
700
0
                                    outbound_substreams_map.remove(&outer_substream_id);
701
0
                                    (outer_substream_id, Err(NotificationsOutErr::Substream(err)))
702
                                }
703
                            };
704
705
0
                            self.pending_messages.push_back(
706
0
                                ConnectionToCoordinatorInner::NotificationsOutResult {
707
0
                                    id: outer_substream_id,
708
0
                                    result,
709
0
                                },
710
0
                            );
711
                        }
712
0
                        Some(established::Event::NotificationsOutCloseDemanded { id }) => {
713
0
                            let Some(outer_substream_id) = connection[id] else {
714
0
                                panic!()
715
                            };
716
0
                            self.pending_messages.push_back(
717
0
                                ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
718
0
                                    id: outer_substream_id,
719
0
                                },
720
0
                            );
721
                        }
722
0
                        Some(established::Event::NotificationsOutReset { user_data, .. }) => {
723
0
                            let Some(outer_substream_id) = user_data else {
724
0
                                panic!()
725
                            };
726
0
                            outbound_substreams_map.remove(&outer_substream_id);
727
0
                            self.pending_messages.push_back(
728
0
                                ConnectionToCoordinatorInner::NotificationsOutReset {
729
0
                                    id: outer_substream_id,
730
0
                                },
731
0
                            );
732
                        }
733
0
                        Some(established::Event::PingOutSuccess { ping_time }) => {
734
0
                            self.pending_messages.push_back(
735
0
                                ConnectionToCoordinatorInner::PingOutSuccess { ping_time },
736
0
                            );
737
0
                        }
738
0
                        Some(established::Event::PingOutFailed) => {
739
0
                            self.pending_messages
740
0
                                .push_back(ConnectionToCoordinatorInner::PingOutFailed);
741
0
                        }
742
0
                        None => {}
743
                    }
744
745
0
                    self.connection = SingleStreamConnectionTaskInner::Established {
746
0
                        established: connection,
747
0
                        outbound_substreams_map,
748
0
                        notifications_in_close_acknowledgments:
749
0
                            notifications_in_close_acknowledgments,
750
0
                        inbound_negotiated_cancel_acknowledgments,
751
0
                    };
752
                }
753
0
                Err(err) => {
754
0
                    self.pending_messages
755
0
                        .push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
756
0
                            ShutdownCause::ProtocolError(err),
757
0
                        )));
758
0
                    self.pending_messages
759
0
                        .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
760
0
                    self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
761
0
                        initiator: ShutdownInitiator::Remote,
762
0
                    };
763
0
                }
764
            },
765
766
            SingleStreamConnectionTaskInner::Handshake {
767
0
                mut handshake,
768
0
                randomness_seed,
769
0
                timeout,
770
0
                max_inbound_substreams,
771
0
                substreams_capacity,
772
0
                max_protocol_name_len,
773
0
                ping_protocol,
774
0
            } => {
775
0
                // Check that the handshake isn't taking too long.
776
0
                //
777
0
                // Note that we check this condition before looking into the incoming data,
778
0
                // and it is possible for the buffers to contain the data that leads to the
779
0
                // handshake being finished. If that is the case, however, it is impossible to
780
0
                // know whether this data arrived before or after the timeout.
781
0
                // Whether to put this check before or after reading the buffer is a choice
782
0
                // between having false negatives or having false positives for the timeout.
783
0
                // We are more strict than necessary by having the check before, but this
784
0
                // guarantees that no horrendously slow connections can accidentally make their
785
0
                // way through.
786
0
                if timeout < read_write.now {
787
0
                    self.pending_messages
788
0
                        .push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
789
0
                            ShutdownCause::HandshakeTimeout,
790
0
                        )));
791
0
                    self.pending_messages
792
0
                        .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
793
0
                    self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
794
0
                        initiator: ShutdownInitiator::Remote,
795
0
                    };
796
0
                    return;
797
0
                }
798
799
0
                loop {
800
0
                    let (read_before, written_before) =
801
0
                        (read_write.read_bytes, read_write.write_bytes_queued);
802
803
0
                    let result = match handshake.read_write(read_write) {
804
0
                        Ok(rw) => rw,
805
0
                        Err(err) => {
806
0
                            self.pending_messages.push_back(
807
0
                                ConnectionToCoordinatorInner::StartShutdown(Some(
808
0
                                    ShutdownCause::HandshakeError(err),
809
0
                                )),
810
0
                            );
811
0
                            self.pending_messages
812
0
                                .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
813
0
                            self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
814
0
                                initiator: ShutdownInitiator::Remote,
815
0
                            };
816
0
                            return;
817
                        }
818
                    };
819
820
0
                    match result {
821
0
                        single_stream_handshake::Handshake::Healthy(updated_handshake)
822
0
                            if (read_before, written_before)
823
0
                                == (read_write.read_bytes, read_write.write_bytes_queued) =>
824
0
                        {
825
0
                            // `read_write()` should be called again as soon as possible
826
0
                            // after `timeout`.
827
0
                            read_write.wake_up_after(&timeout);
828
0
                            self.connection = SingleStreamConnectionTaskInner::Handshake {
829
0
                                handshake: updated_handshake,
830
0
                                randomness_seed,
831
0
                                timeout,
832
0
                                max_inbound_substreams,
833
0
                                substreams_capacity,
834
0
                                max_protocol_name_len,
835
0
                                ping_protocol,
836
0
                            };
837
0
                            break;
838
                        }
839
0
                        single_stream_handshake::Handshake::Healthy(updated_handshake) => {
840
0
                            handshake = updated_handshake;
841
0
                        }
842
                        single_stream_handshake::Handshake::Success {
843
0
                            remote_peer_id,
844
0
                            connection,
845
0
                        } => {
846
0
                            self.pending_messages.push_back(
847
0
                                ConnectionToCoordinatorInner::HandshakeFinished(remote_peer_id),
848
0
                            );
849
0
850
0
                            self.connection = SingleStreamConnectionTaskInner::Established {
851
0
                                established: connection.into_connection(established::Config {
852
0
                                    max_inbound_substreams,
853
0
                                    substreams_capacity,
854
0
                                    max_protocol_name_len,
855
0
                                    randomness_seed,
856
0
                                    ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/
857
0
                                    ping_interval: Duration::from_secs(20),   // TODO: hardcoded
858
0
                                    ping_timeout: Duration::from_secs(10),    // TODO: hardcoded
859
0
                                    first_out_ping: read_write.now.clone() + Duration::from_secs(2), // TODO: hardcoded
860
0
                                }),
861
0
                                outbound_substreams_map:
862
0
                                    hashbrown::HashMap::with_capacity_and_hasher(
863
0
                                        0,
864
0
                                        Default::default(),
865
0
                                    ), // TODO: capacity?
866
0
                                notifications_in_close_acknowledgments: VecDeque::with_capacity(4),
867
0
                                inbound_negotiated_cancel_acknowledgments:
868
0
                                    hashbrown::HashSet::with_capacity_and_hasher(
869
0
                                        2,
870
0
                                        Default::default(),
871
0
                                    ),
872
0
                            };
873
0
                            break;
874
                        }
875
                    }
876
                }
877
            }
878
879
0
            c @ (SingleStreamConnectionTaskInner::ShutdownWaitingAck {
880
                initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
881
            }
882
            | SingleStreamConnectionTaskInner::ShutdownAcked {
883
                initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
884
0
            }) => {
885
0
                // The user might legitimately call this function after the connection has
886
0
                // already shut down. In that case, just do nothing.
887
0
                self.connection = c;
888
0
889
0
                // This might have been done already during the shutdown process, but we do it
890
0
                // again just in case.
891
0
                read_write.close_write();
892
0
            }
893
894
            SingleStreamConnectionTaskInner::ShutdownWaitingAck {
895
                initiator: ShutdownInitiator::Api,
896
            }
897
            | SingleStreamConnectionTaskInner::ShutdownAcked {
898
                initiator: ShutdownInitiator::Api,
899
            } => {
900
                // As documented, it is illegal to call this function after calling `reset()`.
901
0
                panic!()
902
            }
903
904
0
            SingleStreamConnectionTaskInner::Poisoned => unreachable!(),
905
        }
906
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE10read_writeB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationE10read_writeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskpE10read_writeB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection13single_streamINtB2_26SingleStreamConnectionTaskNtNtCsbpXXxgr6u8g_3std4time7InstantE10read_writeCsiUjFBJteJ7x_17smoldot_full_node
907
}