Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/collection/multi_stream.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use super::{
19
    super::{
20
        connection::{established, noise, webrtc_framing},
21
        read_write::ReadWrite,
22
    },
23
    ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection,
24
    CoordinatorToConnectionInner, NotificationsOutErr, PeerId, ShutdownCause, SubstreamFate,
25
    SubstreamId,
26
};
27
28
use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
29
use core::{
30
    hash::Hash,
31
    ops::{Add, Sub},
32
    time::Duration,
33
};
34
35
/// State machine dedicated to a single multi-stream connection.
36
pub struct MultiStreamConnectionTask<TNow, TSubId> {
37
    connection: MultiStreamConnectionTaskInner<TNow, TSubId>,
38
}
39
enum MultiStreamConnectionTaskInner<TNow, TSubId> {
40
    /// Connection is still in its handshake phase.
41
    Handshake {
42
        /// Substream that has been opened to perform the handshake, if any.
43
        opened_substream: Option<(TSubId, webrtc_framing::WebRtcFraming)>,
44
45
        /// Noise handshake in progress. Always `Some`, except to be temporarily extracted.
46
        handshake: Option<noise::HandshakeInProgress>,
47
48
        /// Other substreams, besides [`MultiStreamConnectionTaskInner::Handshake::opened_substream`],
49
        /// that have been opened. For each substream, contains a boolean indicating whether the
50
        /// substream is outbound (`true`) or inbound (`false`).
51
        ///
52
        /// Due to the asynchronous nature of the protocol, it is not a logic error to open
53
        /// additional substreams before the handshake has finished. The remote might think that
54
        /// the handshake has finished while the local node hasn't finished processing it yet.
55
        ///
56
        /// These substreams aren't processed as long as the handshake hasn't finished. It is,
57
        /// however, important to remember that substreams have been opened.
58
        extra_open_substreams: hashbrown::HashMap<TSubId, bool, fnv::FnvBuildHasher>,
59
60
        /// State machine used once the connection has been established. Unused during the
61
        /// handshake, but created ahead of time. Always `Some`, except to be temporarily
62
        /// extracted.
63
        established: Option<established::MultiStream<TNow, TSubId, Option<SubstreamId>>>,
64
    },
65
66
    /// Connection has been fully established.
67
    Established {
68
        established: established::MultiStream<TNow, TSubId, Option<SubstreamId>>,
69
70
        /// If `Some`, contains the substream that was used for the handshake. This substream
71
        /// is meant to be closed as soon as possible.
72
        handshake_substream: Option<TSubId>,
73
74
        /// If `Some`, then no `HandshakeFinished` message has been sent back yet.
75
        handshake_finished_message_to_send: Option<PeerId>,
76
77
        /// Because outgoing substream ids are assigned by the coordinator, we maintain a mapping
78
        /// of the "outer ids" to "inner ids".
79
        outbound_substreams_map:
80
            hashbrown::HashMap<SubstreamId, established::SubstreamId, fnv::FnvBuildHasher>,
81
82
        /// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] or a
83
        /// [`ConnectionToCoordinatorInner::NotificationsInClose`] is emitted, an
84
        /// entry is added to this list. If the coordinator accepts or refuses a substream in this
85
        /// list, or closes a substream in this list, the acceptance/refusal/closing is dismissed.
86
        // TODO: this works only because SubstreamIds aren't reused
87
        notifications_in_close_acknowledgments:
88
            hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,
89
90
        /// Messages about inbound accept cancellations to send back.
91
        inbound_accept_cancel_events: VecDeque<established::SubstreamId>,
92
    },
93
94
    /// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
95
    /// message has been sent and is waiting to be acknowledged.
96
    ShutdownWaitingAck {
97
        /// What has initiated the shutdown.
98
        initiator: ShutdownInitiator,
99
100
        /// `None` if the [`ConnectionToCoordinatorInner::StartShutdown`] message has already
101
        /// been sent to the coordinator. `Some` if the message hasn't been sent yet.
102
        start_shutdown_message_to_send: Option<Option<ShutdownCause>>,
103
104
        /// `true` if the [`ConnectionToCoordinatorInner::ShutdownFinished`] message has already
105
        /// been sent to the coordinator.
106
        shutdown_finish_message_sent: bool,
107
    },
108
109
    /// Connection has finished its shutdown and its shutdown has been acknowledged. There is
110
    /// nothing more to do except stop the connection task.
111
    ShutdownAcked {
112
        /// What has initiated the shutdown.
113
        initiator: ShutdownInitiator,
114
    },
115
}
116
117
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
118
enum ShutdownInitiator {
119
    /// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
120
    Coordinator,
121
    /// [`MultiStreamConnectionTask::reset`] has been called.
122
    Api,
123
}
124
125
impl<TNow, TSubId> MultiStreamConnectionTask<TNow, TSubId>
126
where
127
    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
128
    TSubId: Clone + PartialEq + Eq + Hash,
129
{
130
    // Note that the parameters of this function are a bit rough and undocumented, as this is
131
    // a function only called from the parent module.
132
0
    pub(super) fn new(
133
0
        randomness_seed: [u8; 32],
134
0
        when_connection_start: TNow,
135
0
        handshake: noise::HandshakeInProgress,
136
0
        max_inbound_substreams: usize,
137
0
        substreams_capacity: usize,
138
0
        max_protocol_name_len: usize,
139
0
        ping_protocol: Arc<str>,
140
0
    ) -> Self {
141
0
        MultiStreamConnectionTask {
142
0
            connection: MultiStreamConnectionTaskInner::Handshake {
143
0
                // TODO: the handshake doesn't have a timeout
144
0
                handshake: Some(handshake),
145
0
                opened_substream: None,
146
0
                extra_open_substreams: hashbrown::HashMap::with_capacity_and_hasher(
147
0
                    0,
148
0
                    Default::default(),
149
0
                ),
150
0
                established: Some(established::MultiStream::webrtc(established::Config {
151
0
                    max_inbound_substreams,
152
0
                    substreams_capacity,
153
0
                    max_protocol_name_len,
154
0
                    randomness_seed,
155
0
                    ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/
156
0
                    ping_interval: Duration::from_secs(20),   // TODO: hardcoded
157
0
                    ping_timeout: Duration::from_secs(10),    // TODO: hardcoded
158
0
                    first_out_ping: when_connection_start, // TODO: only start the ping after the Noise handshake has ended
159
0
                })),
160
0
            },
161
0
        }
162
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE3newB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE3newCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE3newB8_
163
164
    /// Pulls a message to send back to the coordinator.
165
    ///
166
    /// This function takes ownership of `self` and optionally yields it back. If the first
167
    /// option contains `None`, then no more message will be generated and the
168
    /// [`MultiStreamConnectionTask`] has vanished. This will happen after the connection has been
169
    /// shut down or reset.
170
    /// It is possible for `self` to not be yielded back even if substreams are still open, in
171
    /// which case the API user should abruptly reset the connection, for example by sending a
172
    /// TCP RST flag.
173
    ///
174
    /// If any message is returned, it is the responsibility of the API user to send it to the
175
    /// coordinator.
176
    /// Do not attempt to buffer the message being returned, as it would work against the
177
    /// back-pressure strategy used internally. As soon as a message is returned, it should be
178
    /// delivered. If the coordinator is busy at the moment a message should be delivered, then
179
    /// the entire thread of execution dedicated to this [`MultiStreamConnectionTask`] should be
180
    /// paused until the coordinator is ready and the message delivered.
181
    ///
182
    /// Messages aren't generated spontaneously. In other words, you don't need to periodically
183
    /// call this function just in case there's a new message. Messages are always generated after
184
    /// [`MultiStreamConnectionTask::substream_read_write`],
185
    /// [`MultiStreamConnectionTask::add_substream`], or [`MultiStreamConnectionTask::reset`]
186
    /// has been called. Multiple messages can happen in a row.
187
    ///
188
    /// Because this function frees space in a buffer, processing substreams again after it
189
    /// has returned might read/write more data and generate an event again. In other words,
190
    /// the API user should call [`MultiStreamConnectionTask::substream_read_write`] and
191
    /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] repeatedly in a loop until no
192
    /// more message is generated.
193
0
    pub fn pull_message_to_coordinator(
194
0
        mut self,
195
0
    ) -> (Option<Self>, Option<ConnectionToCoordinator>) {
196
0
        match &mut self.connection {
197
0
            MultiStreamConnectionTaskInner::Handshake { .. } => (Some(self), None),
198
            MultiStreamConnectionTaskInner::Established {
199
0
                established,
200
0
                outbound_substreams_map,
201
0
                handshake_finished_message_to_send,
202
0
                notifications_in_close_acknowledgments,
203
0
                inbound_accept_cancel_events,
204
                ..
205
            } => {
206
0
                if let Some(remote_peer_id) = handshake_finished_message_to_send.take() {
207
0
                    return (
208
0
                        Some(self),
209
0
                        Some(ConnectionToCoordinator {
210
0
                            inner: ConnectionToCoordinatorInner::HandshakeFinished(remote_peer_id),
211
0
                        }),
212
0
                    );
213
0
                }
214
215
0
                if let Some(substream_id) = inbound_accept_cancel_events.pop_front() {
216
0
                    return (
217
0
                        Some(self),
218
0
                        Some(ConnectionToCoordinator {
219
0
                            inner: ConnectionToCoordinatorInner::InboundAcceptedCancel {
220
0
                                id: substream_id,
221
0
                            },
222
0
                        }),
223
0
                    );
224
0
                }
225
226
0
                let event = match established.pull_event() {
227
                    Some(established::Event::NewOutboundSubstreamsForbidden) => {
228
                        // TODO: handle properly
229
0
                        self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
230
0
                            start_shutdown_message_to_send: Some(None),
231
0
                            shutdown_finish_message_sent: false,
232
0
                            initiator: ShutdownInitiator::Coordinator,
233
0
                        };
234
0
                        Some(ConnectionToCoordinatorInner::StartShutdown(None))
235
                    }
236
0
                    Some(established::Event::InboundError(err)) => {
237
0
                        Some(ConnectionToCoordinatorInner::InboundError(err))
238
                    }
239
0
                    Some(established::Event::InboundNegotiated { id, protocol_name }) => {
240
0
                        Some(ConnectionToCoordinatorInner::InboundNegotiated { id, protocol_name })
241
                    }
242
0
                    Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
243
0
                        notifications_in_close_acknowledgments.insert(id);
244
0
                        None
245
                    }
246
0
                    Some(established::Event::InboundAcceptedCancel { id, .. }) => {
247
0
                        Some(ConnectionToCoordinatorInner::InboundAcceptedCancel { id })
248
                    }
249
0
                    Some(established::Event::RequestIn { id, request, .. }) => {
250
0
                        Some(ConnectionToCoordinatorInner::RequestIn { id, request })
251
                    }
252
                    Some(established::Event::Response {
253
0
                        response,
254
0
                        user_data,
255
                        ..
256
                    }) => {
257
0
                        let Some(outer_substream_id) = user_data else {
258
0
                            panic!()
259
                        };
260
0
                        outbound_substreams_map.remove(&outer_substream_id).unwrap();
261
0
                        Some(ConnectionToCoordinatorInner::Response {
262
0
                            response,
263
0
                            id: outer_substream_id,
264
0
                        })
265
                    }
266
0
                    Some(established::Event::NotificationsInOpen { id, handshake, .. }) => {
267
0
                        Some(ConnectionToCoordinatorInner::NotificationsInOpen { id, handshake })
268
                    }
269
0
                    Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
270
0
                        notifications_in_close_acknowledgments.insert(id);
271
0
                        Some(ConnectionToCoordinatorInner::NotificationsInOpenCancel { id })
272
                    }
273
0
                    Some(established::Event::NotificationIn { id, notification }) => {
274
0
                        Some(ConnectionToCoordinatorInner::NotificationIn { id, notification })
275
                    }
276
0
                    Some(established::Event::NotificationsInClose { id, outcome, .. }) => {
277
0
                        notifications_in_close_acknowledgments.insert(id);
278
0
                        Some(ConnectionToCoordinatorInner::NotificationsInClose { id, outcome })
279
                    }
280
0
                    Some(established::Event::NotificationsOutResult { id, result }) => {
281
0
                        let (outer_substream_id, result) = match result {
282
0
                            Ok(r) => {
283
0
                                let Some(outer_substream_id) = established[id] else {
284
0
                                    panic!()
285
                                };
286
0
                                (outer_substream_id, Ok(r))
287
                            }
288
0
                            Err((err, ud)) => {
289
0
                                let Some(outer_substream_id) = ud else {
290
0
                                    panic!()
291
                                };
292
0
                                outbound_substreams_map.remove(&outer_substream_id);
293
0
                                (outer_substream_id, Err(NotificationsOutErr::Substream(err)))
294
                            }
295
                        };
296
297
0
                        Some(ConnectionToCoordinatorInner::NotificationsOutResult {
298
0
                            id: outer_substream_id,
299
0
                            result,
300
0
                        })
301
                    }
302
0
                    Some(established::Event::NotificationsOutCloseDemanded { id }) => {
303
0
                        let Some(outer_substream_id) = established[id] else {
304
0
                            panic!()
305
                        };
306
0
                        Some(
307
0
                            ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
308
0
                                id: outer_substream_id,
309
0
                            },
310
0
                        )
311
                    }
312
0
                    Some(established::Event::NotificationsOutReset { user_data, .. }) => {
313
0
                        let Some(outer_substream_id) = user_data else {
314
0
                            panic!()
315
                        };
316
0
                        outbound_substreams_map.remove(&outer_substream_id);
317
0
                        Some(ConnectionToCoordinatorInner::NotificationsOutReset {
318
0
                            id: outer_substream_id,
319
0
                        })
320
                    }
321
0
                    Some(established::Event::PingOutSuccess { ping_time }) => {
322
0
                        Some(ConnectionToCoordinatorInner::PingOutSuccess { ping_time })
323
                    }
324
                    Some(established::Event::PingOutFailed) => {
325
0
                        Some(ConnectionToCoordinatorInner::PingOutFailed)
326
                    }
327
0
                    None => None,
328
                };
329
330
0
                (
331
0
                    Some(self),
332
0
                    event.map(|ev| ConnectionToCoordinator { inner: ev }),
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE27pull_message_to_coordinator0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE27pull_message_to_coordinator0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE27pull_message_to_coordinator0Ba_
333
0
                )
334
            }
335
            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
336
0
                start_shutdown_message_to_send,
337
0
                shutdown_finish_message_sent,
338
                ..
339
            } => {
340
0
                if let Some(reason) = start_shutdown_message_to_send.take() {
341
0
                    debug_assert!(!*shutdown_finish_message_sent);
342
0
                    (
343
0
                        Some(self),
344
0
                        Some(ConnectionToCoordinator {
345
0
                            inner: ConnectionToCoordinatorInner::StartShutdown(reason),
346
0
                        }),
347
0
                    )
348
0
                } else if !*shutdown_finish_message_sent {
349
0
                    debug_assert!(start_shutdown_message_to_send.is_none());
350
0
                    *shutdown_finish_message_sent = true;
351
0
                    (
352
0
                        Some(self),
353
0
                        Some(ConnectionToCoordinator {
354
0
                            inner: ConnectionToCoordinatorInner::ShutdownFinished,
355
0
                        }),
356
0
                    )
357
                } else {
358
0
                    (Some(self), None)
359
                }
360
            }
361
0
            MultiStreamConnectionTaskInner::ShutdownAcked { .. } => (None, None),
362
        }
363
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE27pull_message_to_coordinatorB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE27pull_message_to_coordinatorCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE27pull_message_to_coordinatorB8_
364
365
    /// Injects a message that has been pulled from the coordinator.
366
    ///
367
    /// Calling this function might generate data to send to the connection. You should call
368
    /// [`MultiStreamConnectionTask::desired_outbound_substreams`] and
369
    /// [`MultiStreamConnectionTask::substream_read_write`] after this function has returned.
370
0
    pub fn inject_coordinator_message(&mut self, now: &TNow, message: CoordinatorToConnection) {
371
0
        match (message.inner, &mut self.connection) {
372
            (
373
                CoordinatorToConnectionInner::AcceptInbound {
374
0
                    substream_id,
375
0
                    inbound_ty,
376
0
                },
377
0
                MultiStreamConnectionTaskInner::Established {
378
0
                    established,
379
0
                    notifications_in_close_acknowledgments,
380
0
                    inbound_accept_cancel_events,
381
0
                    ..
382
0
                },
383
0
            ) => {
384
0
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
385
0
                    established.accept_inbound(substream_id, inbound_ty, None);
386
0
                } else {
387
0
                    inbound_accept_cancel_events.push_back(substream_id)
388
                }
389
            }
390
            (
391
0
                CoordinatorToConnectionInner::RejectInbound { substream_id },
392
0
                MultiStreamConnectionTaskInner::Established {
393
0
                    established,
394
0
                    notifications_in_close_acknowledgments,
395
0
                    ..
396
0
                },
397
0
            ) => {
398
0
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
399
0
                    established.reject_inbound(substream_id);
400
0
                }
401
            }
402
            (
403
0
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
404
0
                MultiStreamConnectionTaskInner::Handshake {
405
0
                    established: Some(established),
406
                    ..
407
                }
408
0
                | MultiStreamConnectionTaskInner::Established { established, .. },
409
0
            ) => {
410
0
                established.set_max_protocol_name_len(new_max_length);
411
0
            }
412
            (
413
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. },
414
                MultiStreamConnectionTaskInner::Handshake {
415
                    established: None, ..
416
                },
417
            ) => {
418
0
                unreachable!()
419
            }
420
            (
421
                CoordinatorToConnectionInner::StartRequest {
422
0
                    protocol_name,
423
0
                    request_data,
424
0
                    timeout,
425
0
                    max_response_size,
426
0
                    substream_id,
427
0
                },
428
0
                MultiStreamConnectionTaskInner::Established {
429
0
                    established,
430
0
                    outbound_substreams_map,
431
0
                    ..
432
0
                },
433
0
            ) => {
434
0
                let inner_substream_id = established.add_request(
435
0
                    protocol_name,
436
0
                    request_data,
437
0
                    now.clone() + timeout,
438
0
                    max_response_size,
439
0
                    Some(substream_id),
440
0
                );
441
0
                let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
442
0
                debug_assert!(_prev_value.is_none());
443
            }
444
            (
445
                CoordinatorToConnectionInner::OpenOutNotifications {
446
0
                    max_handshake_size,
447
0
                    protocol_name,
448
0
                    handshake,
449
0
                    handshake_timeout,
450
0
                    substream_id: outer_substream_id,
451
0
                },
452
0
                MultiStreamConnectionTaskInner::Established {
453
0
                    established,
454
0
                    outbound_substreams_map,
455
0
                    ..
456
0
                },
457
0
            ) => {
458
0
                let inner_substream_id = established.open_notifications_substream(
459
0
                    protocol_name,
460
0
                    max_handshake_size,
461
0
                    handshake,
462
0
                    now.clone() + handshake_timeout,
463
0
                    Some(outer_substream_id),
464
0
                );
465
0
466
0
                let _prev_value =
467
0
                    outbound_substreams_map.insert(outer_substream_id, inner_substream_id);
468
0
                debug_assert!(_prev_value.is_none());
469
            }
470
            (
471
0
                CoordinatorToConnectionInner::CloseOutNotifications { substream_id },
472
0
                MultiStreamConnectionTaskInner::Established {
473
0
                    established,
474
0
                    outbound_substreams_map,
475
                    ..
476
                },
477
            ) => {
478
                // It is possible that the remote has closed the outbound notification substream
479
                // while the `CloseOutNotifications` message was being delivered, or that the API
480
                // user close the substream before the message about the substream being closed
481
                // was delivered to the coordinator.
482
0
                if let Some(inner_substream_id) = outbound_substreams_map.remove(&substream_id) {
483
0
                    established.close_out_notifications_substream(inner_substream_id);
484
0
                }
485
            }
486
            (
487
                CoordinatorToConnectionInner::QueueNotification {
488
0
                    substream_id,
489
0
                    notification,
490
0
                },
491
0
                MultiStreamConnectionTaskInner::Established {
492
0
                    established,
493
0
                    outbound_substreams_map,
494
                    ..
495
                },
496
            ) => {
497
                // It is possible that the remote has closed the outbound notification substream
498
                // while a `QueueNotification` message was being delivered, or that the API user
499
                // queued a notification before the message about the substream being closed was
500
                // delivered to the coordinator.
501
                // If that happens, we intentionally silently discard the message, causing the
502
                // notification to not be sent. This is consistent with the guarantees about
503
                // notifications delivered that are documented in the public API.
504
0
                if let Some(inner_substream_id) = outbound_substreams_map.get(&substream_id) {
505
0
                    established.write_notification_unbounded(*inner_substream_id, notification);
506
0
                }
507
            }
508
            (
509
                CoordinatorToConnectionInner::AnswerRequest {
510
0
                    substream_id,
511
0
                    response,
512
0
                },
513
0
                MultiStreamConnectionTaskInner::Established { established, .. },
514
0
            ) => match established.respond_in_request(substream_id, response) {
515
0
                Ok(()) => {}
516
0
                Err(established::RespondInRequestError::SubstreamClosed) => {
517
0
                    // As documented, answering an obsolete request is simply ignored.
518
0
                }
519
            },
520
            (
521
                CoordinatorToConnectionInner::AcceptInNotifications {
522
0
                    substream_id,
523
0
                    handshake,
524
0
                    max_notification_size,
525
0
                },
526
0
                MultiStreamConnectionTaskInner::Established {
527
0
                    established,
528
0
                    notifications_in_close_acknowledgments,
529
0
                    ..
530
0
                },
531
0
            ) => {
532
0
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
533
0
                    established.accept_in_notifications_substream(
534
0
                        substream_id,
535
0
                        handshake,
536
0
                        max_notification_size,
537
0
                    );
538
0
                }
539
            }
540
            (
541
0
                CoordinatorToConnectionInner::RejectInNotifications { substream_id },
542
0
                MultiStreamConnectionTaskInner::Established {
543
0
                    established,
544
0
                    notifications_in_close_acknowledgments,
545
0
                    ..
546
0
                },
547
0
            ) => {
548
0
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
549
0
                    established.reject_in_notifications_substream(substream_id);
550
0
                }
551
            }
552
            (
553
                CoordinatorToConnectionInner::CloseInNotifications {
554
0
                    substream_id,
555
0
                    timeout,
556
0
                },
557
0
                MultiStreamConnectionTaskInner::Established {
558
0
                    established,
559
0
                    notifications_in_close_acknowledgments,
560
0
                    ..
561
0
                },
562
0
            ) => {
563
0
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
564
0
                    established
565
0
                        .close_in_notifications_substream(substream_id, now.clone() + timeout);
566
0
                }
567
            }
568
            (
569
                CoordinatorToConnectionInner::StartShutdown { .. },
570
                MultiStreamConnectionTaskInner::Handshake { .. }
571
                | MultiStreamConnectionTaskInner::Established { .. },
572
0
            ) => {
573
0
                // TODO: implement proper shutdown
574
0
                self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
575
0
                    start_shutdown_message_to_send: Some(None),
576
0
                    shutdown_finish_message_sent: false,
577
0
                    initiator: ShutdownInitiator::Coordinator,
578
0
                };
579
0
            }
580
            (
581
0
                CoordinatorToConnectionInner::AcceptInbound { .. }
582
0
                | CoordinatorToConnectionInner::RejectInbound { .. }
583
0
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
584
0
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
585
0
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
586
0
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
587
0
                | CoordinatorToConnectionInner::StartRequest { .. }
588
0
                | CoordinatorToConnectionInner::AnswerRequest { .. }
589
0
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
590
0
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
591
0
                | CoordinatorToConnectionInner::QueueNotification { .. },
592
0
                MultiStreamConnectionTaskInner::Handshake { .. }
593
0
                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
594
0
            ) => unreachable!(),
595
            (
596
                CoordinatorToConnectionInner::AcceptInbound { .. }
597
                | CoordinatorToConnectionInner::RejectInbound { .. }
598
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
599
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
600
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
601
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
602
                | CoordinatorToConnectionInner::StartRequest { .. }
603
                | CoordinatorToConnectionInner::AnswerRequest { .. }
604
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
605
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
606
                | CoordinatorToConnectionInner::QueueNotification { .. },
607
                MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. },
608
            )
609
            | (
610
                CoordinatorToConnectionInner::StartShutdown,
611
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
612
                    initiator: ShutdownInitiator::Api,
613
                    ..
614
                },
615
0
            ) => {
616
0
                // There might still be some messages coming from the coordinator after the
617
0
                // connection task has sent a message indicating that it has shut down. This is
618
0
                // due to the concurrent nature of the API and doesn't indicate a bug. These
619
0
                // messages are simply ignored by the connection task.
620
0
            }
621
            (
622
                CoordinatorToConnectionInner::ShutdownFinishedAck,
623
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
624
0
                    start_shutdown_message_to_send: start_shutdown_message_sent,
625
0
                    shutdown_finish_message_sent,
626
0
                    initiator,
627
0
                },
628
0
            ) => {
629
0
                debug_assert!(
630
0
                    start_shutdown_message_sent.is_none() && *shutdown_finish_message_sent
631
                );
632
0
                self.connection = MultiStreamConnectionTaskInner::ShutdownAcked {
633
0
                    initiator: *initiator,
634
0
                };
635
            }
636
            (
637
                CoordinatorToConnectionInner::StartShutdown,
638
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
639
                    initiator: ShutdownInitiator::Coordinator,
640
                    ..
641
                }
642
                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
643
0
            ) => unreachable!(),
644
0
            (CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
645
        }
646
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE26inject_coordinator_messageB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE26inject_coordinator_messageCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE26inject_coordinator_messageB8_
647
648
    /// Returns the number of new outbound substreams that the state machine would like to see
649
    /// opened.
650
    ///
651
    /// This value doesn't change automatically over time but only after a call to
652
    /// [`MultiStreamConnectionTask::substream_read_write`],
653
    /// [`MultiStreamConnectionTask::inject_coordinator_message`],
654
    /// [`MultiStreamConnectionTask::add_substream`], or
655
    /// [`MultiStreamConnectionTask::reset_substream`].
656
    ///
657
    /// Note that the user is expected to track the number of substreams that are currently being
658
    /// opened. For example, if this function returns 2 and there are already 2 substreams
659
    /// currently being opened, then there is no need to open any additional one.
660
0
    pub fn desired_outbound_substreams(&self) -> u32 {
661
0
        match &self.connection {
662
            MultiStreamConnectionTaskInner::Handshake {
663
0
                opened_substream, ..
664
0
            } => {
665
0
                if opened_substream.is_none() {
666
0
                    1
667
                } else {
668
0
                    0
669
                }
670
            }
671
0
            MultiStreamConnectionTaskInner::Established { established, .. } => {
672
0
                established.desired_outbound_substreams()
673
            }
674
            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
675
0
            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => 0,
676
        }
677
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE27desired_outbound_substreamsB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE27desired_outbound_substreamsCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE27desired_outbound_substreamsB8_
678
679
    /// Notifies the state machine that a new substream has been opened.
680
    ///
681
    /// `outbound` indicates whether the substream has been opened by the remote (`false`) or
682
    /// locally (`true`).
683
    ///
684
    /// If `outbound` is `true`, then the value returned by
685
    /// [`MultiStreamConnectionTask::desired_outbound_substreams`] will decrease by one.
686
    ///
687
    /// # Panic
688
    ///
689
    /// Panics if there already exists a substream with an identical identifier.
690
    ///
691
0
    pub fn add_substream(&mut self, id: TSubId, outbound: bool) {
692
0
        match &mut self.connection {
693
0
            MultiStreamConnectionTaskInner::Handshake {
694
0
                opened_substream: ref mut opened_substream @ None,
695
                ..
696
0
            } if outbound => {
697
0
                *opened_substream = Some((id, webrtc_framing::WebRtcFraming::new()));
698
0
            }
699
            MultiStreamConnectionTaskInner::Handshake {
700
0
                opened_substream,
701
0
                extra_open_substreams,
702
0
                ..
703
0
            } => {
704
0
                assert!(opened_substream
705
0
                    .as_ref()
706
0
                    .map_or(true, |(open, _)| *open != id));
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE13add_substream0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE13add_substream0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE13add_substream0Ba_
707
                // TODO: add a limit to the number allowed?
708
0
                let _was_in = extra_open_substreams.insert(id, outbound);
709
0
                assert!(_was_in.is_none());
710
            }
711
0
            MultiStreamConnectionTaskInner::Established { established, .. } => {
712
0
                established.add_substream(id, outbound)
713
            }
714
            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
715
0
            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
716
0
                // TODO: reset the substream or something?
717
0
            }
718
        }
719
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE13add_substreamB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE13add_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE13add_substreamB8_
720
721
    /// Sets the state of the connection to "reset".
722
    ///
723
    /// This should be called if the remote abruptly closes the connection, such as with a TCP/IP
724
    /// RST flag.
725
    ///
726
    /// After this function has been called, it is illegal to call
727
    /// [`MultiStreamConnectionTask::substream_read_write`] or
728
    /// [`MultiStreamConnectionTask::reset`] again.
729
    ///
730
    /// Calling this function might have generated messages for the coordinator.
731
    /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards in
732
    /// order to process these messages.
733
    ///
734
    /// # Panic
735
    ///
736
    /// Panics if [`MultiStreamConnectionTask::reset`] has been called in the past.
737
    ///
738
0
    pub fn reset(&mut self) {
739
0
        match self.connection {
740
            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
741
                initiator: ShutdownInitiator::Api,
742
                ..
743
            }
744
            | MultiStreamConnectionTaskInner::ShutdownAcked {
745
                initiator: ShutdownInitiator::Api,
746
                ..
747
            } => {
748
                // It is illegal to call `reset` a second time.
749
0
                panic!()
750
            }
751
            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
752
0
                ref mut initiator, ..
753
            }
754
            | MultiStreamConnectionTaskInner::ShutdownAcked {
755
0
                ref mut initiator, ..
756
0
            } => {
757
0
                // Mark the initiator as being the API in order to track proper API usage.
758
0
                *initiator = ShutdownInitiator::Api;
759
0
            }
760
0
            _ => {
761
0
                self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
762
0
                    initiator: ShutdownInitiator::Api,
763
0
                    shutdown_finish_message_sent: false,
764
0
                    start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)),
765
0
                };
766
0
            }
767
        }
768
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE5resetB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE5resetCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE5resetB8_
769
770
    /// Returns `true` if [`MultiStreamConnectionTask::reset`] has been called in the past.
771
0
    pub fn is_reset_called(&self) -> bool {
772
0
        matches!(
773
0
            self.connection,
774
            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
775
                initiator: ShutdownInitiator::Api,
776
                ..
777
            } | MultiStreamConnectionTaskInner::ShutdownAcked {
778
                initiator: ShutdownInitiator::Api,
779
                ..
780
            }
781
        )
782
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15is_reset_calledB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE15is_reset_calledCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15is_reset_calledB8_
783
784
    /// Immediately destroys the substream with the given identifier.
785
    ///
786
    /// The given identifier is now considered invalid by the state machine.
787
    ///
788
    /// # Panic
789
    ///
790
    /// Panics if there is no substream with that identifier.
791
    ///
792
0
    pub fn reset_substream(&mut self, substream_id: &TSubId) {
793
0
        match &mut self.connection {
794
            MultiStreamConnectionTaskInner::Established {
795
0
                handshake_substream,
796
0
                ..
797
0
            } if handshake_substream
798
0
                .as_ref()
799
0
                .map_or(false, |s| s == substream_id) =>
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE15reset_substream0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE15reset_substream0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE15reset_substream0Ba_
800
0
            {
801
0
                *handshake_substream = None;
802
0
            }
803
0
            MultiStreamConnectionTaskInner::Established { established, .. } => {
804
0
                established.reset_substream(substream_id)
805
            }
806
            MultiStreamConnectionTaskInner::Handshake {
807
0
                opened_substream: Some((opened_substream, _)),
808
0
                ..
809
0
            } if opened_substream == substream_id => {
810
0
                // TODO: the handshake has failed, kill the connection?
811
0
            }
812
            MultiStreamConnectionTaskInner::Handshake {
813
0
                extra_open_substreams,
814
0
                ..
815
0
            } => {
816
0
                let _was_in = extra_open_substreams.remove(substream_id).is_some();
817
0
                assert!(_was_in);
818
            }
819
            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
820
0
            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
821
0
                // TODO: panic if substream id invalid?
822
0
            }
823
        }
824
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15reset_substreamB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE15reset_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15reset_substreamB8_
825
826
    /// Reads/writes data on the substream.
827
    ///
828
    /// If the method returns [`SubstreamFate::Reset`], then the substream is now considered dead
829
    /// according to the state machine and its identifier is now invalid. If the reading or
830
    /// writing side of the substream was still open, then the user should reset that substream.
831
    ///
832
    /// In the case of a WebRTC connection, the [`ReadWrite::incoming_buffer`] and
833
    /// [`ReadWrite::write_bytes_queueable`] must always be `Some`.
834
    ///
835
    /// # Panic
836
    ///
837
    /// Panics if there is no substream with that identifier.
838
    /// Panics if this is a WebRTC connection, and the reading or writing side is closed.
839
    ///
840
    #[must_use]
841
0
    pub fn substream_read_write(
842
0
        &mut self,
843
0
        substream_id: &TSubId,
844
0
        read_write: &'_ mut ReadWrite<TNow>,
845
0
    ) -> SubstreamFate {
846
0
        // In WebRTC, the reading and writing sides are never closed.
847
0
        // Note that the `established::MultiStream` state machine also performs this check, but
848
0
        // we do it here again because we're not necessarily in the ̀`established` state.
849
0
        assert!(
850
0
            read_write.expected_incoming_bytes.is_some()
851
0
                && read_write.write_bytes_queueable.is_some()
852
        );
853
854
0
        match &mut self.connection {
855
0
            MultiStreamConnectionTaskInner::Handshake {
856
0
                handshake,
857
0
                opened_substream: Some((opened_handshake_substream, handshake_webrtc_framing)),
858
0
                established,
859
0
                extra_open_substreams,
860
0
            } if opened_handshake_substream == substream_id => {
861
                // TODO: check the handshake timeout
862
863
                // Progress the Noise handshake.
864
0
                let handshake_outcome = {
865
0
                    // The Noise data is not directly the data of the substream. Instead,
866
0
                    // everything is wrapped within a Protobuf frame.
867
0
                    let mut with_framing = match handshake_webrtc_framing.read_write(read_write) {
868
0
                        Ok(f) => f,
869
0
                        Err(_err) => {
870
0
                            // TODO: not great for diagnostic to just ignore the error; also, the connection should just reset entirely
871
0
                            return SubstreamFate::Reset;
872
                        }
873
                    };
874
0
                    handshake.take().unwrap().read_write(&mut with_framing)
875
                };
876
877
0
                match handshake_outcome {
878
0
                    Ok(noise::NoiseHandshake::InProgress(handshake_update)) => {
879
0
                        *handshake = Some(handshake_update);
880
0
                        SubstreamFate::Continue
881
                    }
882
0
                    Err(_err) => return SubstreamFate::Reset, // TODO: /!\
883
                    Ok(noise::NoiseHandshake::Success {
884
                        cipher: _,
885
0
                        remote_peer_id,
886
0
                    }) => {
887
0
                        // The handshake has succeeded and we will transition into "established"
888
0
                        // mode.
889
0
                        let mut established = established.take().unwrap();
890
0
                        for (substream_id, outbound) in extra_open_substreams.drain() {
891
0
                            established.add_substream(substream_id, outbound);
892
0
                        }
893
894
0
                        self.connection = MultiStreamConnectionTaskInner::Established {
895
0
                            established,
896
0
                            handshake_finished_message_to_send: Some(remote_peer_id),
897
0
                            handshake_substream: None, // TODO: do properly
898
0
                            outbound_substreams_map: hashbrown::HashMap::with_capacity_and_hasher(
899
0
                                0,
900
0
                                Default::default(),
901
0
                            ),
902
0
                            notifications_in_close_acknowledgments:
903
0
                                hashbrown::HashSet::with_capacity_and_hasher(2, Default::default()),
904
0
                            inbound_accept_cancel_events: VecDeque::with_capacity(2),
905
0
                        };
906
0
907
0
                        // TODO: hacky
908
0
                        SubstreamFate::Reset
909
                    }
910
                }
911
            }
912
            MultiStreamConnectionTaskInner::Established {
913
0
                handshake_substream,
914
0
                ..
915
0
            } if handshake_substream
916
0
                .as_ref()
917
0
                .map_or(false, |s| s == substream_id) =>
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE20substream_read_write0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE20substream_read_write0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE20substream_read_write0Ba_
918
0
            {
919
0
                // Close the writing side. If the reading side is closed, we indicate that the
920
0
                // substream is dead. If the reading side is still open, we indicate that it's not
921
0
                // dead and simply wait for the remote to close it.
922
0
                // TODO: kill the connection if the remote sends more data?
923
0
                read_write.close_write();
924
0
                if read_write.expected_incoming_bytes.is_none() {
925
0
                    *handshake_substream = None;
926
0
                    SubstreamFate::Reset
927
                } else {
928
0
                    SubstreamFate::Continue
929
                }
930
            }
931
0
            MultiStreamConnectionTaskInner::Established { established, .. } => {
932
0
                established.substream_read_write(substream_id, read_write)
933
            }
934
            MultiStreamConnectionTaskInner::Handshake {
935
0
                extra_open_substreams,
936
0
                ..
937
0
            } => {
938
0
                assert!(extra_open_substreams.contains_key(substream_id));
939
                // Don't do anything. Don't read or write. Instead we wait for the handshake to
940
                // be finished.
941
0
                SubstreamFate::Continue
942
            }
943
            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
944
            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
945
                // TODO: panic if substream id invalid?
946
0
                SubstreamFate::Reset
947
            }
948
        }
949
0
    }
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE20substream_read_writeB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE20substream_read_writeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE20substream_read_writeB8_
950
}