Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/yamux.rs
// Smoldot
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program.  If not, see <http://www.gnu.org/licenses/>.

//! Yamux multiplexing protocol.
//!
//! The Yamux protocol is a multiplexing protocol. As such, it allows dividing a single stream of
//! data, typically a TCP socket, into multiple individual parallel substreams. The data sent and
//! received over that single stream is divided into frames which, with the exception of `ping`
//! and `goaway` frames, belong to a specific substream. In other words, the data transmitted
//! over the substreams is interleaved.
//!
//! Specification available at <https://github.com/hashicorp/yamux/blob/master/spec.md>
//!
//! # Usage
//!
//! The [`Yamux`] object holds the state of all yamux-specific information, and the list of
//! all currently-open substreams.
//!
//! The generic parameter of [`Yamux`] is an opaque "user data" associated with each substream.
//!
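//! A minimal construction sketch. The time type (`core::time::Duration`), the `()` user data,
//! and every field value below are illustrative assumptions, not recommendations:
//!
//! ```ignore
//! use core::num::{NonZeroU32, NonZeroUsize};
//!
//! let yamux = Yamux::<core::time::Duration, ()>::new(Config {
//!     is_initiator: true,
//!     capacity: 8,
//!     randomness_seed: [0; 32],
//!     max_out_data_frame_size: NonZeroU32::new(8192).unwrap(),
//!     max_simultaneous_queued_pongs: NonZeroUsize::new(4).unwrap(),
//!     max_simultaneous_rst_substreams: NonZeroUsize::new(1024).unwrap(),
//! });
//! assert!(yamux.is_empty());
//! ```
//!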
// TODO: more documentation

use crate::{
    libp2p::read_write::{self, ReadWrite},
    util::SipHasherBuild,
};

use alloc::{
    boxed::Box,
    collections::{BTreeSet, VecDeque},
    vec::Vec,
};
use core::{
    cmp, fmt, mem,
    num::{NonZeroU32, NonZeroU64, NonZeroUsize},
    ops,
};
use rand::seq::IteratorRandom as _;
use rand_chacha::{
    rand_core::{RngCore as _, SeedableRng as _},
    ChaCha20Rng,
};

pub use header::GoAwayErrorCode;

mod header;
mod tests;

/// Name of the protocol, typically used when negotiating it using *multistream-select*.
pub const PROTOCOL_NAME: &str = "/yamux/1.0.0";

/// Configuration for a new [`Yamux`].
#[derive(Debug)]
pub struct Config {
    /// `true` if the local machine has initiated the connection. Otherwise, `false`.
    pub is_initiator: bool,

    /// Expected number of substreams simultaneously open, both inbound and outbound substreams
    /// combined.
    pub capacity: usize,

    /// Seed used for the randomness. Used to avoid HashDoS attacks and to determine the order
    /// in which the data on substreams is sent out.
    pub randomness_seed: [u8; 32],

    /// Maximum size of data frames to send out.
    ///
    /// A higher value increases the variance of the latency of the data sent on the substreams,
    /// which is undesirable. A lower value increases the overhead of the Yamux protocol. This
    /// overhead is equal to `1200 / (max_out_data_frame_size + 12)` per cent; for example,
    /// setting `max_out_data_frame_size` to 24 incurs a `33%` overhead.
    ///
    /// The "best" value depends on the bandwidth of the underlying connection and is thus
    /// impossible to determine ahead of time.
    ///
    /// A typical value is `8192`.
    pub max_out_data_frame_size: NonZeroU32,

    /// When the remote sends a ping, we need to send out a pong. However, the remote could refuse
    /// to read any additional data from the socket and continue sending pings, thus increasing
    /// the local buffer size indefinitely. In order to protect against this attack, there exists
    /// a maximum number of queued pongs, after which the connection will be shut down abruptly.
    pub max_simultaneous_queued_pongs: NonZeroUsize,

    /// When the remote sends a substream, and this substream gets rejected by the API user, some
    /// data needs to be sent out. However, the remote could refuse reading any additional data
    /// and continue sending new substream requests, thus increasing the local buffer size
    /// indefinitely. In order to protect against this attack, there exists a maximum number of
    /// queued substream rejections after which the connection will be shut down abruptly.
    pub max_simultaneous_rst_substreams: NonZeroUsize,
}
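
// Sanity check for the overhead formula documented on [`Config::max_out_data_frame_size`]
// (an illustrative sketch, not part of the module's API): every data frame is preceded by a
// 12-byte Yamux header, hence an overhead of `1200 / (frame_size + 12)` per cent.
#[allow(dead_code)]
fn _max_out_data_frame_size_overhead_percent(frame_size: u32) -> f64 {
    // For `frame_size = 24`: 1200.0 / 36.0 ≈ 33.3, matching the `33%` example above.
    1200.0 / (f64::from(frame_size) + 12.0)
}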

/// Yamux state machine. See [the module-level documentation](..) for more information.
pub struct Yamux<TNow, TSub> {
    /// The actual fields are wrapped in a `Box` because the `Yamux` object is moved around pretty
    /// often.
    inner: Box<YamuxInner<TNow, TSub>>,
}

struct YamuxInner<TNow, TSub> {
    /// List of substreams currently open in the Yamux state machine.
    ///
    /// A `SipHasher` is used in order to avoid hash collision attacks on substream IDs.
    substreams: hashbrown::HashMap<NonZeroU32, Substream<TNow, TSub>, SipHasherBuild>,

    /// Subset of the content of [`YamuxInner::substreams`] that is considered "dead", meaning
    /// that it is returned by [`Yamux::dead_substreams`].
    dead_substreams: hashbrown::HashSet<NonZeroU32, SipHasherBuild>,

    /// Subset of the content of [`YamuxInner::substreams`] that requires processing, either
    /// because the substreams have data in their read buffer or because their `wake_up_after`
    /// value has been reached.
    ///
    /// All the substreams in this list are always in the "healthy" state.
    ///
    /// Keys are the time after which this substream should be processed, which can be less than
    /// or equal to "now" for an immediate wake-up. A key equal to `None` means "right now".
    substreams_wake_up: BTreeSet<(Option<TNow>, NonZeroU32)>,

    /// List of substreams that might want to write out additional data. Processed when it is
    /// possible to send out data.
    ///
    /// All the substreams in this list are always in the "healthy" state.
    ///
    /// Contrary to [`YamuxInner::substreams_wake_up`], the substreams in this list are processed
    /// only if it is possible to queue out more data for sending.
    substreams_write_ready: hashbrown::HashSet<NonZeroU32, SipHasherBuild>,

    /// List of window frames to send to the remote. For each substream, the number of bytes to
    /// add to the window.
    window_frames_to_send: hashbrown::HashMap<NonZeroU32, NonZeroU64, SipHasherBuild>,

    /// Number of substreams within [`YamuxInner::substreams`] whose [`Substream::inbound`] is
    /// `true`.
    num_inbound: usize,

    /// `Some` if a `GoAway` frame has been received in the past.
    received_goaway: Option<GoAwayErrorCode>,

    /// Whether to send out a `GoAway` frame.
    outgoing_goaway: OutgoingGoAway,

    /// What kind of data is expected on the socket next.
    incoming: Incoming,

    /// What is currently being written out.
    outgoing: Outgoing,

    /// See [`Config::max_out_data_frame_size`].
    max_out_data_frame_size: NonZeroU32,

    /// ID of the next outgoing substream to open.
    /// This implementation allocates identifiers linearly. Every time a substream is opened, its
    /// value is incremented by two.
    next_outbound_substream: NonZeroU32,

    /// Number of pings to send out that haven't been queued yet.
    pings_to_send: usize,

    /// List of pings that have been sent out but haven't been replied to yet.
    pings_waiting_reply: VecDeque<u32>,

    /// List of opaque values corresponding to ping requests sent by the remote. For each entry,
    /// a PONG header should be sent to the remote.
    pongs_to_send: VecDeque<u32>,

    /// See [`Config::max_simultaneous_queued_pongs`].
    max_simultaneous_queued_pongs: NonZeroUsize,

    /// List of substream IDs that have been reset locally. For each entry, a RST header should
    /// be sent to the remote.
    rsts_to_send: VecDeque<NonZeroU32>,

    /// See [`Config::max_simultaneous_rst_substreams`].
    max_simultaneous_rst_substreams: NonZeroUsize,

    /// Source of randomness used for various purposes.
    randomness: ChaCha20Rng,
}

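// Sketch (not part of the module): `Option`'s `Ord` implementation puts `None` before any
// `Some(_)`, which is what makes a `None` key in `substreams_wake_up` mean "right now":
// such entries always come first when iterating the `BTreeSet` in ascending order.
#[allow(dead_code)]
fn _substreams_wake_up_key_ordering() {
    let mut set = BTreeSet::new();
    set.insert((Some(10u32), NonZeroU32::new(1).unwrap())); // wake up at time 10
    set.insert((None, NonZeroU32::new(2).unwrap())); // wake up immediately
    assert!(matches!(set.iter().next(), Some((None, _))));
}
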
struct Substream<TNow, TSub> {
    /// State of the substream.
    state: SubstreamState<TNow>,
    /// `true` if the substream has been opened by the remote.
    inbound: bool,
    /// Data chosen by the user.
    user_data: TSub,
}

enum SubstreamState<TNow> {
    Healthy {
        /// True if a message on this substream has already been sent since it has been opened.
        /// The first message on a substream must contain either a `SYN` or an `ACK` flag.
        first_message_queued: bool,
        /// True if the remote has sent a message on this substream and has thus acknowledged that
        /// this substream exists.
        remote_syn_acked: bool,
        /// Amount of data the remote is allowed to transmit to the local node. Does not take into
        /// account window frames that haven't been sent out yet. In other words, this value is
        /// increased when a window frame is sent out.
        remote_allowed_window: u64,
        /// Amount of data the local node is allowed to transmit to the remote.
        allowed_window: u64,
        /// State of the local writing side of this substream.
        local_write_close: SubstreamStateLocalWrite,
        /// True if the writing side of the remote node is closed for this substream.
        remote_write_closed: bool,
        /// Buffer of incoming data that hasn't been processed by the substream yet.
        read_buffer: Vec<u8>,
        /// Value of [`ReadWrite::expected_incoming_bytes`] previously yielded by the substream.
        /// `None` means "unknown".
        expected_incoming_bytes: Option<usize>,
        /// If this substream is currently in [`YamuxInner::substreams_wake_up`], this contains
        /// the key where it is currently inserted.
        substreams_wake_up_key: Option<Option<TNow>>,
    },

    /// The substream has been reset, either locally or by the remote. Its entire purpose is to
    /// be removed by the API user.
    Reset,
}

enum SubstreamStateLocalWrite {
    Open,
    FinDesired,
    FinQueued,
}

enum Incoming {
    /// Expect a Yamux header.
    Header,

    /// Expect the data of a previously-received data frame header.
    ///
    /// Note that the state of the reading side of the substream might already be closed. The
    /// implementation verifies that the remote is allowed to send data when processing the
    /// header, then immediately sets the reading side to "closed" if the `FIN` flag is set, even
    /// if there is still data to be received.
    DataFrame {
        /// Identifier of the substream the data belongs to.
        substream_id: NonZeroU32,
        /// Number of bytes of data remaining before the frame ends.
        remaining_bytes: u32,
    },

    /// A header referring to a new substream has been received. The reception of any further data
    /// is blocked waiting for the API user to accept or reject this substream.
    PendingIncomingSubstream {
        /// Identifier of the pending substream.
        substream_id: NonZeroU32,
        /// Extra local window size to give to this substream.
        extra_window: u32,
        /// If non-zero, must transition to an [`Incoming::DataFrame`].
        data_frame_size: u32,
        /// True if the remote writing side of the substream should immediately be closed.
        fin: bool,
    },
}

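// For reference when reading `Incoming` and the `header` submodule: every Yamux frame starts
// with a fixed 12-byte header (per the specification linked in the module documentation),
// with all multi-byte fields big-endian:
//
//     +---------------+---------------+---------------+------------------+----------------+
//     | version (1 B) | type (1 B)    | flags (2 B)   | stream id (4 B)  | length (4 B)   |
//     +---------------+---------------+---------------+------------------+----------------+
//
// For data frames, `length` bytes of payload follow the header; for window updates, pings and
// `GoAway` frames, `length` itself carries the value and no payload follows.
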
enum Outgoing {
    WritingOut {
        /// Buffers of data to write out.
        buffers: Vec<Vec<u8>>,
    },
    PreparingDataFrame {
        /// Substream concerned by the data frame. Always healthy.
        substream_id: NonZeroU32,
        /// Buffers of data that the substream is producing. Does not include the Yamux header.
        /// Must never be empty.
        write_buffers: Vec<Vec<u8>>,
    },
}

enum OutgoingGoAway {
    /// No `GoAway` frame has been sent or requested. Normal mode of operations.
    NotRequired,

    /// API user has asked to send a `GoAway` frame. This frame hasn't been queued into
    /// [`YamuxInner::outgoing`] yet.
    Required(GoAwayErrorCode),

    /// A `GoAway` frame has been queued into [`YamuxInner::outgoing`] in the past.
    Queued,

    /// A `GoAway` frame has been extracted through [`Yamux::read_write`].
    Sent,
}

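// The `OutgoingGoAway` variants form a one-way progression, driven by the code below:
//
//     NotRequired --(`Yamux::send_goaway`)--> Required(code)
//         --(header queued into `YamuxInner::outgoing`)--> Queued
//         --(buffer extracted by `Yamux::read_write`)--> Sent
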
/// Maximum number of simultaneous outgoing pings allowed.
pub const MAX_PINGS: usize = 100000;

impl<TNow, TSub> Yamux<TNow, TSub> {
    /// Initializes a new Yamux state machine.
    pub fn new(config: Config) -> Yamux<TNow, TSub> {
        let mut randomness = ChaCha20Rng::from_seed(config.randomness_seed);

        Yamux {
            inner: Box::new(YamuxInner {
                substreams: hashbrown::HashMap::with_capacity_and_hasher(
                    config.capacity,
                    SipHasherBuild::new({
                        let mut seed = [0; 16];
                        randomness.fill_bytes(&mut seed);
                        seed
                    }),
                ),
                dead_substreams: hashbrown::HashSet::with_capacity_and_hasher(
                    config.capacity,
                    SipHasherBuild::new({
                        let mut seed = [0; 16];
                        randomness.fill_bytes(&mut seed);
                        seed
                    }),
                ),
                substreams_wake_up: BTreeSet::new(),
                substreams_write_ready: hashbrown::HashSet::with_capacity_and_hasher(
                    config.capacity,
                    SipHasherBuild::new({
                        let mut seed = [0; 16];
                        randomness.fill_bytes(&mut seed);
                        seed
                    }),
                ),
                window_frames_to_send: hashbrown::HashMap::with_capacity_and_hasher(
                    config.capacity,
                    SipHasherBuild::new({
                        let mut seed = [0; 16];
                        randomness.fill_bytes(&mut seed);
                        seed
                    }),
                ),
                num_inbound: 0,
                received_goaway: None,
                incoming: Incoming::Header,
                outgoing: Outgoing::WritingOut {
                    // TODO: capacity?
                    buffers: Vec::with_capacity(16),
                },
                outgoing_goaway: OutgoingGoAway::NotRequired,
                max_out_data_frame_size: config.max_out_data_frame_size,
                next_outbound_substream: if config.is_initiator {
                    NonZeroU32::new(1).unwrap()
                } else {
                    NonZeroU32::new(2).unwrap()
                },
                pings_to_send: 0,
                // We leave the initial capacity at 0, as it is likely that no ping is sent at all.
                pings_waiting_reply: VecDeque::with_capacity(0),
                pongs_to_send: VecDeque::with_capacity(4),
                max_simultaneous_queued_pongs: config.max_simultaneous_queued_pongs,
                rsts_to_send: VecDeque::with_capacity(4),
                max_simultaneous_rst_substreams: config.max_simultaneous_rst_substreams,
                randomness,
            }),
        }
    }
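
    // Illustrative sketch (not part of the module): `next_outbound_substream` starts at 1 for
    // the initiator and 2 for the responder and always increases by 2, so the two sides draw
    // from disjoint (odd vs. even) ID spaces and can never allocate the same substream ID.
    #[allow(dead_code)]
    fn _outbound_id_parity_sketch(is_initiator: bool) -> impl Iterator<Item = u32> {
        let first = if is_initiator { 1u32 } else { 2u32 };
        (0u32..).map(move |n| first + 2 * n) // 1, 3, 5, ... or 2, 4, 6, ...
    }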

    /// Returns `true` if there is no substream in the state machine.
    ///
    /// > **Note**: After a substream has been closed or reset, it must be removed using
    /// >           [`Yamux::remove_dead_substream`] before this function can return `true`.
    pub fn is_empty(&self) -> bool {
        self.inner.substreams.is_empty()
    }

    /// Returns the number of substreams in the Yamux state machine. Includes substreams that are
    /// dead but haven't been removed yet.
    pub fn len(&self) -> usize {
        self.inner.substreams.len()
    }

    /// Returns the number of inbound substreams in the Yamux state machine. Includes substreams
    /// that are dead but haven't been removed yet.
    pub fn num_inbound(&self) -> usize {
        debug_assert_eq!(
            self.inner.num_inbound,
            self.inner.substreams.values().filter(|s| s.inbound).count()
        );

        self.inner.num_inbound
    }

    /// Returns `Some` if a [`ReadWriteOutcome::GoAway`] event has been generated in the past,
    /// in which case the code is returned.
    ///
    /// If `Some` is returned, it is forbidden to open new outbound substreams.
    pub fn received_goaway(&self) -> Option<GoAwayErrorCode> {
        self.inner.received_goaway
    }

    /// Returns an iterator to the list of all substream user datas.
    pub fn user_datas(&self) -> impl ExactSizeIterator<Item = (SubstreamId, &TSub)> {
        self.inner
            .substreams
            .iter()
            .map(|(id, s)| (SubstreamId(*id), &s.user_data))
    }

    /// Returns an iterator to the list of all substream user datas, with mutable access.
    pub fn user_datas_mut(&mut self) -> impl ExactSizeIterator<Item = (SubstreamId, &mut TSub)> {
        self.inner
            .substreams
            .iter_mut()
            .map(|(id, s)| (SubstreamId(*id), &mut s.user_data))
    }

    /// Returns `true` if the given [`SubstreamId`] exists.
    ///
    /// Also returns `true` if the substream is in a dead state.
    pub fn has_substream(&self, substream_id: SubstreamId) -> bool {
        self.inner.substreams.contains_key(&substream_id.0)
    }

    /// Returns `true` if [`Yamux::send_goaway`] has been called in the past.
    ///
    /// In other words, returns `true` if a `GoAway` frame has been either queued for sending
    /// (and is available through [`Yamux::read_write`]) or has already been sent out.
    pub fn goaway_queued_or_sent(&self) -> bool {
        !matches!(self.inner.outgoing_goaway, OutgoingGoAway::NotRequired)
    }

    /// Returns `true` if [`Yamux::send_goaway`] has been called in the past and this `GoAway`
    /// frame has been extracted through [`Yamux::read_write`].
    pub fn goaway_sent(&self) -> bool {
        matches!(self.inner.outgoing_goaway, OutgoingGoAway::Sent)
    }
}

impl<TNow, TSub> Yamux<TNow, TSub>
where
    TNow: Clone + cmp::Ord,
{
    /// Opens a new substream.
    ///
    /// This method only modifies the state of `self` and reserves an identifier. No message needs
    /// to be sent to the remote before data is actually being sent on the substream.
    ///
    /// The substream will be automatically processed by [`Yamux::read_write`] in the future.
    ///
    /// > **Note**: Importantly, the remote will not be notified of the substream being open
    /// >           before the local side sends data on this substream. As such, protocols where
    /// >           the remote is expected to send data in response to a substream being open,
    /// >           without the local side first sending some data on that substream, will not
    /// >           work. In practice, while this is technically outside the scope of the Yamux
    /// >           protocol, all substreams in the context of libp2p start with a
    /// >           multistream-select negotiation, and this scenario can therefore never happen.
    ///
    /// Returns an error if a [`ReadWriteOutcome::GoAway`] event has been generated. This can
    /// also be checked by calling [`Yamux::received_goaway`].
    ///
    /// Returns an error if all possible substream IDs are already taken. This happens if more
    /// than approximately `2^31` substreams have been opened, which is very unlikely to happen
    /// unless there exists a bug in the code.
    ///
    pub fn open_substream(&mut self, user_data: TSub) -> Result<SubstreamId, OpenSubstreamError> {
        if self.inner.received_goaway.is_some() {
            return Err(OpenSubstreamError::GoAwayReceived);
        }

        let substream_id = self.inner.next_outbound_substream;

        self.inner.next_outbound_substream = match self.inner.next_outbound_substream.checked_add(2)
        {
            Some(new_id) => new_id,
            None => return Err(OpenSubstreamError::NoFreeSubstreamId),
        };

        let _prev_value = self.inner.substreams.insert(
            substream_id,
            Substream {
                state: SubstreamState::Healthy {
                    first_message_queued: false,
                    remote_syn_acked: false,
                    remote_allowed_window: NEW_SUBSTREAMS_FRAME_SIZE,
                    allowed_window: NEW_SUBSTREAMS_FRAME_SIZE,
                    local_write_close: SubstreamStateLocalWrite::Open,
                    remote_write_closed: false,
                    expected_incoming_bytes: None,
                    read_buffer: Vec::new(), // TODO: capacity?
                    substreams_wake_up_key: Some(None),
                },
                inbound: false,
                user_data,
            },
        );
        debug_assert!(_prev_value.is_none());

        // The substream is added to `substreams_wake_up` rather than `substreams_write_ready`,
        // in case the substream processing does some magic such as generate events before having
        // even done anything.
        self.inner.substreams_wake_up.insert((None, substream_id));

        Ok(SubstreamId(substream_id))
    }
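
    // Illustrative call site for `open_substream` (a sketch; `yamux` and `user_data` are
    // assumed to exist in the caller):
    //
    //     match yamux.open_substream(user_data) {
    //         Ok(substream_id) => { /* substream reserved; `read_write` does the rest */ }
    //         Err(OpenSubstreamError::GoAwayReceived) => { /* remote refuses new substreams */ }
    //         Err(OpenSubstreamError::NoFreeSubstreamId) => { /* ~2^31 IDs exhausted */ }
    //     }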

    /// Marks the given substream as being ready to write out data.
    ///
    /// Calling this function is necessary in situations where a substream didn't write any data
    /// in the past, but some sort of manual state update will make it write data in the future.
    ///
    /// Has no effect if the substream has been reset or closed.
    ///
    /// # Panic
    ///
    /// Panics if the [`SubstreamId`] is invalid.
    ///
    pub fn mark_substream_write_ready(&mut self, substream_id: SubstreamId) {
        assert!(self.inner.substreams.contains_key(&substream_id.0));
        if !self.inner.dead_substreams.contains(&substream_id.0) {
            self.inner.substreams_write_ready.insert(substream_id.0);
        }
    }
527
528
    /// Feeds data coming from a socket and outputs data to write to the socket.
529
    ///
530
    /// Returns an object that implements `Deref<Target = ReadWrite>`. This object represents the
531
    /// decrypted stream of data.
532
    ///
533
    /// An error is returned if the protocol is being violated by the remote or if the nonce
534
    /// overflows. When that happens, the connection should be closed altogether.
535
389
    pub fn read_write(
536
389
        mut self,
537
389
        outer_read_write: &mut ReadWrite<TNow>,
538
389
    ) -> Result<ReadWriteOutcome<'_, TNow, TSub>, Error> {
539
        // Queue something for writing if necessary.
540
389
        if let Outgoing::WritingOut { 
buffers323
} = &mut self.inner.outgoing {
541
            // Make sure that it's not just a list of empty buffers.
542
323
            debug_assert_eq!(
543
323
                buffers.is_empty(),
544
323
                buffers.iter().fold(0, |sz, b| 
sz + b.len()201
) == 0
_RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes6_0Bc_
Line
Count
Source
544
201
                buffers.iter().fold(0, |sz, b| sz + b.len()) == 0
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes6_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes6_0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes6_0CsiUjFBJteJ7x_17smoldot_full_node
545
            );
546
547
323
            if buffers.is_empty() {
548
254
                if let OutgoingGoAway::Queued = self.inner.outgoing_goaway {
549
0
                    self.inner.outgoing_goaway = OutgoingGoAway::Sent;
550
254
                }
551
552
254
                if let OutgoingGoAway::Required(
error_code0
) = self.inner.outgoing_goaway {
553
0
                    // Send a `GoAway` frame if demanded.
554
0
                    buffers.push(
555
0
                        header::encode(&header::DecodedYamuxHeader::GoAway { error_code }).to_vec(),
556
0
                    );
557
0
                    self.inner.outgoing_goaway = OutgoingGoAway::Queued;
558
254
                } else if let Some(
substream_id0
) = self.inner.rsts_to_send.pop_front() {
559
0
                    // Send RST frame.
560
0
                    buffers.push(
561
0
                        header::encode(&header::DecodedYamuxHeader::Window {
562
0
                            syn: false,
563
0
                            ack: false,
564
0
                            fin: false,
565
0
                            rst: true,
566
0
                            stream_id: substream_id,
567
0
                            length: 0,
568
0
                        })
569
0
                        .to_vec(),
570
0
                    );
571
254
                } else if self.inner.pings_to_send > 0 {
572
                    // Send outgoing pings.
573
0
                    self.inner.pings_to_send -= 1;
574
0
                    let opaque_value: u32 = self.inner.randomness.next_u32();
575
0
                    self.inner.pings_waiting_reply.push_back(opaque_value);
576
0
                    buffers.push(
577
0
                        header::encode(&header::DecodedYamuxHeader::PingRequest { opaque_value })
578
0
                            .to_vec(),
579
0
                    );
580
0
                    debug_assert!(self.inner.pings_waiting_reply.len() <= MAX_PINGS);
581
254
                } else if let Some(
opaque_value0
) = self.inner.pongs_to_send.pop_front() {
582
0
                    // Send outgoing pongs.
583
0
                    buffers.push(
584
0
                        header::encode(&header::DecodedYamuxHeader::PingResponse { opaque_value })
585
0
                            .to_vec(),
586
0
                    );
587
254
                } else if let Some(
substream_id0
) = self
588
254
                    .inner
589
254
                    .window_frames_to_send
590
254
                    .keys()
591
254
                    .choose(&mut self.inner.randomness)
592
254
                    .copied()
593
                {
594
                    // Send window frame.
595
                    let Some(Substream {
596
0
                        inbound,
597
0
                        state:
598
0
                            SubstreamState::Healthy {
599
0
                                first_message_queued,
600
0
                                remote_allowed_window,
601
0
                                local_write_close,
602
                                ..
603
                            },
604
                        ..
605
0
                    }) = &mut self.inner.substreams.get_mut(&substream_id)
606
                    else {
607
0
                        unreachable!()
608
                    };
609
610
0
                    let mut pending_window_increase = self
611
0
                        .inner
612
0
                        .window_frames_to_send
613
0
                        .remove(&substream_id)
614
0
                        .unwrap()
615
0
                        .get();
616
0
617
0
                    let actual_window_update =
618
0
                        u32::try_from(pending_window_increase).unwrap_or(u32::MAX);
619
0
                    buffers.push(
620
0
                        header::encode(&header::DecodedYamuxHeader::Window {
621
0
                            syn: !*first_message_queued && !*inbound,
622
0
                            ack: !*first_message_queued && *inbound,
623
                            // Note that it is unclear whether `fin` should be set if the local
624
                            // writing state has been written before.
625
0
                            fin: matches!(local_write_close, SubstreamStateLocalWrite::FinDesired),
626
                            rst: false,
627
0
                            stream_id: substream_id,
628
0
                            length: actual_window_update,
629
0
                        })
630
0
                        .to_vec(),
631
0
                    );
632
0
633
0
                    *remote_allowed_window =
634
0
                        remote_allowed_window.saturating_add(u64::from(actual_window_update));
635
0
                    pending_window_increase -= u64::from(actual_window_update);
636
0
                    *first_message_queued = true;
637
0
                    if matches!(local_write_close, SubstreamStateLocalWrite::FinDesired) {
638
0
                        *local_write_close = SubstreamStateLocalWrite::FinQueued;
639
0
                    }
640
641
                    // In the rare situation where the window update doesn't fit in a `u32`, we
642
                    // have to send another window frame again later.
643
0
                    if let Some(pending_window_increase) = NonZeroU64::new(pending_window_increase)
644
0
                    {
645
0
                        self.inner
646
0
                            .window_frames_to_send
647
0
                            .insert(substream_id, pending_window_increase);
648
0
                    }
649
254
                }
650
69
            }
651
66
        }
652
653
        // Try finish writing the data currently being written.
654
389
        if let Outgoing::WritingOut { 
buffers323
} = &mut self.inner.outgoing {
655
323
            let buffers_total_size = buffers.iter().fold(0, |count, buf| 
count + buf.len()201
);
_RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_write0Bc_
Line
Count
Source
655
201
            let buffers_total_size = buffers.iter().fold(0, |count, buf| count + buf.len());
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_write0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_write0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_write0CsiUjFBJteJ7x_17smoldot_full_node
656
323
657
323
            if buffers_total_size == 0 {
658
254
                // Do nothing.
659
254
            } else if outer_read_write
660
69
                .write_bytes_queueable
661
69
                .map_or(false, |queuable| buffers_total_size <= queuable)
_RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes_0Bc_
Line
Count
Source
661
69
                .map_or(false, |queuable| buffers_total_size <= queuable)
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes_0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes_0CsiUjFBJteJ7x_17smoldot_full_node
662
            {
663
                // We can directly push all the write buffers to the `ReadWrite`.
664
69
                if outer_read_write.write_buffers.is_empty() {
665
69
                    debug_assert_eq!(outer_read_write.write_bytes_queued, 0);
666
69
                    outer_read_write.write_buffers = mem::take(buffers);
667
0
                } else {
668
0
                    outer_read_write.write_buffers.append(buffers);
669
0
                }
670
671
69
                outer_read_write.write_bytes_queued += buffers_total_size;
672
69
                *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_total_size;
673
0
            } else if outer_read_write.write_buffers.is_empty()
674
0
                && outer_read_write
675
0
                    .write_bytes_queueable
676
0
                    .map_or(false, |queueable| {
677
0
                        buffers.first().map_or(0, |b| b.len()) <= queueable
Unexecuted instantiation: _RNCNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB8_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1e_6option6OptionTINtNtNtBa_11established9substream9SubstreamB1a_EIB1M_uEEEE10read_writes0_00Be_
Unexecuted instantiation: _RNCNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB8_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1f_6option6OptionTINtNtNtBa_11established9substream9SubstreamB1b_EIB1N_IB1N_NtNtBc_10collection11SubstreamIdEEEEE10read_writes0_00CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB8_5YamuxppE10read_writes0_00Be_
Unexecuted instantiation: _RNCNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB8_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtBa_11established9substream9SubstreamB1b_EIB1L_IB1L_NtNtBc_10collection11SubstreamIdEEEEE10read_writes0_00CsiUjFBJteJ7x_17smoldot_full_node
678
0
                    })
Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes0_0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes0_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes0_0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes0_0CsiUjFBJteJ7x_17smoldot_full_node
679
0
            {
680
0
                // Not enough space to push all the buffers at once, but enough space to push at
681
0
                // least the first one. Push as many buffers as possible.
682
0
                let limit = outer_read_write.write_bytes_queueable.unwrap_or(0);
683
0
                let (num_buffers, buffers_size) = buffers
684
0
                    .iter()
685
0
                    .scan(0, |count, buf| {
686
0
                        *count += buf.len();
687
0
                        Some(*count)
688
0
                    })
Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes1_0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes1_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes1_0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes1_0CsiUjFBJteJ7x_17smoldot_full_node
689
0
                    .enumerate()
690
0
                    .take_while(|(_, sz)| *sz <= limit)
691
0
                    .last()
692
0
                    .unwrap();
693
0
694
0
                outer_read_write
695
0
                    .write_buffers
696
0
                    .extend(buffers.drain(..num_buffers));
697
0
                outer_read_write.write_bytes_queued += buffers_size;
698
0
                *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_size;
699
0
            } else if outer_read_write.write_buffers.is_empty() {
700
                // Not enough space to fully push even the first buffer.
701
0
                if let Some(first) = buffers.first_mut() {
702
0
                    outer_read_write.write_from_vec(first);
703
0
                }
704
0
            }
705
66
        }
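The branch above pushes the longest prefix of write buffers whose cumulative size still fits in the outer `ReadWrite`. A standalone sketch of that cumulative-size computation (a hypothetical helper, not part of smoldot):

// Hypothetical helper illustrating the scan/enumerate/take_while pattern
// above: how many whole buffers fit within `limit` bytes, and their
// combined size; `None` if not even the first buffer fits.
fn fitting_prefix(buffers: &[Vec<u8>], limit: usize) -> Option<(usize, usize)> {
    buffers
        .iter()
        .scan(0usize, |cumulative, buf| {
            *cumulative += buf.len();
            Some(*cumulative)
        })
        .enumerate()
        .take_while(|(_, size)| *size <= limit)
        .last()
        .map(|(index, size)| (index + 1, size))
}

fn main() {
    let buffers = vec![vec![0u8; 4], vec![0u8; 5], vec![0u8; 10]];
    // The first two buffers (9 bytes) fit within 12; the third would not.
    assert_eq!(fitting_prefix(&buffers, 12), Some((2, 9)));
}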
706
707
        // Consume as much incoming data as possible until either the incoming data buffer is
708
        // empty or we reach a non-empty data frame.
709
506
        loop {
710
506
            match self.inner.incoming {
711
0
                Incoming::PendingIncomingSubstream { .. } => break,
712
713
                Incoming::DataFrame {
714
                    remaining_bytes: 0, ..
715
62
                } => {
716
62
                    // Nothing more to do.
717
62
                    self.inner.incoming = Incoming::Header;
718
62
                }
719
720
                Incoming::DataFrame {
721
225
                    substream_id,
722
225
                    ref mut remaining_bytes,
723
                    ..
724
                } => {
725
                    // It is possible that we are receiving data corresponding to a substream for
726
                    // which a RST has been sent out by the local node. Since the
727
                    // local state machine doesn't keep track of RST'ted substreams, any
728
                    // frame concerning a substream that has been RST or doesn't exist is
729
                    // discarded and doesn't result in an error, under the presumption that we
730
                    // are in this situation.
731
                    let Some(Substream {
732
                        state:
733
                            SubstreamState::Healthy {
734
225
                                expected_incoming_bytes,
735
225
                                read_buffer,
736
225
                                substreams_wake_up_key,
737
                                ..
738
                            },
739
                        ..
740
225
                    }) = self.inner.substreams.get_mut(&substream_id)
741
                    else {
742
                        // Substream doesn't exist (as it's likely been RST as explained above).
743
                        // Discard the next `remaining_bytes`.
744
                        // TODO: don't use `incoming_bytes_take` but instead simply discard as much as possible or something
745
0
                        match outer_read_write.incoming_bytes_take(
746
0
                            usize::try_from(*remaining_bytes).unwrap_or(usize::MAX),
747
0
                        ) {
748
0
                            Ok(Some(taken)) => {
749
0
                                debug_assert_eq!(taken.len() as u32, *remaining_bytes);
750
0
                                *remaining_bytes = 0;
751
0
                                continue;
752
                            }
753
                            // TODO: how to deal with the read closed?
754
0
                            Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break,
755
                        }
756
                    };
757
758
                    // We copy data from the main incoming buffer to the read buffer of that
759
                    // substream.
760
                    // If there isn't enough data in `outer_read_write.incoming_buffer`
761
                    // compared to what the substream has previously requested with
762
                    // `expected_incoming_bytes`, then we request more data from the outside.
763
                    // If instead there is enough data, we copy all the data immediately
764
                    // in order to avoid the cost of splitting the buffer.
765
766
225
                    debug_assert_ne!(*remaining_bytes, 0);
767
225
                    let to_copy = if *expected_incoming_bytes == Some(0) {
768
                        // If the substream refuses to process more incoming data right now,
769
                        // we copy everything into its read buffer in order to free the outer
770
                        // incoming buffer.
771
                        // The window system of Yamux ensures that the read buffer size is capped.
772
8
                        usize::try_from(*remaining_bytes).unwrap_or(usize::MAX)
773
                    } else {
774
217
                        cmp::min(
775
217
                            expected_incoming_bytes
776
217
                                .unwrap_or(0)
777
217
                                .saturating_sub(read_buffer.len()),
778
217
                            usize::try_from(*remaining_bytes).unwrap_or(usize::MAX),
779
217
                        )
780
                    };
781
782
225
                    match outer_read_write.incoming_bytes_take(to_copy) {
783
225
                        Ok(Some(mut data)) => {
784
225
                            *remaining_bytes -= u32::try_from(data.len()).unwrap();
785
225
                            if read_buffer.is_empty() {
786
189
                                *read_buffer = data;
787
189
                            } else {
788
36
                                read_buffer.append(&mut data);
789
36
                            }
790
                        }
791
                        // TODO: how to deal with the read closed?
792
0
                        Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break,
793
                    }
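The `to_copy` computation above decides how many bytes of the current data frame to move out of the shared incoming buffer. The same arithmetic as a standalone sketch (hypothetical signature, simplified from the state machine):

// `expected` is what the substream last asked for (`None` when unknown),
// `buffered` is what already sits in its read buffer, and `remaining` is
// what is left of the current data frame.
fn to_copy(expected: Option<usize>, buffered: usize, remaining: u32) -> usize {
    let remaining = usize::try_from(remaining).unwrap_or(usize::MAX);
    if expected == Some(0) {
        // The substream refuses to process data right now: move the whole
        // frame into its read buffer in order to free the shared buffer.
        // The Yamux window system caps how large that buffer can grow.
        remaining
    } else {
        core::cmp::min(expected.unwrap_or(0).saturating_sub(buffered), remaining)
    }
}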
794
795
                    // If the substream has enough data to read, or has never been processed
796
                    // before, make sure that it will wake up as soon as possible.
797
225
                    if expected_incoming_bytes.map_or(true, |expected| {
798
198
                        expected != 0 && read_buffer.len() >= expected
799
225
                    }) {
800
14
                        match substreams_wake_up_key {
801
14
                            Some(Some(when)) if *when <= outer_read_write.now => {}
802
150
                            Some(None) => {}
803
12
                            Some(Some(when)) => {
804
12
                                let _was_removed = self
805
12
                                    .inner
806
12
                                    .substreams_wake_up
807
12
                                    .remove(&(Some(when.clone()), substream_id));
808
12
                                debug_assert!(_was_removed);
809
12
                                self.inner.substreams_wake_up.insert((None, substream_id));
810
12
                                *substreams_wake_up_key = Some(None);
811
                            }
812
53
                            _ => {
813
53
                                self.inner.substreams_wake_up.insert((None, substream_id));
814
53
                                *substreams_wake_up_key = Some(None);
815
53
                            }
816
                        }
817
818
                        // Also stop processing incoming data so that we can process the substream.
819
217
                        break;
820
8
                    }
821
8
822
8
                    // Make sure that some progress was made, otherwise we might get into an
823
8
                    // infinite loop.
824
8
                    debug_assert_ne!(to_copy, 0);
825
                }
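The wake-up bookkeeping above relies on the natural ordering of `Option`: a key of `None` means "process as soon as possible" and sorts before any `Some(deadline)`. A minimal sketch of that invariant, with ids and times simplified to `u64`:

use std::collections::BTreeSet;

fn main() {
    // (wake-up deadline, substream id); `None` sorts before any `Some`.
    let mut wake_up: BTreeSet<(Option<u64>, u64)> = BTreeSet::new();
    wake_up.insert((Some(100), 1)); // substream 1 sleeps until t = 100
    wake_up.insert((None, 2)); // substream 2 wants to be processed now
    // `first()` therefore always yields the most urgent substream.
    assert_eq!(wake_up.first(), Some(&(None, 2)));
}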
826
827
                Incoming::Header => {
828
                    // Try to grab a header from the incoming buffer.
829
219
                    let header_bytes = match outer_read_write.incoming_bytes_take_array::<12>() {
830
67
                        Ok(Some(hdr)) => hdr,
831
152
                        Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break,
832
                    };
833
834
                    // Decode the header in `header_bytes`.
835
67
                    let decoded_header = match header::decode_yamux_header(&header_bytes) {
836
67
                        Ok(h) => h,
837
0
                        Err(err) => return Err(Error::HeaderDecode(err)),
838
                    };
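For reference, the 12 bytes taken above follow the fixed frame header layout of the Yamux specification. A sketch of that layout (illustrative only, not smoldot's actual `header` module):

// All multi-byte fields are big-endian, per the spec.
#[allow(dead_code)]
struct RawHeader {
    version: u8,    // always 0 for Yamux 1.0
    frame_type: u8, // 0x0 data, 0x1 window update, 0x2 ping, 0x3 goaway
    flags: u16,     // bitfield: 0x1 SYN, 0x2 ACK, 0x4 FIN, 0x8 RST
    stream_id: u32,
    length: u32,    // payload size (data), or delta/opaque value otherwise
}

fn decode_raw(bytes: &[u8; 12]) -> RawHeader {
    RawHeader {
        version: bytes[0],
        frame_type: bytes[1],
        flags: u16::from_be_bytes([bytes[2], bytes[3]]),
        stream_id: u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
        length: u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
    }
}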
839
840
67
                    match decoded_header {
841
0
                        header::DecodedYamuxHeader::PingRequest { opaque_value } => {
842
0
                            if self.inner.pongs_to_send.len()
843
0
                                >= self.inner.max_simultaneous_queued_pongs.get()
844
                            {
845
0
                                return Err(Error::MaxSimultaneousPingsExceeded);
846
0
                            }
847
0
848
0
                            self.inner.pongs_to_send.push_back(opaque_value);
849
0
                            self.inner.incoming = Incoming::Header;
850
                        }
851
852
0
                        header::DecodedYamuxHeader::PingResponse { opaque_value } => {
853
0
                            if self.inner.pings_waiting_reply.pop_front() != Some(opaque_value) {
854
0
                                return Err(Error::PingResponseNotMatching);
855
0
                            }
856
0
857
0
                            self.inner.incoming = Incoming::Header;
858
0
                            return Ok(ReadWriteOutcome::PingResponse { yamux: self });
859
                        }
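Ping handling above enforces strict FIFO matching: each outgoing ping queues its opaque value, and every pong must echo the oldest outstanding one. A simplified standalone sketch of that bookkeeping:

use std::collections::VecDeque;

struct Pings {
    waiting_reply: VecDeque<u32>,
}

impl Pings {
    fn on_ping_sent(&mut self, opaque_value: u32) {
        self.waiting_reply.push_back(opaque_value);
    }

    // Mirrors `Error::PingResponseNotMatching` above: a pong that doesn't
    // echo the oldest outstanding ping is a protocol violation.
    fn on_pong(&mut self, opaque_value: u32) -> Result<(), ()> {
        if self.waiting_reply.pop_front() == Some(opaque_value) {
            Ok(())
        } else {
            Err(())
        }
    }
}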
860
861
0
                        header::DecodedYamuxHeader::GoAway { error_code } => {
862
0
                            if self.inner.received_goaway.is_some() {
863
0
                                return Err(Error::MultipleGoAways);
864
0
                            }
865
0
866
0
                            self.inner.incoming = Incoming::Header;
867
0
                            self.inner.received_goaway = Some(error_code);
868
0
869
0
                            let mut reset_substreams =
870
0
                                Vec::with_capacity(self.inner.substreams.len());
871
0
                            for (substream_id, substream) in self.inner.substreams.iter_mut() {
872
                                let SubstreamState::Healthy {
873
                                    remote_syn_acked: false,
874
0
                                    substreams_wake_up_key,
875
                                    ..
876
0
                                } = &mut substream.state
877
                                else {
878
0
                                    continue;
879
                                };
880
881
0
                                reset_substreams.push(SubstreamId(*substream_id));
882
0
883
0
                                let _was_inserted =
884
0
                                    self.inner.dead_substreams.insert(*substream_id);
885
0
                                debug_assert!(_was_inserted);
886
887
0
                                self.inner.substreams_write_ready.remove(substream_id);
888
0
                                self.inner.window_frames_to_send.remove(substream_id);
889
890
0
                                if let Some(k) = substreams_wake_up_key.take() {
891
0
                                    let _was_removed =
892
0
                                        self.inner.substreams_wake_up.remove(&(k, *substream_id));
893
0
                                    debug_assert!(_was_removed);
894
0
                                }
895
896
                                if let Outgoing::PreparingDataFrame {
897
0
                                    substream_id: s,
898
0
                                    write_buffers,
899
0
                                } = &mut self.inner.outgoing
900
                                {
901
0
                                    if *substream_id == *s {
902
0
                                        write_buffers.clear();
903
0
                                        self.inner.outgoing = Outgoing::WritingOut {
904
0
                                            buffers: mem::take(write_buffers),
905
0
                                        }
906
0
                                    }
907
0
                                }
908
909
0
                                substream.state = SubstreamState::Reset;
910
                            }
911
912
0
                            outer_read_write.wake_up_asap();
913
0
                            return Ok(ReadWriteOutcome::GoAway {
914
0
                                yamux: self,
915
0
                                code: error_code,
916
0
                                reset_substreams,
917
0
                            });
918
                        }
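A GoAway may only be received once per connection, and it resets exactly the substreams whose SYN the remote had not yet acknowledged, since the remote is guaranteed to have ignored them. The spec defines three error codes, which the re-exported `GoAwayErrorCode` models; sketched here for reference:

// The GoAway codes from the Yamux spec (illustrative enum, not the
// actual `GoAwayErrorCode` definition).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum GoAwayCode {
    NormalTermination = 0x0,
    ProtocolError = 0x1,
    InternalError = 0x2,
}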
919
920
                        header::DecodedYamuxHeader::Data {
921
                            rst: true,
922
0
                            ack,
923
0
                            stream_id,
924
0
                            length,
925
                            ..
926
                        }
927
                        | header::DecodedYamuxHeader::Window {
928
                            rst: true,
929
0
                            ack,
930
0
                            stream_id,
931
0
                            length,
932
                            ..
933
                        } => {
934
                            // Frame with the `RST` flag set. Destroy the substream.
935
936
                            // Sending a `RST` flag and data together is a weird corner case and
937
                            // is difficult to handle. It is unclear whether it is allowed at all.
938
                            // We thus consider it as invalid.
939
0
                            if matches!(decoded_header, header::DecodedYamuxHeader::Data { .. })
940
0
                                && length != 0
941
                            {
942
0
                                return Err(Error::DataWithRst);
943
0
                            }
944
0
945
0
                            self.inner.incoming = Incoming::Header;
946
947
                            // The remote might have sent a RST frame concerning a substream for
948
                            // which we have sent a RST frame earlier. Considering that we don't
949
                            // always keep traces of old substreams, we have no way to know whether
950
                            // this is the case or not.
951
0
                            let Some(substream) = self.inner.substreams.get_mut(&stream_id) else {
952
0
                                continue;
953
                            };
954
                            let SubstreamState::Healthy {
955
0
                                remote_syn_acked,
956
0
                                substreams_wake_up_key,
957
                                ..
958
0
                            } = &mut substream.state
959
                            else {
960
0
                                continue;
961
                            };
962
963
0
                            let _was_inserted = self.inner.dead_substreams.insert(stream_id);
964
0
                            debug_assert!(_was_inserted);
965
966
                            // Check whether the remote has ACKed multiple times.
967
0
                            if *remote_syn_acked && ack {
968
0
                                return Err(Error::UnexpectedAck);
969
0
                            }
970
971
                            if let Outgoing::PreparingDataFrame {
972
0
                                substream_id,
973
0
                                write_buffers,
974
0
                            } = &mut self.inner.outgoing
975
                            {
976
0
                                if *substream_id == stream_id {
977
0
                                    write_buffers.clear();
978
0
                                    self.inner.outgoing = Outgoing::WritingOut {
979
0
                                        buffers: mem::take(write_buffers),
980
0
                                    }
981
0
                                }
982
0
                            }
983
984
0
                            self.inner.window_frames_to_send.remove(&stream_id);
985
0
                            self.inner.substreams_write_ready.remove(&stream_id);
986
987
0
                            if let Some(k) = substreams_wake_up_key.take() {
988
0
                                let _was_removed =
989
0
                                    self.inner.substreams_wake_up.remove(&(k, stream_id));
990
0
                                debug_assert!(_was_removed);
991
0
                            }
992
993
0
                            substream.state = SubstreamState::Reset;
994
0
995
0
                            outer_read_write.wake_up_asap();
996
0
                            return Ok(ReadWriteOutcome::StreamReset {
997
0
                                yamux: self,
998
0
                                substream_id: SubstreamId(stream_id),
999
0
                            });
1000
                        }
1001
1002
                        header::DecodedYamuxHeader::Data {
1003
                            syn: true,
1004
                            ack: true,
1005
                            ..
1006
                        }
1007
                        | header::DecodedYamuxHeader::Window {
1008
                            syn: true,
1009
                            ack: true,
1010
                            ..
1011
                        } => {
1012
                            // You're never supposed to send a SYN and ACK at the same time.
1013
0
                            return Err(Error::UnexpectedAck);
1014
                        }
1015
1016
                        header::DecodedYamuxHeader::Data {
1017
                            syn: true,
1018
20
                            fin,
1019
20
                            rst: false,
1020
20
                            stream_id,
1021
20
                            length,
1022
                            ..
1023
                        }
1024
                        | header::DecodedYamuxHeader::Window {
1025
                            syn: true,
1026
0
                            fin,
1027
0
                            rst: false,
1028
0
                            stream_id,
1029
0
                            length,
1030
                            ..
1031
                        } => {
1032
                            // The initiator should only allocate uneven substream IDs, and the
1033
                            // other side only even IDs. We don't know anymore whether we're
1034
                            // initiator at this point, but we can compare with the even-ness of
1035
                            // the IDs that we allocate locally.
1036
20
                            if (self.inner.next_outbound_substream.get() % 2)
1037
20
                                == (stream_id.get() % 2)
1038
                            {
1039
0
                                return Err(Error::InvalidInboundStreamId(stream_id));
1040
20
                            }
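The parity rule enforced above comes from the spec: the connection initiator allocates odd stream ids and the other side even ones, so an inbound SYN must have the opposite parity of the ids we allocate ourselves. As a standalone sketch (ids simplified to plain `u32`):

fn is_valid_inbound(next_outbound: u32, inbound: u32) -> bool {
    (next_outbound % 2) != (inbound % 2)
}

fn main() {
    // If we allocate odd ids (initiator), inbound ids must be even.
    assert!(is_valid_inbound(3, 2));
    assert!(!is_valid_inbound(3, 5));
}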
1041
20
1042
20
                            // Remote has sent a SYN flag. A new substream is to be opened.
1043
20
                            match self.inner.substreams.get(&stream_id) {
1044
20
                                None => {}
1045
                                Some(Substream {
1046
                                    state:
1047
                                        SubstreamState::Healthy {
1048
                                            local_write_close: SubstreamStateLocalWrite::FinQueued,
1049
                                            remote_write_closed: true,
1050
                                            ..
1051
                                        },
1052
                                    ..
1053
                                })
1054
                                | Some(Substream {
1055
                                    state: SubstreamState::Reset,
1056
                                    ..
1057
                                }) => {
1058
                                    // Because we don't immediately destroy substreams, the remote
1059
                                    // might decide to re-use a substream ID that is still
1060
                                    // allocated locally. If that happens, we block the reading.
1061
                                    // It will be unblocked when the API user destroys the old
1062
                                    // substream.
1063
0
                                    break;
1064
                                }
1065
                                Some(Substream {
1066
                                    state: SubstreamState::Healthy { .. },
1067
                                    ..
1068
                                }) => {
1069
0
                                    return Err(Error::UnexpectedSyn(stream_id));
1070
                                }
1071
                            }
1072
1073
                            // When receiving a new substream, we might have to potentially queue
1074
                            // a substream rejection message later.
1075
                            // In order to ensure that there is enough space in `rsts_to_send`,
1076
                            // we check it against the limit now.
1077
20
                            if self.inner.rsts_to_send.len()
1078
20
                                >= self.inner.max_simultaneous_rst_substreams.get()
1079
                            {
1080
0
                                return Err(Error::MaxSimultaneousRstSubstreamsExceeded);
1081
20
                            }
1082
1083
20
                            let is_data =
1084
20
                                matches!(decoded_header, header::DecodedYamuxHeader::Data { .. });
1085
1086
                            // If we have queued or sent a GoAway frame, then the substream is
1087
                            // ignored. The remote understands when receiving the GoAway that the
1088
                            // substream has been rejected.
1089
20
                            if !matches!(self.inner.outgoing_goaway, OutgoingGoAway::NotRequired) {
1090
0
                                self.inner.incoming = if !is_data {
1091
0
                                    Incoming::Header
1092
                                } else {
1093
0
                                    Incoming::DataFrame {
1094
0
                                        substream_id: stream_id,
1095
0
                                        remaining_bytes: length,
1096
0
                                    }
1097
                                };
1098
1099
0
                                continue;
1100
20
                            }
1101
20
1102
20
                            if is_data && u64::from(length) > NEW_SUBSTREAMS_FRAME_SIZE {
1103
0
                                return Err(Error::CreditsExceeded);
1104
20
                            }
1105
20
1106
20
                            self.inner.incoming = Incoming::PendingIncomingSubstream {
1107
20
                                substream_id: stream_id,
1108
20
                                extra_window: if !is_data { length } else { 0 },
1109
20
                                data_frame_size: if is_data { length } else { 0 },
1110
20
                                fin,
1111
20
                            };
1112
20
1113
20
                            return Ok(ReadWriteOutcome::IncomingSubstream { yamux: self });
1114
                        }
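The credit check above exists because a brand-new substream starts with the spec's initial receive window of 256 KiB; a SYN data frame longer than that necessarily exceeds the sender's credits. A sketch, assuming `NEW_SUBSTREAMS_FRAME_SIZE` equals that initial window:

// The spec grants every new substream a 256 KiB receive window before
// any window-update frame is exchanged (assumed value).
const INITIAL_WINDOW: u64 = 256 * 1024;

fn syn_data_frame_allowed(length: u32) -> bool {
    u64::from(length) <= INITIAL_WINDOW
}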
1115
1116
                        header::DecodedYamuxHeader::Data {
1117
                            syn: false,
1118
                            rst: false,
1119
47
                            stream_id,
1120
47
                            length,
1121
47
                            ack,
1122
47
                            fin,
1123
                            ..
1124
                        } => {
1125
                            // Note that it is possible that the remote is referring to a substream
1126
                            // for which a RST has been sent out by the local node. Since the
1127
                            // local state machine doesn't keep track of RST'ted substreams, any
1128
                            // frame concerning a substream that has been RST or with an unknown
1129
                            // id is discarded and doesn't result in an error, under the
1130
                            // presumption that we are in this situation.
1131
                            if let Some(Substream {
1132
                                state:
1133
                                    SubstreamState::Healthy {
1134
47
                                        remote_write_closed,
1135
47
                                        remote_allowed_window,
1136
47
                                        remote_syn_acked,
1137
47
                                        substreams_wake_up_key,
1138
                                        ..
1139
                                    },
1140
                                ..
1141
47
                            }) = self.inner.substreams.get_mut(&stream_id)
1142
                            {
1143
47
                                match (ack, remote_syn_acked) {
1144
28
                                    (false, true) => {}
1145
19
                                    (true, acked @ false) => *acked = true,
1146
0
                                    (true, true) => return Err(Error::UnexpectedAck),
1147
0
                                    (false, false) => return Err(Error::ExpectedAck),
1148
                                }
1149
1150
47
                                if *remote_write_closed {
1151
0
                                    return Err(Error::WriteAfterFin);
1152
47
                                }
1153
47
1154
47
                                if fin {
1155
5
                                    *remote_write_closed = true;
1156
5
1157
5
                                    // No need to send window frames anymore if the remote has
1158
5
                                    // sent a FIN.
1159
5
                                    self.inner.window_frames_to_send.remove(&stream_id);
1160
1161
                                    // Wake up the substream.
1162
0
                                    match substreams_wake_up_key {
1163
0
                                        Some(Some(when)) if *when <= outer_read_write.now => {}
1164
2
                                        Some(None) => {}
1165
0
                                        Some(Some(when)) => {
1166
0
                                            let _was_removed = self
1167
0
                                                .inner
1168
0
                                                .substreams_wake_up
1169
0
                                                .remove(&(Some(when.clone()), stream_id));
1170
0
                                            debug_assert!(_was_removed);
1171
0
                                            self.inner.substreams_wake_up.insert((None, stream_id));
1172
0
                                            *substreams_wake_up_key = Some(None);
1173
                                        }
1174
3
                                        _ => {
1175
3
                                            self.inner.substreams_wake_up.insert((None, stream_id));
1176
3
                                            *substreams_wake_up_key = Some(None);
1177
3
                                        }
1178
                                    }
1179
42
                                }
1180
1181
                                // Check whether the remote has the right to send that much data.
1182
                                // Note that the credits aren't checked in the case of an unknown
1183
                                // substream.
1184
47
                                *remote_allowed_window = remote_allowed_window
1185
47
                                    .checked_sub(u64::from(length))
1186
47
                                    .ok_or(Error::CreditsExceeded)?;
1187
0
                            }
1188
1189
                            // Switch to the `DataFrame` state in order to process the frame, even
1190
                            // if the substream no longer exists, in order to not duplicate code.
1191
47
                            self.inner.incoming = Incoming::DataFrame {
1192
47
                                substream_id: stream_id,
1193
47
                                remaining_bytes: length,
1194
47
                            };
1195
                        }
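Each data frame consumes receive credits; running out without having granted more window is a protocol error. The accounting above, as a standalone sketch:

// Mirrors the `checked_sub` above; `Err` corresponds to
// `Error::CreditsExceeded`.
fn consume_window(remote_allowed_window: &mut u64, length: u32) -> Result<(), ()> {
    *remote_allowed_window = remote_allowed_window
        .checked_sub(u64::from(length))
        .ok_or(())?;
    Ok(())
}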
1196
1197
                        header::DecodedYamuxHeader::Window {
1198
                            syn: false,
1199
                            rst: false,
1200
0
                            stream_id,
1201
0
                            length,
1202
0
                            ack,
1203
0
                            fin,
1204
                            ..
1205
                        } => {
1206
                            // Note that it is possible that the remote is referring to a substream
1207
                            // for which a RST has been sent out by the local node. Since the
1208
                            // local state machine doesn't keep track of RST'ted substreams, any
1209
                            // frame concerning a substream that has been RST or with an unknown
1210
                            // id is discarded and doesn't result in an error, under the
1211
                            // presumption that we are in this situation.
1212
                            let Some(Substream {
1213
                                state:
1214
                                    SubstreamState::Healthy {
1215
0
                                        remote_syn_acked,
1216
0
                                        allowed_window,
1217
0
                                        remote_write_closed,
1218
                                        ..
1219
                                    },
1220
                                ..
1221
0
                            }) = self.inner.substreams.get_mut(&stream_id)
1222
                            else {
1223
0
                                self.inner.incoming = Incoming::Header;
1224
0
                                continue;
1225
                            };
1226
1227
0
                            match (ack, remote_syn_acked) {
1228
0
                                (false, true) => {}
1229
0
                                (true, acked @ false) => *acked = true,
1230
0
                                (true, true) => return Err(Error::UnexpectedAck),
1231
0
                                (false, false) => return Err(Error::ExpectedAck),
1232
                            }
1233
1234
                            // Note that the spec is unclear about whether the remote can or
1235
                            // should continue sending FIN flags on window size frames after
1236
                            // their side of the substream has already been closed before.
1237
0
                            if fin {
1238
0
                                *remote_write_closed = true;
1239
0
                            }
1240
1241
                            // If a substream was processed with a non-zero queuable bytes
1242
                            // value (i.e. `allowed_window` non-zero) but yields empty write
1243
                            // buffers, then we can assume that the substream has nothing to write,
1244
                            // and thus that simply increasing the queueable bytes will not change
1245
                            // the situation. After all, we have no idea whether the remote will
1246
                            // increase the window size of this substream any further in the
1247
                            // future, and thus we must write data out no matter what the window
1248
                            // size is as long as it is non-zero.
1249
                            // On the other hand, if the queueable bytes were 0, we take the guess
1250
                            // that the substream might have more to write.
1251
0
                            if length != 0 && *allowed_window == 0 {
1252
0
                                self.inner.substreams_write_ready.insert(stream_id);
1253
0
                            }
1254
1255
0
                            *allowed_window = allowed_window
1256
0
                                .checked_add(u64::from(length))
1257
0
                                .ok_or(Error::LocalCreditsOverflow)?;
1258
1259
0
                            self.inner.incoming = Incoming::Header;
1260
                        }
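Window-update frames do the reverse: credits are added with an overflow check, and a substream is only re-queued for writing when its window goes from zero to non-zero, per the heuristic explained above. A simplified sketch (ids as `u64`):

use std::collections::BTreeSet;

// `Err` corresponds to `Error::LocalCreditsOverflow` above.
fn on_window_update(
    allowed_window: &mut u64,
    length: u32,
    write_ready: &mut BTreeSet<u64>,
    stream_id: u64,
) -> Result<(), ()> {
    if length != 0 && *allowed_window == 0 {
        // Only a zero-to-non-zero transition can unblock a writer.
        write_ready.insert(stream_id);
    }
    *allowed_window = allowed_window.checked_add(u64::from(length)).ok_or(())?;
    Ok(())
}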
1261
                    }
1262
                }
1263
            }
1264
        }
1265
1266
        // Choose which substream to read/write (if any).
1267
329
        let substream_id = match (
1268
369
            &self.inner.outgoing,
1269
369
            self.inner.substreams_write_ready.iter().next().copied(),
1270
369
            self.inner.substreams_wake_up.first(),
1271
        ) {
1272
            (
1273
                Outgoing::PreparingDataFrame {
1274
66
                    substream_id,
1275
66
                    write_buffers,
1276
66
                },
1277
66
                _,
1278
66
                _,
1279
66
            ) => {
1280
66
                // Continue writing to the substream whose frame we're already preparing.
1281
66
                debug_assert!(!write_buffers.is_empty());
1282
66
                self.inner.substreams_write_ready.remove(substream_id);
1283
66
                *substream_id
1284
            }
1285
92
            (Outgoing::WritingOut { buffers }, Some(substream_id), _) if buffers.is_empty() => {
1286
92
                // Pull a substream from `substreams_write_ready`.
1287
92
                self.inner.substreams_write_ready.remove(&substream_id);
1288
92
                substream_id
1289
            }
1290
195
            (_, _, Some((when, substream_id)))
1291
195
                if when
1292
195
                    .as_ref()
1293
195
                    .map_or(true, |when| *when <= outer_read_write.now) =>
1294
171
            {
1295
171
                *substream_id
1296
            }
1297
            _ => {
1298
                // No substream to read/write.
1299
40
                return Ok(ReadWriteOutcome::Idle { yamux: self });
1300
            }
1301
        };
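The `match` above encodes a three-way scheduling priority. A simplified sketch of the same decision (ids and times as `u64`):

use std::collections::BTreeSet;

fn pick_substream(
    preparing_frame_for: Option<u64>,
    outgoing_buffer_empty: bool,
    write_ready: &BTreeSet<u64>,
    wake_up: &BTreeSet<(Option<u64>, u64)>,
    now: u64,
) -> Option<u64> {
    // 1. Keep filling the data frame that is already being prepared.
    if let Some(id) = preparing_frame_for {
        return Some(id);
    }
    // 2. Otherwise, if nothing is being written out, pick any substream
    //    explicitly marked as write-ready.
    if outgoing_buffer_empty {
        if let Some(id) = write_ready.iter().next().copied() {
            return Some(id);
        }
    }
    // 3. Otherwise, the most urgent wake-up entry, provided its deadline
    //    (`None` meaning "immediately") has passed.
    match wake_up.first() {
        Some((when, id)) if when.map_or(true, |w| w <= now) => Some(*id),
        _ => None,
    }
}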
1302
1303
        // Extract some fields from the substream state.
1304
        let SubstreamState::Healthy {
1305
329
            allowed_window,
1306
329
            local_write_close,
1307
329
            remote_write_closed,
1308
329
            read_buffer,
1309
329
            substreams_wake_up_key,
1310
            ..
1311
329
        } = &mut self.inner.substreams.get_mut(&substream_id).unwrap().state
1312
        else {
1313
0
            unreachable!()
1314
        };
1315
1316
        // Remove the substream from `substreams_wake_up`, since we're processing it now.
1317
        // If the processing produces a `wake_up_after` value when being processed, it will be
1318
        // re-inserted.
1319
329
        if let Some(substreams_wake_up_key) = substreams_wake_up_key.take() {
1320
273
            let _was_removed = self
1321
273
                .inner
1322
273
                .substreams_wake_up
1323
273
                .remove(&(substreams_wake_up_key, substream_id));
1324
273
            debug_assert!(_was_removed);
1325
56
        }
1326
1327
329
        let (write_buffers, can_queue_data) = match &mut self.inner.outgoing {
1328
263
            Outgoing::WritingOut { buffers } if buffers.is_empty() => {
1329
263
                let mut buffers = mem::take(buffers);
1330
263
                // As a small optimization, we push an empty buffer at the front where the header
1331
263
                // might later get written.
1332
263
                buffers.push(Vec::with_capacity(12));
1333
263
                (buffers, true)
1334
            }
1335
            Outgoing::PreparingDataFrame {
1336
66
                substream_id: s,
1337
66
                write_buffers,
1338
66
            } if *s == substream_id => {
1339
66
                let buffers = mem::take(write_buffers);
1340
66
                self.inner.outgoing = Outgoing::WritingOut {
1341
66
                    buffers: Vec::new(),
1342
66
                };
1343
66
                (buffers, true)
1344
            }
1345
0
            _ => (Vec::new(), false),
1346
        };
1347
461
        let write_buffers_len_before = write_buffers.iter().fold(0, |count, buf| count + buf.len());
1348
329
1349
329
        Ok(ReadWriteOutcome::ProcessSubstream {
1350
329
            substream_read_write: SubstreamReadWrite {
1351
329
                substream_id,
1352
329
                write_buffers_len_before,
1353
329
                inner_read_write: ReadWrite {
1354
329
                    now: outer_read_write.now.clone(),
1355
329
                    incoming_buffer: mem::take(read_buffer),
1356
329
                    expected_incoming_bytes: if !*remote_write_closed { Some(0) } else { None },
1357
                    read_bytes: 0,
1358
329
                    write_buffers,
1359
329
                    write_bytes_queued: write_buffers_len_before,
1360
10
                    write_bytes_queueable: if matches!(
1361
329
                        local_write_close,
1362
                        SubstreamStateLocalWrite::Open
1363
                    ) {
1364
319
                        if can_queue_data {
1365
319
                            Some(
1366
319
                                usize::try_from(cmp::min(
1367
319
                                    u64::from(self.inner.max_out_data_frame_size.get()),
1368
319
                                    *allowed_window,
1369
319
                                ))
1370
319
                                .unwrap_or(usize::MAX)
1371
319
                                .saturating_sub(write_buffers_len_before),
1372
319
                            )
1373
                        } else {
1374
0
                            Some(0)
1375
                        }
1376
                    } else {
1377
10
                        None
1378
                    },
1379
329
                    wake_up_after: None,
1380
329
                },
1381
329
                outer_read_write,
1382
329
                yamux: self,
1383
            },
1384
        })
1385
389
    }
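The `write_bytes_queueable` handed to the substream above is capped by both the configured maximum data-frame size and the window granted by the remote, minus what is already queued. As a standalone sketch of that computation:

fn queueable_bytes(max_frame_size: u32, allowed_window: u64, already_queued: usize) -> usize {
    usize::try_from(core::cmp::min(u64::from(max_frame_size), allowed_window))
        .unwrap_or(usize::MAX)
        .saturating_sub(already_queued)
}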
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE10read_writeBa_
Line
Count
Source
535
389
    pub fn read_write(
536
389
        mut self,
537
389
        outer_read_write: &mut ReadWrite<TNow>,
538
389
    ) -> Result<ReadWriteOutcome<'_, TNow, TSub>, Error> {
539
        // Queue something for writing if necessary.
540
389
        if let Outgoing::WritingOut { 
buffers323
} = &mut self.inner.outgoing {
541
            // Make sure that it's not just a list of empty buffers.
542
323
            debug_assert_eq!(
543
323
                buffers.is_empty(),
544
323
                buffers.iter().fold(0, |sz, b| sz + b.len()) == 0
545
            );
546
547
323
            if buffers.is_empty() {
548
254
                if let OutgoingGoAway::Queued = self.inner.outgoing_goaway {
549
0
                    self.inner.outgoing_goaway = OutgoingGoAway::Sent;
550
254
                }
551
552
254
                if let OutgoingGoAway::Required(
error_code0
) = self.inner.outgoing_goaway {
553
0
                    // Send a `GoAway` frame if demanded.
554
0
                    buffers.push(
555
0
                        header::encode(&header::DecodedYamuxHeader::GoAway { error_code }).to_vec(),
556
0
                    );
557
0
                    self.inner.outgoing_goaway = OutgoingGoAway::Queued;
558
254
                } else if let Some(
substream_id0
) = self.inner.rsts_to_send.pop_front() {
559
0
                    // Send RST frame.
560
0
                    buffers.push(
561
0
                        header::encode(&header::DecodedYamuxHeader::Window {
562
0
                            syn: false,
563
0
                            ack: false,
564
0
                            fin: false,
565
0
                            rst: true,
566
0
                            stream_id: substream_id,
567
0
                            length: 0,
568
0
                        })
569
0
                        .to_vec(),
570
0
                    );
571
254
                } else if self.inner.pings_to_send > 0 {
572
                    // Send outgoing pings.
573
0
                    self.inner.pings_to_send -= 1;
574
0
                    let opaque_value: u32 = self.inner.randomness.next_u32();
575
0
                    self.inner.pings_waiting_reply.push_back(opaque_value);
576
0
                    buffers.push(
577
0
                        header::encode(&header::DecodedYamuxHeader::PingRequest { opaque_value })
578
0
                            .to_vec(),
579
0
                    );
580
0
                    debug_assert!(self.inner.pings_waiting_reply.len() <= MAX_PINGS);
581
254
                } else if let Some(
opaque_value0
) = self.inner.pongs_to_send.pop_front() {
582
0
                    // Send outgoing pongs.
583
0
                    buffers.push(
584
0
                        header::encode(&header::DecodedYamuxHeader::PingResponse { opaque_value })
585
0
                            .to_vec(),
586
0
                    );
587
254
                } else if let Some(
substream_id0
) = self
588
254
                    .inner
589
254
                    .window_frames_to_send
590
254
                    .keys()
591
254
                    .choose(&mut self.inner.randomness)
592
254
                    .copied()
593
                {
594
                    // Send window frame.
595
                    let Some(Substream {
596
0
                        inbound,
597
0
                        state:
598
0
                            SubstreamState::Healthy {
599
0
                                first_message_queued,
600
0
                                remote_allowed_window,
601
0
                                local_write_close,
602
                                ..
603
                            },
604
                        ..
605
0
                    }) = &mut self.inner.substreams.get_mut(&substream_id)
606
                    else {
607
0
                        unreachable!()
608
                    };
609
610
0
                    let mut pending_window_increase = self
611
0
                        .inner
612
0
                        .window_frames_to_send
613
0
                        .remove(&substream_id)
614
0
                        .unwrap()
615
0
                        .get();
616
0
617
0
                    let actual_window_update =
618
0
                        u32::try_from(pending_window_increase).unwrap_or(u32::MAX);
619
0
                    buffers.push(
620
0
                        header::encode(&header::DecodedYamuxHeader::Window {
621
0
                            syn: !*first_message_queued && !*inbound,
622
0
                            ack: !*first_message_queued && *inbound,
623
                            // Note that it is unclear whether `fin` should be set if the local
624
                            // writing state has been written before.
625
0
                            fin: matches!(local_write_close, SubstreamStateLocalWrite::FinDesired),
626
                            rst: false,
627
0
                            stream_id: substream_id,
628
0
                            length: actual_window_update,
629
0
                        })
630
0
                        .to_vec(),
631
0
                    );
632
0
633
0
                    *remote_allowed_window =
634
0
                        remote_allowed_window.saturating_add(u64::from(actual_window_update));
635
0
                    pending_window_increase -= u64::from(actual_window_update);
636
0
                    *first_message_queued = true;
637
0
                    if matches!(local_write_close, SubstreamStateLocalWrite::FinDesired) {
638
0
                        *local_write_close = SubstreamStateLocalWrite::FinQueued;
639
0
                    }
640
641
                    // In the rare situation where the window update doesn't fit in a `u32`, we
642
                    // have to send another window frame again later.
643
0
                    if let Some(pending_window_increase) = NonZeroU64::new(pending_window_increase)
644
0
                    {
645
0
                        self.inner
646
0
                            .window_frames_to_send
647
0
                            .insert(substream_id, pending_window_increase);
648
0
                    }
649
254
                }
650
69
            }
651
66
        }
652
653
        // Try finish writing the data currently being written.
654
389
        if let Outgoing::WritingOut { 
buffers323
} = &mut self.inner.outgoing {
655
323
            let buffers_total_size = buffers.iter().fold(0, |count, buf| count + buf.len());
656
323
657
323
            if buffers_total_size == 0 {
658
254
                // Do nothing.
659
254
            } else if outer_read_write
660
69
                .write_bytes_queueable
661
69
                .map_or(false, |queuable| buffers_total_size <= queuable)
662
            {
663
                // We can directly push all the write buffers to the `ReadWrite`.
664
69
                if outer_read_write.write_buffers.is_empty() {
665
69
                    debug_assert_eq!(outer_read_write.write_bytes_queued, 0);
666
69
                    outer_read_write.write_buffers = mem::take(buffers);
667
0
                } else {
668
0
                    outer_read_write.write_buffers.append(buffers);
669
0
                }
670
671
69
                outer_read_write.write_bytes_queued += buffers_total_size;
672
69
                *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_total_size;
673
0
            } else if outer_read_write.write_buffers.is_empty()
674
0
                && outer_read_write
675
0
                    .write_bytes_queueable
676
0
                    .map_or(false, |queueable| {
677
                        buffers.first().map_or(0, |b| b.len()) <= queueable
678
0
                    })
679
0
            {
680
0
                // Not enough space to push all the buffers at once, but enough space to push at
681
0
                // least the first one. Push as many buffers as possible.
682
0
                let limit = outer_read_write.write_bytes_queueable.unwrap_or(0);
683
0
                let (num_buffers, buffers_size) = buffers
684
0
                    .iter()
685
0
                    .scan(0, |count, buf| {
686
                        *count += buf.len();
687
                        Some(*count)
688
0
                    })
689
0
                    .enumerate()
690
0
                    .take_while(|(_, sz)| *sz <= limit)
691
0
                    .last()
692
0
                    .unwrap();
693
0
694
0
                outer_read_write
695
0
                    .write_buffers
696
0
                    .extend(buffers.drain(..num_buffers));
697
0
                outer_read_write.write_bytes_queued += buffers_size;
698
0
                *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_size;
699
0
            } else if outer_read_write.write_buffers.is_empty() {
700
                // Not enough space to fully push even the first buffer.
701
0
                if let Some(first) = buffers.first_mut() {
702
0
                    outer_read_write.write_from_vec(first);
703
0
                }
704
0
            }
705
66
        }
706
707
        // Consume as much incoming data as possible until either the incoming data buffer is
708
        // empty or we reach a non-empty data frame.
709
506
        loop {
710
506
            match self.inner.incoming {
711
0
                Incoming::PendingIncomingSubstream { .. } => break,
712
713
                Incoming::DataFrame {
714
                    remaining_bytes: 0, ..
715
62
                } => {
716
62
                    // Nothing more to do.
717
62
                    self.inner.incoming = Incoming::Header;
718
62
                }
719
720
                Incoming::DataFrame {
721
225
                    substream_id,
722
225
                    ref mut remaining_bytes,
723
                    ..
724
                } => {
725
                    // It is possible that we are receiving data corresponding to a substream for
726
                    // which a RST has been sent out by the local node. Since the
727
                    // local state machine doesn't keep track of RST'ted substreams, any
728
                    // frame concerning a substream that has been RST or doesn't exist is
729
                    // discarded and doesn't result in an error, under the presumption that we
730
                    // are in this situation.
731
                    let Some(Substream {
732
                        state:
733
                            SubstreamState::Healthy {
734
225
                                expected_incoming_bytes,
735
225
                                read_buffer,
736
225
                                substreams_wake_up_key,
737
                                ..
738
                            },
739
                        ..
740
225
                    }) = self.inner.substreams.get_mut(&substream_id)
741
                    else {
742
                        // Substream doesn't exist (as it's likely been RST as explained above).
743
                        // Discard the next `remaining_bytes`.
744
                        // TODO: don't use `incoming_bytes_take` but instead simply discard as much as possible or something
745
0
                        match outer_read_write.incoming_bytes_take(
746
0
                            usize::try_from(*remaining_bytes).unwrap_or(usize::MAX),
747
0
                        ) {
748
0
                            Ok(Some(taken)) => {
749
0
                                debug_assert_eq!(taken.len() as u32, *remaining_bytes);
750
0
                                *remaining_bytes = 0;
751
0
                                continue;
752
                            }
753
                            // TODO: how to deal with the read closed?
754
0
                            Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break,
755
                        }
756
                    };
757
758
                    // We copy data from the main incoming buffer to the read buffer of that
759
                    // substream.
760
                    // If there isn't enough data in `outer_read_write.incoming_buffer`
761
                    // compared to the substream has previously requested with
762
                    // `expected_incoming_bytes`, then we request more data from the outside.
763
                    // If instead there is enough data, we copy all the data immediately
764
                    // in order to avoid the cost of splitting the buffer.
765
766
225
                    debug_assert_ne!(*remaining_bytes, 0);
767
225
                    let to_copy = if *expected_incoming_bytes == Some(0) {
768
                        // If the substream refuses to process more incoming data right now,
769
                        // we copy everything into its read buffer in order to free the outer
770
                        // incoming buffer.
771
                        // The window system of Yamux ensures that the read buffer size is capped.
772
8
                        usize::try_from(*remaining_bytes).unwrap_or(usize::MAX)
773
                    } else {
774
217
                        cmp::min(
775
217
                            expected_incoming_bytes
776
217
                                .unwrap_or(0)
777
217
                                .saturating_sub(read_buffer.len()),
778
217
                            usize::try_from(*remaining_bytes).unwrap_or(usize::MAX),
779
217
                        )
780
                    };
781
782
225
                    match outer_read_write.incoming_bytes_take(to_copy) {
783
225
                        Ok(Some(mut data)) => {
784
225
                            *remaining_bytes -= u32::try_from(data.len()).unwrap();
785
225
                            if read_buffer.is_empty() {
786
189
                                *read_buffer = data;
787
189
                            } else {
788
36
                                read_buffer.append(&mut data);
789
36
                            }
790
                        }
791
                        // TODO: how to deal with the read closed?
792
0
                        Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break,
793
                    }
794
795
                    // If the substream has enough data to read, or has never been processed
796
                    // before, make sure that it will wake up as soon as possible.
797
225
                    if expected_incoming_bytes.map_or(true, |expected| {
798
                        expected != 0 && read_buffer.len() >= expected
799
225
                    }) {
800
14
                        match substreams_wake_up_key {
801
14
                            Some(Some(when)) if *when <= outer_read_write.now => {}
802
150
                            Some(None) => {}
803
12
                            Some(Some(when)) => {
804
12
                                let _was_removed = self
805
12
                                    .inner
806
12
                                    .substreams_wake_up
807
12
                                    .remove(&(Some(when.clone()), substream_id));
808
12
                                debug_assert!(_was_removed);
809
12
                                self.inner.substreams_wake_up.insert((None, substream_id));
810
12
                                *substreams_wake_up_key = Some(None);
811
                            }
812
53
                            _ => {
813
53
                                self.inner.substreams_wake_up.insert((None, substream_id));
814
53
                                *substreams_wake_up_key = Some(None);
815
53
                            }
816
                        }
817
818
                        // Also stop processing incoming data so that we can process the substream.
819
217
                        break;
820
8
                    }
821
8
822
8
                    // Make sure that some progress was made, otherwise we might get into an
823
8
                    // infinite loop.
824
8
                    debug_assert_ne!(to_copy, 0);
825
                }
826
827
                Incoming::Header => {
828
                    // Try to grab a header from the incoming buffer.
829
219
                    let header_bytes = match outer_read_write.incoming_bytes_take_array::<12>() {
830
67
                        Ok(Some(hdr)) => hdr,
831
152
                        Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break,
832
                    };
833
834
                    // Decode the header in `header_bytes`.
835
67
                    let decoded_header = match header::decode_yamux_header(&header_bytes) {
836
67
                        Ok(h) => h,
837
0
                        Err(err) => return Err(Error::HeaderDecode(err)),
838
                    };
839
840
67
                    match decoded_header {
841
0
                        header::DecodedYamuxHeader::PingRequest { opaque_value } => {
842
0
                            if self.inner.pongs_to_send.len()
843
0
                                >= self.inner.max_simultaneous_queued_pongs.get()
844
                            {
845
0
                                return Err(Error::MaxSimultaneousPingsExceeded);
846
0
                            }
847
0
848
0
                            self.inner.pongs_to_send.push_back(opaque_value);
849
0
                            self.inner.incoming = Incoming::Header;
850
                        }
851
852
0
                        header::DecodedYamuxHeader::PingResponse { opaque_value } => {
853
0
                            if self.inner.pings_waiting_reply.pop_front() != Some(opaque_value) {
854
0
                                return Err(Error::PingResponseNotMatching);
855
0
                            }
856
0
857
0
                            self.inner.incoming = Incoming::Header;
858
0
                            return Ok(ReadWriteOutcome::PingResponse { yamux: self });
859
                        }
860
861
0
                        header::DecodedYamuxHeader::GoAway { error_code } => {
862
0
                            if self.inner.received_goaway.is_some() {
863
0
                                return Err(Error::MultipleGoAways);
864
0
                            }
865
0
866
0
                            self.inner.incoming = Incoming::Header;
867
0
                            self.inner.received_goaway = Some(error_code);
868
0
869
0
                            let mut reset_substreams =
870
0
                                Vec::with_capacity(self.inner.substreams.len());
871
0
                            for (substream_id, substream) in self.inner.substreams.iter_mut() {
872
                                let SubstreamState::Healthy {
873
                                    remote_syn_acked: false,
874
0
                                    substreams_wake_up_key,
875
                                    ..
876
0
                                } = &mut substream.state
877
                                else {
878
0
                                    continue;
879
                                };
880
881
0
                                reset_substreams.push(SubstreamId(*substream_id));
882
0
883
0
                                let _was_inserted =
884
0
                                    self.inner.dead_substreams.insert(*substream_id);
885
0
                                debug_assert!(_was_inserted);
886
887
0
                                self.inner.substreams_write_ready.remove(substream_id);
888
0
                                self.inner.window_frames_to_send.remove(substream_id);
889
890
0
                                if let Some(k) = substreams_wake_up_key.take() {
891
0
                                    let _was_removed =
892
0
                                        self.inner.substreams_wake_up.remove(&(k, *substream_id));
893
0
                                    debug_assert!(_was_removed);
894
0
                                }
895
896
                                if let Outgoing::PreparingDataFrame {
897
0
                                    substream_id: s,
898
0
                                    write_buffers,
899
0
                                } = &mut self.inner.outgoing
900
                                {
901
0
                                    if *substream_id == *s {
902
0
                                        write_buffers.clear();
903
0
                                        self.inner.outgoing = Outgoing::WritingOut {
904
0
                                            buffers: mem::take(write_buffers),
905
0
                                        }
906
0
                                    }
907
0
                                }
908
909
0
                                substream.state = SubstreamState::Reset;
910
                            }
911
912
0
                            outer_read_write.wake_up_asap();
913
0
                            return Ok(ReadWriteOutcome::GoAway {
914
0
                                yamux: self,
915
0
                                code: error_code,
916
0
                                reset_substreams,
917
0
                            });
918
                        }
919
920
                        header::DecodedYamuxHeader::Data {
921
                            rst: true,
922
0
                            ack,
923
0
                            stream_id,
924
0
                            length,
925
                            ..
926
                        }
927
                        | header::DecodedYamuxHeader::Window {
928
                            rst: true,
929
0
                            ack,
930
0
                            stream_id,
931
0
                            length,
932
                            ..
933
                        } => {
934
                            // Frame with the `RST` flag set. Destroy the substream.
935
936
                            // Sending a `RST` flag and data together is a weird corner case and
937
                            // is difficult to handle. It is unclear whether it is allowed at all.
938
                            // We thus consider it as invalid.
939
0
                            if matches!(decoded_header, header::DecodedYamuxHeader::Data { .. })
940
0
                                && length != 0
941
                            {
942
0
                                return Err(Error::DataWithRst);
943
0
                            }
944
0
945
0
                            self.inner.incoming = Incoming::Header;
946
947
                            // The remote might have sent a RST frame concerning a substream for
948
                            // which we have sent a RST frame earlier. Considering that we don't
949
                            // always keep traces of old substreams, we have no way to know whether
950
                            // this is the case or not.
951
0
                            let Some(substream) = self.inner.substreams.get_mut(&stream_id) else {
952
0
                                continue;
953
                            };
954
                            let SubstreamState::Healthy {
955
0
                                remote_syn_acked,
956
0
                                substreams_wake_up_key,
957
                                ..
958
0
                            } = &mut substream.state
959
                            else {
960
0
                                continue;
961
                            };
962
963
0
                            let _was_inserted = self.inner.dead_substreams.insert(stream_id);
964
0
                            debug_assert!(_was_inserted);
965
966
                            // Check whether the remote has ACKed multiple times.
967
0
                            if *remote_syn_acked && ack {
968
0
                                return Err(Error::UnexpectedAck);
969
0
                            }
970
971
                            if let Outgoing::PreparingDataFrame {
972
0
                                substream_id,
973
0
                                write_buffers,
974
0
                            } = &mut self.inner.outgoing
975
                            {
976
0
                                if *substream_id == stream_id {
977
0
                                    write_buffers.clear();
978
0
                                    self.inner.outgoing = Outgoing::WritingOut {
979
0
                                        buffers: mem::take(write_buffers),
980
0
                                    }
981
0
                                }
982
0
                            }
983
984
0
                            self.inner.window_frames_to_send.remove(&stream_id);
985
0
                            self.inner.substreams_write_ready.remove(&stream_id);
986
987
0
                            if let Some(k) = substreams_wake_up_key.take() {
988
0
                                let _was_removed =
989
0
                                    self.inner.substreams_wake_up.remove(&(k, stream_id));
990
0
                                debug_assert!(_was_removed);
991
0
                            }
992
993
0
                            substream.state = SubstreamState::Reset;
994
0
995
0
                            outer_read_write.wake_up_asap();
996
0
                            return Ok(ReadWriteOutcome::StreamReset {
997
0
                                yamux: self,
998
0
                                substream_id: SubstreamId(stream_id),
999
0
                            });
1000
                        }
1001
1002
                        header::DecodedYamuxHeader::Data {
1003
                            syn: true,
1004
                            ack: true,
1005
                            ..
1006
                        }
1007
                        | header::DecodedYamuxHeader::Window {
1008
                            syn: true,
1009
                            ack: true,
1010
                            ..
1011
                        } => {
1012
                            // You're never supposed to send a SYN and ACK at the same time.
1013
0
                            return Err(Error::UnexpectedAck);
1014
                        }
1015
1016
                        header::DecodedYamuxHeader::Data {
1017
                            syn: true,
1018
20
                            fin,
1019
20
                            rst: false,
1020
20
                            stream_id,
1021
20
                            length,
1022
                            ..
1023
                        }
1024
                        | header::DecodedYamuxHeader::Window {
1025
                            syn: true,
1026
0
                            fin,
1027
0
                            rst: false,
1028
0
                            stream_id,
1029
0
                            length,
1030
                            ..
1031
                        } => {
1032
                            // The initiator should only allocate uneven substream IDs, and the
1033
                            // other side only even IDs. We don't know anymore whether we're
1034
                            // initiator at this point, but we can compare with the even-ness of
1035
                            // the IDs that we allocate locally.
1036
20
                            if (self.inner.next_outbound_substream.get() % 2)
1037
20
                                == (stream_id.get() % 2)
1038
                            {
1039
0
                                return Err(Error::InvalidInboundStreamId(stream_id));
1040
20
                            }
1041
20
1042
20
                            // Remote has sent a SYN flag. A new substream is to be opened.
1043
20
                            match self.inner.substreams.get(&stream_id) {
1044
20
                                None => {}
1045
                                Some(Substream {
1046
                                    state:
1047
                                        SubstreamState::Healthy {
1048
                                            local_write_close: SubstreamStateLocalWrite::FinQueued,
1049
                                            remote_write_closed: true,
1050
                                            ..
1051
                                        },
1052
                                    ..
1053
                                })
1054
                                | Some(Substream {
1055
                                    state: SubstreamState::Reset,
1056
                                    ..
1057
                                }) => {
1058
                                    // Because we don't immediately destroy substreams, the remote
1059
                                    // might decide to re-use a substream ID that is still
1060
                                    // allocated locally. If that happens, we block the reading.
1061
                                    // It will be unblocked when the API user destroys the old
1062
                                    // substream.
1063
0
                                    break;
1064
                                }
1065
                                Some(Substream {
1066
                                    state: SubstreamState::Healthy { .. },
1067
                                    ..
1068
                                }) => {
1069
0
                                    return Err(Error::UnexpectedSyn(stream_id));
1070
                                }
1071
                            }
1072
1073
                            // When receiving a new substream, we might have to potentially queue
1074
                            // a substream rejection message later.
1075
                            // In order to ensure that there is enough space in `rsts_to_send`,
1076
                            // we check it against the limit now.
1077
20
                            if self.inner.rsts_to_send.len()
1078
20
                                >= self.inner.max_simultaneous_rst_substreams.get()
1079
                            {
1080
0
                                return Err(Error::MaxSimultaneousRstSubstreamsExceeded);
1081
20
                            }
1082
1083
20
                            let is_data =
1084
20
                                matches!(decoded_header, header::DecodedYamuxHeader::Data { .. });
1085
1086
                            // If we have queued or sent a GoAway frame, then the substream is
1087
                            // ignored. The remote understands when receiving the GoAway that the
1088
                            // substream has been rejected.
1089
20
                            if !matches!(self.inner.outgoing_goaway, OutgoingGoAway::NotRequired) {
1090
0
                                self.inner.incoming = if !is_data {
1091
0
                                    Incoming::Header
1092
                                } else {
1093
0
                                    Incoming::DataFrame {
1094
0
                                        substream_id: stream_id,
1095
0
                                        remaining_bytes: length,
1096
0
                                    }
1097
                                };
1098
1099
0
                                continue;
1100
20
                            }
1101
20
1102
20
                            if is_data && u64::from(length) > NEW_SUBSTREAMS_FRAME_SIZE {
1103
0
                                return Err(Error::CreditsExceeded);
1104
20
                            }
1105
20
1106
20
                            self.inner.incoming = Incoming::PendingIncomingSubstream {
1107
20
                                substream_id: stream_id,
1108
20
                                extra_window: if !is_data { length } else { 0 },
1109
20
                                data_frame_size: if is_data { length } else { 0 },
1110
20
                                fin,
1111
20
                            };
1112
20
1113
20
                            return Ok(ReadWriteOutcome::IncomingSubstream { yamux: self });
1114
                        }
1115
1116
                        header::DecodedYamuxHeader::Data {
1117
                            syn: false,
1118
                            rst: false,
1119
47
                            stream_id,
1120
47
                            length,
1121
47
                            ack,
1122
47
                            fin,
1123
                            ..
1124
                        } => {
1125
                            // Note that it is possible that the remote is referring to a substream
1126
                            // for which a RST has been sent out by the local node. Since the
1127
                            // local state machine doesn't keep track of RST'ted substreams, any
1128
                            // frame concerning a substream that has been RST or with an unknown
1129
                            // id is discarded and doesn't result in an error, under the
1130
                            // presumption that we are in this situation.
1131
                            if let Some(Substream {
1132
                                state:
1133
                                    SubstreamState::Healthy {
1134
47
                                        remote_write_closed,
1135
47
                                        remote_allowed_window,
1136
47
                                        remote_syn_acked,
1137
47
                                        substreams_wake_up_key,
1138
                                        ..
1139
                                    },
1140
                                ..
1141
47
                            }) = self.inner.substreams.get_mut(&stream_id)
1142
                            {
1143
47
                                match (ack, remote_syn_acked) {
1144
28
                                    (false, true) => {}
1145
19
                                    (true, acked @ false) => *acked = true,
1146
0
                                    (true, true) => return Err(Error::UnexpectedAck),
1147
0
                                    (false, false) => return Err(Error::ExpectedAck),
1148
                                }
1149
1150
47
                                if *remote_write_closed {
1151
0
                                    return Err(Error::WriteAfterFin);
1152
47
                                }
1153
47
1154
47
                                if fin {
1155
5
                                    *remote_write_closed = true;
1156
5
1157
5
                                    // No need to send window frames anymore if the remote has
1158
5
                                    // sent a FIN.
1159
5
                                    self.inner.window_frames_to_send.remove(&stream_id);
1160
1161
                                    // Wake up the substream.
1162
0
                                    match substreams_wake_up_key {
1163
0
                                        Some(Some(when)) if *when <= outer_read_write.now => {}
1164
2
                                        Some(None) => {}
1165
0
                                        Some(Some(when)) => {
1166
0
                                            let _was_removed = self
1167
0
                                                .inner
1168
0
                                                .substreams_wake_up
1169
0
                                                .remove(&(Some(when.clone()), stream_id));
1170
0
                                            debug_assert!(_was_removed);
1171
0
                                            self.inner.substreams_wake_up.insert((None, stream_id));
1172
0
                                            *substreams_wake_up_key = Some(None);
1173
                                        }
1174
3
                                        _ => {
1175
3
                                            self.inner.substreams_wake_up.insert((None, stream_id));
1176
3
                                            *substreams_wake_up_key = Some(None);
1177
3
                                        }
1178
                                    }
1179
42
                                }
1180
1181
                                // Check whether the remote has the right to send that much data.
1182
                                // Note that the credits aren't checked in the case of an unknown
1183
                                // substream.
1184
47
                                *remote_allowed_window = remote_allowed_window
1185
47
                                    .checked_sub(u64::from(length))
1186
47
                                    .ok_or(Error::CreditsExceeded)?;
1187
0
                            }
1188
1189
                            // Switch to the `DataFrame` state in order to process the frame, even
1190
                            // if the substream no longer exists, in order to not duplicate code.
1191
47
                            self.inner.incoming = Incoming::DataFrame {
1192
47
                                substream_id: stream_id,
1193
47
                                remaining_bytes: length,
1194
47
                            };
1195
                        }
1196
1197
                        header::DecodedYamuxHeader::Window {
1198
                            syn: false,
1199
                            rst: false,
1200
0
                            stream_id,
1201
0
                            length,
1202
0
                            ack,
1203
0
                            fin,
1204
                            ..
1205
                        } => {
1206
                            // Note that it is possible that the remote is referring to a substream
1207
                            // for which a RST has been sent out by the local node. Since the
1208
                            // local state machine doesn't keep track of RST'ted substreams, any
1209
                            // frame concerning a substream that has been RST or with an unknown
1210
                            // id is discarded and doesn't result in an error, under the
1211
                            // presumption that we are in this situation.
1212
                            let Some(Substream {
1213
                                state:
1214
                                    SubstreamState::Healthy {
1215
0
                                        remote_syn_acked,
1216
0
                                        allowed_window,
1217
0
                                        remote_write_closed,
1218
                                        ..
1219
                                    },
1220
                                ..
1221
0
                            }) = self.inner.substreams.get_mut(&stream_id)
1222
                            else {
1223
0
                                self.inner.incoming = Incoming::Header;
1224
0
                                continue;
1225
                            };
1226
1227
0
                            match (ack, remote_syn_acked) {
1228
0
                                (false, true) => {}
1229
0
                                (true, acked @ false) => *acked = true,
1230
0
                                (true, true) => return Err(Error::UnexpectedAck),
1231
0
                                (false, false) => return Err(Error::ExpectedAck),
1232
                            }
1233
1234
                            // Note that the spec is unclear about whether the remote can or
1235
                            // should continue sending FIN flags on window size frames after
1236
                            // their side of the substream has already been closed before.
1237
0
                            if fin {
1238
0
                                *remote_write_closed = true;
1239
0
                            }
1240
1241
                            // If a substream was processed with a non-zero queuable bytes
1242
                            // value (i.e. `allowed_window` non-zero) but yields empty write
1243
                            // buffers, then we can assume that the substream has nothing to write,
1244
                            // and thus that simply increasing the queueable bytes will not change
1245
                            // the situation. After all, we have no idea whether the remote will
1246
                            // increase the window size of this substream any further in the
1247
                            // future, and thus we must write data out no matter what the window
1248
                            // size is as long as it is non-zero.
1249
                            // On the other hand, if the queueable bytes were 0, we take the guess
1250
                            // that the substream might have more to write.
1251
0
                            if length != 0 && *allowed_window == 0 {
1252
0
                                self.inner.substreams_write_ready.insert(stream_id);
1253
0
                            }
1254
1255
0
                            *allowed_window = allowed_window
1256
0
                                .checked_add(u64::from(length))
1257
0
                                .ok_or(Error::LocalCreditsOverflow)?;
1258
1259
0
                            self.inner.incoming = Incoming::Header;
1260
                        }
1261
                    }
1262
                }
1263
            }
1264
        }
1265
1266
        // Choose which substream to read/write (if any).
1267
329
        let substream_id = match (
1268
369
            &self.inner.outgoing,
1269
369
            self.inner.substreams_write_ready.iter().next().copied(),
1270
369
            self.inner.substreams_wake_up.first(),
1271
        ) {
1272
            (
1273
                Outgoing::PreparingDataFrame {
1274
66
                    substream_id,
1275
66
                    write_buffers,
1276
66
                },
1277
66
                _,
1278
66
                _,
1279
66
            ) => {
1280
66
                // Continue writing to the substream whose frame we're already preparing.
1281
66
                debug_assert!(!write_buffers.is_empty());
1282
66
                self.inner.substreams_write_ready.remove(substream_id);
1283
66
                *substream_id
1284
            }
1285
92
            (Outgoing::WritingOut { buffers }, Some(substream_id), _) if buffers.is_empty() => {
1286
92
                // Pull a substream from `substreams_write_ready`.
1287
92
                self.inner.substreams_write_ready.remove(&substream_id);
1288
92
                substream_id
1289
            }
1290
195
            (_, _, Some((when, substream_id)))
1291
195
                if when
1292
195
                    .as_ref()
1293
195
                    .map_or(true, |when| *when <= outer_read_write.now) =>
1294
171
            {
1295
171
                *substream_id
1296
            }
1297
            _ => {
1298
                // No substream to read/write.
1299
40
                return Ok(ReadWriteOutcome::Idle { yamux: self });
1300
            }
1301
        };
1302
1303
        // Extract some fields from the substream state.
1304
        let SubstreamState::Healthy {
1305
329
            allowed_window,
1306
329
            local_write_close,
1307
329
            remote_write_closed,
1308
329
            read_buffer,
1309
329
            substreams_wake_up_key,
1310
            ..
1311
329
        } = &mut self.inner.substreams.get_mut(&substream_id).unwrap().state
1312
        else {
1313
0
            unreachable!()
1314
        };
1315
1316
        // Remove the substream from `substreams_wake_up`, since we're processing it now.
1317
        // If the processing produces a `wake_up_after` value when being processed, it will be
1318
        // re-inserted.
1319
329
        if let Some(substreams_wake_up_key) = substreams_wake_up_key.take() {
1320
273
            let _was_removed = self
1321
273
                .inner
1322
273
                .substreams_wake_up
1323
273
                .remove(&(substreams_wake_up_key, substream_id));
1324
273
            debug_assert!(_was_removed);
1325
56
        }
1326
1327
329
        let (write_buffers, can_queue_data) = match &mut self.inner.outgoing {
1328
263
            Outgoing::WritingOut { buffers } if buffers.is_empty() => {
1329
263
                let mut buffers = mem::take(buffers);
1330
263
                // As a small optimization, we push an empty buffer at the front where the header
1331
263
                // might later get written.
1332
263
                buffers.push(Vec::with_capacity(12));
1333
263
                (buffers, true)
1334
            }
1335
            Outgoing::PreparingDataFrame {
1336
66
                substream_id: s,
1337
66
                write_buffers,
1338
66
            } if *s == substream_id => {
1339
66
                let buffers = mem::take(write_buffers);
1340
66
                self.inner.outgoing = Outgoing::WritingOut {
1341
66
                    buffers: Vec::new(),
1342
66
                };
1343
66
                (buffers, true)
1344
            }
1345
0
            _ => (Vec::new(), false),
1346
        };
1347
329
        let write_buffers_len_before = write_buffers.iter().fold(0, |count, buf| count + buf.len());
1348
329
1349
329
        Ok(ReadWriteOutcome::ProcessSubstream {
1350
329
            substream_read_write: SubstreamReadWrite {
1351
329
                substream_id,
1352
329
                write_buffers_len_before,
1353
329
                inner_read_write: ReadWrite {
1354
329
                    now: outer_read_write.now.clone(),
1355
329
                    incoming_buffer: mem::take(read_buffer),
1356
329
                    expected_incoming_bytes: if !*remote_write_closed { Some(0) } else { None },
1357
                    read_bytes: 0,
1358
329
                    write_buffers,
1359
329
                    write_bytes_queued: write_buffers_len_before,
1360
10
                    write_bytes_queueable: if matches!(
1361
329
                        local_write_close,
1362
                        SubstreamStateLocalWrite::Open
1363
                    ) {
1364
319
                        if can_queue_data {
1365
319
                            Some(
1366
319
                                usize::try_from(cmp::min(
1367
319
                                    u64::from(self.inner.max_out_data_frame_size.get()),
1368
319
                                    *allowed_window,
1369
319
                                ))
1370
319
                                .unwrap_or(usize::MAX)
1371
319
                                .saturating_sub(write_buffers_len_before),
1372
319
                            )
1373
                        } else {
1374
0
                            Some(0)
1375
                        }
1376
                    } else {
1377
10
                        None
1378
                    },
1379
329
                    wake_up_after: None,
1380
329
                },
1381
329
                outer_read_write,
1382
329
                yamux: self,
1383
            },
1384
        })
1385
389
    }
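The 12-byte array pulled off the wire by `incoming_bytes_take_array::<12>()` above follows the frame header layout of the yamux specification. As a minimal self-contained sketch (the struct and field names below are illustrative only and are not smoldot's actual `header` module API):

    // Hypothetical decoder for the 12-byte yamux frame header, per the spec:
    // version (1 byte), type (1 byte), flags (2 bytes), stream id (4 bytes),
    // length (4 bytes), all in network (big-endian) byte order.
    struct RawHeader {
        version: u8,    // always 0 for /yamux/1.0.0
        frame_type: u8, // 0x0 = data, 0x1 = window update, 0x2 = ping, 0x3 = goaway
        flags: u16,     // bitmask: 0x1 = SYN, 0x2 = ACK, 0x4 = FIN, 0x8 = RST
        stream_id: u32,
        length: u32,    // payload size (data), window delta, or ping/goaway value
    }

    fn parse_header(bytes: &[u8; 12]) -> RawHeader {
        RawHeader {
            version: bytes[0],
            frame_type: bytes[1],
            flags: u16::from_be_bytes([bytes[2], bytes[3]]),
            stream_id: u32::from_be_bytes([bytes[4], bytes[5], bytes[6], bytes[7]]),
            length: u32::from_be_bytes([bytes[8], bytes[9], bytes[10], bytes[11]]),
        }
    }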
1386
1387
    /// Adds `bytes` to the number of bytes the remote is allowed to send at once in the next
1388
    /// packet.
1389
    ///
1390
    /// > **Note**: The [`SubstreamReadWrite`] object ensures that the remote window is big enough
1391
    /// >           to receive the amount of bytes requested by the substream. Calling
1392
    /// >           [`Yamux::add_remote_window_saturating`] is therefore not mandatory, but only an
1393
    /// >           optimization to reduce the number of networking round trips.
1394
    ///
1395
    /// The counter saturates if its maximum is reached. This could cause stalls if the
1396
    /// remote sends a ton of data. However, given that the number of bytes is stored in a `u64`,
1397
    /// the remote would have to send at least `2^64` bytes in order to reach this situation,
1398
    /// making it basically impossible.
1399
    ///
1400
    /// It is, furthermore, a bad idea to increase this counter by an immense number ahead of
1401
    /// time, as the remote can shut down the connection if its own counter overflows. The way
1402
    /// this counter is supposed to be used in a "streaming" way.
1403
    ///
1404
    /// Increasing the window of a substream by `bytes` requires sending out `12 * bytes / 2^32`
1405
    /// bytes. For example, increasing the window by `2^64` adds an overhead of 48 GiB. Please
1406
    /// don't do that.
1407
    ///
1408
    /// > **Note**: When a substream has just been opened or accepted, it starts with an initial
1409
    /// >           window of [`NEW_SUBSTREAMS_FRAME_SIZE`].
1410
    ///
1411
    /// > **Note**: It is only possible to add more bytes to the window and not set or reduce this
1412
    /// >           number of bytes, and it is also not possible to obtain the number of bytes the
1413
    /// >           remote is allowed. That's because it would be ambiguous whether bytes possibly
1414
    /// >           in the send or receive queue should be counted or not.
1415
    ///
1416
    /// Has no effect if the remote has already closed their writing side.
1417
    ///
1418
    /// # Panic
1419
    ///
1420
    /// Panics if the [`SubstreamId`] is invalid.
1421
    ///
1422
3
    pub fn add_remote_window_saturating(&mut self, substream_id: SubstreamId, bytes: u64) {
1423
3
        if let SubstreamState::Healthy {
1424
3
            remote_write_closed: false,
1425
3
            ..
1426
3
        } = &mut self
1427
3
            .inner
1428
3
            .substreams
1429
3
            .get_mut(&substream_id.0)
1430
3
            .unwrap_or_else(|| panic!())
1431
3
            .state
1432
0
        {
1433
3
            let Some(bytes) = NonZeroU64::new(bytes) else {
1434
3
                return;
1435
            };
1436
1437
0
            self.inner
1438
0
                .window_frames_to_send
1439
0
                .entry(substream_id.0)
1440
0
                .and_modify(|window| *window = window.saturating_add(bytes.get()))
1441
0
                .or_insert(bytes);
1442
0
        }
1443
3
    }
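As a quick sanity check of the `12 * bytes / 2^32` overhead formula from the doc comment above (each window-update frame is a 12-byte header and can grant at most `2^32 - 1` bytes of credit), a minimal sketch:

    // Approximate wire overhead, in bytes, of granting `bytes` of receive window,
    // assuming one 12-byte window-update frame per 2^32 bytes of credit.
    fn window_update_overhead(bytes: u128) -> u128 {
        12 * bytes / (1u128 << 32)
    }

    fn main() {
        // Matches the doc comment: a 2^64-byte window costs roughly 48 GiB of frames.
        assert_eq!(window_update_overhead(1u128 << 64), 48 * (1u128 << 30));
    }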
1444
1445
    /// Abruptly shuts down the substream. Sends a frame with the `RST` flag to the remote.
1446
    ///
1447
    /// Use this method when a protocol error happens on a substream.
1448
    ///
1449
    /// Returns an error if [`Yamux::reset`] has already been called on this substream, or if the
1450
    /// remote has reset the substream in the past, or if the substream was closed.
1451
    ///
1452
    /// # Panic
1453
    ///
1454
    /// Panics if the [`SubstreamId`] is invalid.
1455
    ///
1456
5
    pub fn reset(&mut self, substream_id: SubstreamId) -> Result<(), ResetError> {
1457
        // Add an entry to the list of RST headers to send to the remote.
1458
        let SubstreamState::Healthy {
1459
5
            substreams_wake_up_key,
1460
            ..
1461
5
        } = &mut self
1462
5
            .inner
1463
5
            .substreams
1464
5
            .get_mut(&substream_id.0)
1465
5
            .unwrap_or_else(|| panic!())
1466
5
            .state
1467
        else {
1468
0
            return Err(ResetError::AlreadyReset);
1469
        };
1470
1471
5
        if !self.inner.dead_substreams.insert(substream_id.0) {
1472
1
            return Err(ResetError::AlreadyClosed);
1473
4
        }
1474
4
1475
4
        self.inner.window_frames_to_send.remove(&substream_id.0);
1476
4
        self.inner.substreams_write_ready.remove(&substream_id.0);
1477
1478
4
        if let Some(key) = substreams_wake_up_key.take() {
1479
2
            let _was_removed = self.inner.substreams_wake_up.remove(&(key, substream_id.0));
1480
2
            debug_assert!(_was_removed);
1481
2
        }
1482
1483
        if let Outgoing::PreparingDataFrame {
1484
0
            substream_id: id,
1485
0
            write_buffers,
1486
4
        } = &mut self.inner.outgoing
1487
        {
1488
0
            if *id == substream_id.0 {
1489
0
                write_buffers.clear();
1490
0
                self.inner.outgoing = Outgoing::WritingOut {
1491
0
                    buffers: mem::take(write_buffers),
1492
0
                }
1493
0
            }
1494
4
        }
1495
1496
        // Note that we intentionally don't check the size against
1497
        // `max_simultaneous_rst_substreams`, as locally-emitted RST frames aren't the
1498
        // remote's fault.
1499
4
        self.inner.rsts_to_send.push_back(substream_id.0);
1500
4
1501
4
        self.inner
1502
4
            .substreams
1503
4
            .get_mut(&substream_id.0)
1504
4
            .unwrap_or_else(|| panic!())
1505
4
            .state = SubstreamState::Reset;
1506
4
1507
4
        Ok(())
1508
5
    }
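A hedged usage sketch of [`Yamux::reset`] (the helper name below is hypothetical, and a `Yamux<TSub>` instance is assumed to have been obtained elsewhere): on a protocol error, reset the substream and tolerate the error cases listed in the doc comment above.

    // Hypothetical helper: abruptly kill a substream after a protocol error.
    fn abort_substream<TSub>(mux: &mut Yamux<TSub>, id: SubstreamId) {
        if let Err(_already_dead) = mux.reset(id) {
            // `ResetError::AlreadyReset` / `ResetError::AlreadyClosed`: the substream
            // is already dead, so there is nothing left to do.
        }
    }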
1509
1510
    /// Queues sending out a ping to the remote.
1511
    ///
1512
    /// # Panic
1513
    ///
1514
    /// Panics if there are already [`MAX_PINGS`] pings that have been queued and that the remote
1515
    /// hasn't answered yet. [`MAX_PINGS`] is pretty large, and unless there is a bug in the API
1516
    /// user's code causing pings to be allocated in a loop, this limit is not likely to ever be
1517
    /// reached.
1518
    ///
1519
0
    pub fn queue_ping(&mut self) {
1520
0
        // A maximum number of simultaneous pings (`MAX_PINGS`) is necessary because we don't
1521
0
        // support sending multiple identical ping opaque values. Since the ping opaque values
1522
0
        // are 32 bits, the actual maximum number of simultaneous pings is 2^32. But because we
1523
0
        // allocate ping values by looping until we find a not-yet-allocated value, the arbitrary
1524
0
        // self-enforced maximum needs to be way lower.
1525
0
        assert!(self.inner.pings_to_send + self.inner.pings_waiting_reply.len() < MAX_PINGS);
1526
0
        self.inner.pings_to_send += 1;
1527
0
    }
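A hedged sketch of the intended keep-alive pattern (the timing logic is the caller's responsibility, and the helper below is hypothetical): queue a ping here, then watch for [`ReadWriteOutcome::PingResponse`] from a later `read_write` call.

    // Hypothetical helper: queue a keep-alive ping when the caller's timer fires.
    fn maybe_queue_keepalive<TSub>(mux: &mut Yamux<TSub>, ping_due: bool) {
        if ping_due {
            // Panics only if MAX_PINGS pings are already awaiting a reply (see above).
            mux.queue_ping();
        }
    }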
1528
1529
    /// Queues a `GoAway` frame, requesting the remote to no longer open any substream.
1530
    ///
1531
    /// If the state of [`Yamux`] is currently waiting for a confirmation to accept/reject a
1532
    /// substream, then this function automatically implies calling
1533
    /// [`Yamux::reject_pending_substream`].
1534
    ///
1535
    /// All follow-up requests for new substreams from the remote are automatically rejected.
1536
    /// [`ReadWriteOutcome::IncomingSubstream`] events can no longer happen.
1537
    ///
1538
0
    pub fn send_goaway(&mut self, code: GoAwayErrorCode) -> Result<(), SendGoAwayError> {
1539
0
        match self.inner.outgoing_goaway {
1540
            OutgoingGoAway::NotRequired => {
1541
0
                self.inner.outgoing_goaway = OutgoingGoAway::Required(code)
1542
            }
1543
0
            _ => return Err(SendGoAwayError::AlreadySent),
1544
        }
1545
1546
        // If the remote is currently opening a substream, ignore it. The remote understands when
1547
        // receiving the GoAway that the substream has been rejected.
1548
        if let Incoming::PendingIncomingSubstream {
1549
0
            substream_id,
1550
0
            data_frame_size,
1551
            ..
1552
0
        } = self.inner.incoming
1553
0
        {
1554
0
            self.inner.incoming = Incoming::DataFrame {
1555
0
                substream_id,
1556
0
                remaining_bytes: data_frame_size,
1557
0
            };
1558
0
        }
1559
1560
0
        Ok(())
1561
0
    }
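A hedged sketch of a graceful shutdown using [`Yamux::send_goaway`]; the `GoAwayErrorCode::NormalTermination` variant name is an assumption based on the error codes in the yamux specification, and the helper is hypothetical:

    // Hypothetical helper: ask the remote to stop opening substreams.
    fn begin_shutdown<TSub>(mux: &mut Yamux<TSub>) {
        if let Err(_already_sent) = mux.send_goaway(GoAwayErrorCode::NormalTermination) {
            // `SendGoAwayError::AlreadySent`: a GoAway frame was already queued or sent.
        }
    }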
1562
1563
    /// Returns the list of all substreams that have been closed or reset.
1564
    ///
1565
    /// This function does not remove dead substreams from the state machine. In other words, if
1566
    /// this function is called multiple times in a row, it will always return the same
1567
    /// substreams. Use [`Yamux::remove_dead_substream`] to remove substreams.
1568
295
    pub fn dead_substreams(
1569
295
        &'_ self,
1570
295
    ) -> impl Iterator<Item = (SubstreamId, DeadSubstreamTy, &'_ TSub)> + '_ {
1571
295
        self.inner.dead_substreams.iter().map(|id| {
1572
2
            let substream = self.inner.substreams.get(id).unwrap();
1573
2
            match &substream.state {
1574
2
                SubstreamState::Reset => (
1575
2
                    SubstreamId(*id),
1576
2
                    DeadSubstreamTy::Reset,
1577
2
                    &substream.user_data,
1578
2
                ),
1579
                SubstreamState::Healthy {
1580
0
                    local_write_close,
1581
0
                    remote_write_closed,
1582
0
                    ..
1583
0
                } => {
1584
0
                    debug_assert!(
1585
0
                        matches!(local_write_close, SubstreamStateLocalWrite::FinQueued)
1586
0
                            && *remote_write_closed
1587
                    );
1588
1589
0
                    (
1590
0
                        SubstreamId(*id),
1591
0
                        DeadSubstreamTy::ClosedGracefully,
1592
0
                        &substream.user_data,
1593
0
                    )
1594
                }
1595
            }
1596
295
        })
1597
295
    }
1598
1599
    /// Removes a dead substream from the state machine.
1600
    ///
1601
    /// # Panic
1602
    ///
1603
    /// Panics if the substream with that id doesn't exist or isn't dead.
1604
    ///
1605
2
    pub fn remove_dead_substream(&mut self, id: SubstreamId) -> TSub {
1606
2
        let was_in = self.inner.dead_substreams.remove(&id.0);
1607
2
        if !was_in {
1608
0
            panic!()
1609
2
        }
1610
2
1611
2
        debug_assert!(!self
1612
2
            .inner
1613
2
            .substreams_wake_up
1614
2
            .iter()
1615
2
            .any(|(_, s)| s == &id.0));
Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE21remove_dead_substream0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE21remove_dead_substream0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE21remove_dead_substream0Bc_
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE21remove_dead_substream0CsiUjFBJteJ7x_17smoldot_full_node
1616
2
        debug_assert!(!self.inner.window_frames_to_send.contains_key(&id.0));
1617
2
        debug_assert!(!self.inner.substreams_write_ready.contains(&id.0));
1618
1619
2
        let substream = self.inner.substreams.remove(&id.0).unwrap();
1620
2
1621
2
        if substream.inbound {
1622
2
            self.inner.num_inbound -= 1;
1623
2
        }
0
1624
1625
2
        substream.user_data
1626
2
    }
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE21remove_dead_substreamBa_
Line
Count
Source
1605
2
    pub fn remove_dead_substream(&mut self, id: SubstreamId) -> TSub {
1606
2
        let was_in = self.inner.dead_substreams.remove(&id.0);
1607
2
        if !was_in {
1608
0
            panic!()
1609
2
        }
1610
2
1611
2
        debug_assert!(!self
1612
2
            .inner
1613
2
            .substreams_wake_up
1614
2
            .iter()
1615
2
            .any(|(_, s)| s == &id.0));
1616
2
        debug_assert!(!self.inner.window_frames_to_send.contains_key(&id.0));
1617
2
        debug_assert!(!self.inner.substreams_write_ready.contains(&id.0));
1618
1619
2
        let substream = self.inner.substreams.remove(&id.0).unwrap();
1620
2
1621
2
        if substream.inbound {
1622
2
            self.inner.num_inbound -= 1;
1623
2
        }
0
1624
1625
2
        substream.user_data
1626
2
    }
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE21remove_dead_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE21remove_dead_substreamBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE21remove_dead_substreamCsiUjFBJteJ7x_17smoldot_full_node
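The two methods above work as a pair: `dead_substreams` lists the substreams that can be reclaimed together with how they died, and `remove_dead_substream` actually frees one and hands back its user data. A minimal sketch of a cleanup pass, assuming a `yamux: Yamux<TNow, TSub>` binding (the surrounding names are hypothetical):

    // Collect the ids first: `dead_substreams` borrows `yamux` immutably,
    // while `remove_dead_substream` requires `&mut self`.
    let dead: Vec<(SubstreamId, DeadSubstreamTy)> = yamux
        .dead_substreams()
        .map(|(id, death_ty, _user_data)| (id, death_ty))
        .collect();
    for (id, _death_ty) in dead {
        // Panics if `id` doesn't refer to a dead substream, per the docs above.
        // `_death_ty` is either `DeadSubstreamTy::ClosedGracefully` or
        // `DeadSubstreamTy::Reset`.
        let _user_data = yamux.remove_dead_substream(id);
    }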
1627
1628
    /// Accepts an incoming substream.
1629
    ///
1630
    /// Either [`Yamux::accept_pending_substream`] or [`Yamux::reject_pending_substream`] must be
1631
    /// called after [`ReadWriteOutcome::IncomingSubstream`] is returned.
1632
    ///
1633
    /// Note that a pending substream never expires: after [`ReadWriteOutcome::IncomingSubstream`]
1634
    /// is returned, it remains valid until it is accepted or rejected. However, reading is blocked
1635
    /// until the substream has been either accepted or rejected. This function should thus be called as
1636
    /// soon as possible.
1637
    ///
1638
    /// Returns an error if no incoming substream is currently pending.
1639
    ///
1640
20
    pub fn accept_pending_substream(
1641
20
        &mut self,
1642
20
        user_data: TSub,
1643
20
    ) -> Result<SubstreamId, PendingSubstreamError> {
1644
        let Incoming::PendingIncomingSubstream {
1645
20
            substream_id,
1646
20
            extra_window,
1647
20
            data_frame_size,
1648
20
            fin,
1649
20
        } = self.inner.incoming
1650
        else {
1651
0
            return Err(PendingSubstreamError::NoPendingSubstream);
1652
        };
1653
1654
20
        debug_assert!(u64::from(data_frame_size) <= NEW_SUBSTREAMS_FRAME_SIZE);
1655
1656
20
        let _was_before = self.inner.substreams.insert(
1657
20
            substream_id,
1658
20
            Substream {
1659
20
                state: SubstreamState::Healthy {
1660
20
                    first_message_queued: false,
1661
20
                    remote_syn_acked: true,
1662
20
                    remote_allowed_window: NEW_SUBSTREAMS_FRAME_SIZE - u64::from(data_frame_size),
1663
20
                    allowed_window: NEW_SUBSTREAMS_FRAME_SIZE + u64::from(extra_window),
1664
20
                    local_write_close: SubstreamStateLocalWrite::Open,
1665
20
                    remote_write_closed: fin,
1666
20
                    expected_incoming_bytes: None,
1667
20
                    read_buffer: Vec::new(), // TODO: capacity?
1668
20
                    // There is no need to insert the substream in `substreams_wake_up`, as we
1669
20
                    // switch `self.inner.incoming` to a data frame below, which will cause the
1670
20
                    // substream to be processed.
1671
20
                    substreams_wake_up_key: None,
1672
20
                },
1673
20
                inbound: true,
1674
20
                user_data,
1675
20
            },
1676
20
        );
1677
20
        debug_assert!(_was_before.is_none());
1678
1679
20
        self.inner.num_inbound += 1;
1680
20
1681
20
        self.inner.incoming = Incoming::DataFrame {
1682
20
            substream_id,
1683
20
            remaining_bytes: data_frame_size,
1684
20
        };
1685
20
1686
20
        Ok(SubstreamId(substream_id))
1687
20
    }
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE24accept_pending_substreamBa_
Line
Count
Source
1640
20
    pub fn accept_pending_substream(
1641
20
        &mut self,
1642
20
        user_data: TSub,
1643
20
    ) -> Result<SubstreamId, PendingSubstreamError> {
1644
        let Incoming::PendingIncomingSubstream {
1645
20
            substream_id,
1646
20
            extra_window,
1647
20
            data_frame_size,
1648
20
            fin,
1649
20
        } = self.inner.incoming
1650
        else {
1651
0
            return Err(PendingSubstreamError::NoPendingSubstream);
1652
        };
1653
1654
20
        debug_assert!(u64::from(data_frame_size) <= NEW_SUBSTREAMS_FRAME_SIZE);
1655
1656
20
        let _was_before = self.inner.substreams.insert(
1657
20
            substream_id,
1658
20
            Substream {
1659
20
                state: SubstreamState::Healthy {
1660
20
                    first_message_queued: false,
1661
20
                    remote_syn_acked: true,
1662
20
                    remote_allowed_window: NEW_SUBSTREAMS_FRAME_SIZE - u64::from(data_frame_size),
1663
20
                    allowed_window: NEW_SUBSTREAMS_FRAME_SIZE + u64::from(extra_window),
1664
20
                    local_write_close: SubstreamStateLocalWrite::Open,
1665
20
                    remote_write_closed: fin,
1666
20
                    expected_incoming_bytes: None,
1667
20
                    read_buffer: Vec::new(), // TODO: capacity?
1668
20
                    // There is no need to insert the substream in `substreams_wake_up`, as we
1669
20
                    // switch `self.inner.incoming` to a data frame below, which will cause the
1670
20
                    // substream to be processed.
1671
20
                    substreams_wake_up_key: None,
1672
20
                },
1673
20
                inbound: true,
1674
20
                user_data,
1675
20
            },
1676
20
        );
1677
20
        debug_assert!(_was_before.is_none());
1678
1679
20
        self.inner.num_inbound += 1;
1680
20
1681
20
        self.inner.incoming = Incoming::DataFrame {
1682
20
            substream_id,
1683
20
            remaining_bytes: data_frame_size,
1684
20
        };
1685
20
1686
20
        Ok(SubstreamId(substream_id))
1687
20
    }
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE24accept_pending_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE24accept_pending_substreamBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE24accept_pending_substreamCsiUjFBJteJ7x_17smoldot_full_node
1688
1689
    /// Rejects an incoming substream.
1690
    ///
1691
    /// Either [`Yamux::accept_pending_substream`] or [`Yamux::reject_pending_substream`] must be
1692
    /// called after [`ReadWriteOutcome::IncomingSubstream`] is returned.
1693
    ///
1694
    /// Note that a pending substream never expires: after [`ReadWriteOutcome::IncomingSubstream`]
1695
    /// is returned, it remains valid until it is accepted or rejected. However, reading is blocked
1696
    /// until the substream has been either accepted or rejected. This function should thus be called as
1697
    /// soon as possible.
1698
    ///
1699
    /// Returns an error if no incoming substream is currently pending.
1700
    ///
1701
0
    pub fn reject_pending_substream(&mut self) -> Result<(), PendingSubstreamError> {
1702
        let Incoming::PendingIncomingSubstream {
1703
0
            substream_id,
1704
0
            data_frame_size,
1705
            ..
1706
0
        } = self.inner.incoming
1707
        else {
1708
0
            return Err(PendingSubstreamError::NoPendingSubstream);
1709
        };
1710
1711
0
        self.inner.rsts_to_send.push_back(substream_id);
1712
0
        self.inner.incoming = Incoming::DataFrame {
1713
0
            substream_id,
1714
0
            remaining_bytes: data_frame_size,
1715
0
        };
1716
0
        Ok(())
1717
0
    }
Unexecuted instantiation: _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE24reject_pending_substreamBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE24reject_pending_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE24reject_pending_substreamBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE24reject_pending_substreamCsiUjFBJteJ7x_17smoldot_full_node
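For illustration, a hedged sketch of how a caller might react to [`ReadWriteOutcome::IncomingSubstream`] using the two methods above; the `outcome` binding, the `should_accept` policy, and the `()` user data are stand-ins:

    match outcome {
        ReadWriteOutcome::IncomingSubstream { mut yamux } => {
            if should_accept {
                // Stores `()` as the new substream's user data; the frame that
                // opened the substream is then processed normally.
                let _substream_id = yamux.accept_pending_substream(()).unwrap();
            } else {
                // Queues a RST frame for the substream and discards the data
                // that came with the opening frame.
                yamux.reject_pending_substream().unwrap();
            }
        }
        _ => {}
    }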
1718
}
1719
1720
impl<TNow, TSub> ops::Index<SubstreamId> for Yamux<TNow, TSub> {
1721
    type Output = TSub;
1722
1723
0
    fn index(&self, substream_id: SubstreamId) -> &TSub {
1724
0
        &self
1725
0
            .inner
1726
0
            .substreams
1727
0
            .get(&substream_id.0)
1728
0
            .unwrap()
1729
0
            .user_data
1730
0
    }
Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxs0_0ppEINtB5_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_11SubstreamIdE5indexBb_
Unexecuted instantiation: _RNvXs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1K_IB1K_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1c_3ops5index5IndexNtB5_11SubstreamIdE5indexCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs0_0ppEINtB5_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_11SubstreamIdE5indexBb_
Unexecuted instantiation: _RNvXs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1I_IB1I_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1M_3ops5index5IndexNtB5_11SubstreamIdE5indexCsiUjFBJteJ7x_17smoldot_full_node
1731
}
1732
1733
impl<TNow, TSub> ops::IndexMut<SubstreamId> for Yamux<TNow, TSub> {
1734
28
    fn index_mut(&mut self, substream_id: SubstreamId) -> &mut TSub {
1735
28
        &mut self
1736
28
            .inner
1737
28
            .substreams
1738
28
            .get_mut(&substream_id.0)
1739
28
            .unwrap_or_else(|| panic!())
Unexecuted instantiation: _RNCNvXs1_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB9_11established9substream9SubstreamB19_EIB1L_uEEEEINtNtNtB1d_3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0Bd_
Unexecuted instantiation: _RNCNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1e_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1a_EIB1M_IB1M_NtNtBb_10collection11SubstreamIdEEEEEINtNtNtB1e_3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs1_0ppEINtB7_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0Bd_
Unexecuted instantiation: _RNCNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1a_EIB1K_IB1K_NtNtBb_10collection11SubstreamIdEEEEEINtNtNtB1O_3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0CsiUjFBJteJ7x_17smoldot_full_node
1740
28
            .user_data
1741
28
    }
_RNvXs1_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB7_11established9substream9SubstreamB17_EIB1J_uEEEEINtNtNtB1b_3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutBb_
Line
Count
Source
1734
28
    fn index_mut(&mut self, substream_id: SubstreamId) -> &mut TSub {
1735
28
        &mut self
1736
28
            .inner
1737
28
            .substreams
1738
28
            .get_mut(&substream_id.0)
1739
28
            .unwrap_or_else(|| panic!())
1740
28
            .user_data
1741
28
    }
Unexecuted instantiation: _RNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1K_IB1K_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1c_3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs1_0ppEINtB5_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutBb_
Unexecuted instantiation: _RNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1I_IB1I_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1M_3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutCsiUjFBJteJ7x_17smoldot_full_node
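Thanks to the `ops::Index` and `ops::IndexMut` implementations above, the user data of a substream can be reached with bracket syntax; both panic if the id doesn't exist. A small sketch, assuming a `Yamux<TNow, String>` value and a valid `substream_id`:

    yamux[substream_id].push_str(" (seen)"); // through `IndexMut`
    let label: &String = &yamux[substream_id]; // through `Index`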
1742
}
1743
1744
impl<TNow, TSub> fmt::Debug for Yamux<TNow, TSub>
1745
where
1746
    TSub: fmt::Debug,
1747
{
1748
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1749
0
        struct List<'a, TNow, TSub>(&'a Yamux<TNow, TSub>);
1750
0
        impl<'a, TNow, TSub> fmt::Debug for List<'a, TNow, TSub>
1751
0
        where
1752
0
            TSub: fmt::Debug,
1753
0
        {
1754
0
            fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1755
0
                f.debug_list()
1756
0
                    .entries(self.0.inner.substreams.values().map(|v| &v.user_data))
Unexecuted instantiation: _RNCNvXININvXs2_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtBd_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB7_4ListppEB1i_3fmt0Bj_
Unexecuted instantiation: _RNCNvXININvXs2_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtBd_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB7_4ListppEB1j_3fmt0Bj_
1757
0
                    .finish()
1758
0
            }
Unexecuted instantiation: _RNvXININvXs2_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtBb_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB5_4ListppEB1g_3fmtBh_
Unexecuted instantiation: _RNvXININvXs2_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtBb_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB5_4ListppEB1h_3fmtBh_
1759
0
        }
1760
0
1761
0
        f.debug_struct("Yamux")
1762
0
            .field("substreams", &List(self))
1763
0
            .finish()
1764
0
    }
Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxs2_0ppEINtB5_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs2_0ppEINtB5_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_
1765
}
1766
1767
/// Identifier of a substream in the context of a connection.
1768
#[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, derive_more::From)]
1769
pub struct SubstreamId(NonZeroU32);
1770
1771
impl SubstreamId {
1772
    /// Value that compares less than or equal to all possible values.
1773
    pub const MIN: Self = Self(match NonZeroU32::new(1) {
1774
        Some(v) => v,
1775
        None => unreachable!(),
1776
    });
1777
1778
    /// Value that compares superior or equal to all possible values.
1779
    pub const MAX: Self = Self(match NonZeroU32::new(u32::MAX) {
1780
        Some(v) => v,
1781
        None => unreachable!(),
1782
    });
1783
}
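The `match` plus `unreachable!()` construction above is the usual way to unwrap an `Option` in a `const` context, where `Option::unwrap` historically couldn't be called. A standalone sketch of the same pattern:

    use core::num::NonZeroU32;

    // Evaluated at compile time; the `None` arm can never be taken because
    // `NonZeroU32::new(1)` is always `Some`.
    const ONE: NonZeroU32 = match NonZeroU32::new(1) {
        Some(v) => v,
        None => unreachable!(),
    };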
1784
1785
/// Details about the incoming data.
1786
#[must_use]
1787
#[derive(Debug)]
1788
pub enum ReadWriteOutcome<'a, TNow, TSub>
1789
where
1790
    TNow: Clone + cmp::Ord,
1791
{
1792
    /// Nothing in particular happened.
1793
    Idle {
1794
        /// The [`Yamux`] state machine yielded back.
1795
        yamux: Yamux<TNow, TSub>,
1796
    },
1797
1798
    /// Remote has requested to open a new substream.
1799
    ///
1800
    /// After this has been received, either [`Yamux::accept_pending_substream`] or
1801
    /// [`Yamux::reject_pending_substream`] needs to be called in order to accept or reject
1802
    /// this substream. [`Yamux::read_write`] will not read any more incoming data until this is done.
1803
    ///
1804
    /// Note that this can never happen after [`Yamux::send_goaway`] has been called, as all
1805
    /// substreams are then automatically rejected.
1806
    IncomingSubstream {
1807
        /// The [`Yamux`] state machine yielded back.
1808
        // TODO: use an accept/reject wrapper instead
1809
        yamux: Yamux<TNow, TSub>,
1810
    },
1811
1812
    /// Received data corresponding to a substream.
1813
    ProcessSubstream {
1814
        /// Object allowing reading and writing data from/to the given substream.
1815
        substream_read_write: SubstreamReadWrite<'a, TNow, TSub>,
1816
    },
1817
1818
    /// Remote has asked to reset a substream.
1819
    StreamReset {
1820
        /// The [`Yamux`] state machine yielded back.
1821
        yamux: Yamux<TNow, TSub>,
1822
        /// Substream that has been reset.
1823
        substream_id: SubstreamId,
1824
    },
1825
1826
    /// Received a "go away" request. This means that it is now forbidden to open new outbound
1827
    /// substreams. It is still allowed to send and receive data on existing substreams, and the
1828
    /// remote is still allowed to open substreams.
1829
    GoAway {
1830
        /// The [`Yamux`] state machine yielded back.
1831
        yamux: Yamux<TNow, TSub>,
1832
        /// Error code sent by the remote.
1833
        code: GoAwayErrorCode,
1834
        /// List of all outgoing substreams that haven't been acknowledged by the remote yet.
1835
        /// These substreams are considered reset, similar to
1836
        /// [`ReadWriteOutcome::StreamReset`].
1837
        reset_substreams: Vec<SubstreamId>,
1838
    },
1839
1840
    /// Received a response to a ping that has been sent out earlier.
1841
    ///
1842
    /// If multiple pings have been sent out simultaneously, they are always answered in the same
1843
    /// order as they have been sent out.
1844
    PingResponse {
1845
        /// The [`Yamux`] state machine yielded back.
1846
        yamux: Yamux<TNow, TSub>,
1847
    },
1848
}
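Callers are expected to branch on every variant. A hedged sketch of such a dispatch, with deliberately minimal handling (a real caller would keep the returned `yamux` and act on each event):

    fn dispatch<TNow, TSub>(outcome: ReadWriteOutcome<'_, TNow, TSub>)
    where
        TNow: Clone + core::cmp::Ord,
    {
        match outcome {
            ReadWriteOutcome::Idle { yamux: _ } => {}
            ReadWriteOutcome::IncomingSubstream { yamux: _ } => {
                // Must be followed by `accept_pending_substream` or
                // `reject_pending_substream`.
            }
            ReadWriteOutcome::ProcessSubstream { substream_read_write } => {
                // Read/write the substream, then hand the state machine back.
                let _yamux = substream_read_write.finish();
            }
            ReadWriteOutcome::StreamReset { yamux: _, substream_id: _ } => {}
            ReadWriteOutcome::GoAway { yamux: _, code: _, reset_substreams: _ } => {}
            ReadWriteOutcome::PingResponse { yamux: _ } => {}
        }
    }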
1849
1850
pub struct SubstreamReadWrite<'a, TNow, TSub>
1851
where
1852
    TNow: Clone + cmp::Ord,
1853
{
1854
    outer_read_write: &'a mut ReadWrite<TNow>,
1855
    inner_read_write: ReadWrite<TNow>,
1856
    yamux: Yamux<TNow, TSub>,
1857
    substream_id: NonZeroU32,
1858
1859
    /// Size of the write buffers of the substream prior to its processing.
1860
    write_buffers_len_before: usize,
1861
}
1862
1863
impl<'a, TNow, TSub> SubstreamReadWrite<'a, TNow, TSub>
1864
where
1865
    TNow: Clone + cmp::Ord,
1866
{
1867
    /// Returns the identifier of the substream being read/written.
1868
39
    pub fn substream_id(&self) -> SubstreamId {
1869
39
        SubstreamId(self.substream_id)
1870
39
    }
_RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE12substream_idBb_
Line
Count
Source
1868
39
    pub fn substream_id(&self) -> SubstreamId {
1869
39
        SubstreamId(self.substream_id)
1870
39
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE12substream_idCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE12substream_idBb_
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE12substream_idCsiUjFBJteJ7x_17smoldot_full_node
1871
1872
329
    pub fn read_write(&mut self) -> &mut ReadWrite<TNow> {
1873
329
        &mut self.inner_read_write
1874
329
    }
_RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE10read_writeBb_
Line
Count
Source
1872
329
    pub fn read_write(&mut self) -> &mut ReadWrite<TNow> {
1873
329
        &mut self.inner_read_write
1874
329
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE10read_writeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE10read_writeBb_
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE10read_writeCsiUjFBJteJ7x_17smoldot_full_node
1875
1876
    /// Returns the user data associated to the substream being read/written.
1877
0
    pub fn user_data(&self) -> &TSub {
1878
0
        &self
1879
0
            .yamux
1880
0
            .inner
1881
0
            .substreams
1882
0
            .get(&self.substream_id)
1883
0
            .unwrap()
1884
0
            .user_data
1885
0
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE9user_dataBb_
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE9user_dataBb_
1886
1887
    /// Returns the user data associated to the substream being read/written.
1888
653
    pub fn user_data_mut(&mut self) -> &mut TSub {
1889
653
        &mut self
1890
653
            .yamux
1891
653
            .inner
1892
653
            .substreams
1893
653
            .get_mut(&self.substream_id)
1894
653
            .unwrap()
1895
653
            .user_data
1896
653
    }
_RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE13user_data_mutBb_
Line
Count
Source
1888
653
    pub fn user_data_mut(&mut self) -> &mut TSub {
1889
653
        &mut self
1890
653
            .yamux
1891
653
            .inner
1892
653
            .substreams
1893
653
            .get_mut(&self.substream_id)
1894
653
            .unwrap()
1895
653
            .user_data
1896
653
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE13user_data_mutCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE13user_data_mutBb_
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE13user_data_mutCsiUjFBJteJ7x_17smoldot_full_node
1897
1898
329
    pub fn finish(mut self) -> Yamux<TNow, TSub> {
1899
        let Substream {
1900
329
            inbound,
1901
329
            state:
1902
329
                SubstreamState::Healthy {
1903
329
                    first_message_queued,
1904
329
                    remote_allowed_window,
1905
329
                    local_write_close,
1906
329
                    remote_write_closed,
1907
329
                    read_buffer,
1908
329
                    expected_incoming_bytes,
1909
329
                    substreams_wake_up_key,
1910
                    ..
1911
                },
1912
            ..
1913
329
        } = &mut self
1914
329
            .yamux
1915
329
            .inner
1916
329
            .substreams
1917
329
            .get_mut(&self.substream_id)
1918
329
            .unwrap()
1919
        else {
1920
0
            unreachable!()
1921
        };
1922
1923
        // Update the reading part of the substream's internal state.
1924
329
        *read_buffer = mem::take(&mut self.inner_read_write.incoming_buffer);
1925
329
        *expected_incoming_bytes = Some(self.inner_read_write.expected_incoming_bytes.unwrap_or(0));
1926
1927
        // If the substream requests more data than the remote is allowed to send, send out a
1928
        // window frame. This ensures that the reading can never stall due to window frames issues.
1929
329
        if let Some(mut missing_window_size) = NonZeroUsize::new(
1930
329
            expected_incoming_bytes
1931
329
                .unwrap()
1932
329
                .saturating_sub(usize::try_from(*remote_allowed_window).unwrap_or(usize::MAX)),
1933
329
        ) {
1934
            // Don't send super tiny window frames.
1935
0
            if missing_window_size.get() < 1024 {
1936
0
                missing_window_size = NonZeroUsize::new(1024).unwrap();
1937
0
            }
1938
1939
0
            let missing_window_size =
1940
0
                NonZeroU64::new(u64::try_from(missing_window_size.get()).unwrap_or(u64::MAX))
1941
0
                    .unwrap();
1942
0
1943
0
            self.yamux
1944
0
                .inner
1945
0
                .window_frames_to_send
1946
0
                .entry(self.substream_id)
1947
0
                .and_modify(|v| {
1948
0
                    if *v < missing_window_size {
1949
0
                        *v = missing_window_size;
1950
0
                    }
1951
0
                })
Unexecuted instantiation: _RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finish0Bd_
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finish0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finish0Bd_
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finish0CsiUjFBJteJ7x_17smoldot_full_node
1952
0
                .or_insert(missing_window_size);
1953
0
1954
0
            self.outer_read_write.wake_up_asap();
1955
329
        }
1956
1957
        // When to wake up the substream for reading again.
1958
329
        debug_assert!(substreams_wake_up_key.is_none());
1959
329
        let will_wake_up_read_again = match (
1960
329
            self.inner_read_write.read_bytes,
1961
329
            &self.inner_read_write.wake_up_after,
1962
        ) {
1963
            (0, None) => {
1964
                // Don't wake it up for reading.
1965
122
                false
1966
            }
1967
39
            (0, Some(when)) if *when > self.outer_read_write.now => {
1968
37
                // Wake it up at `when`.
1969
37
                self.outer_read_write.wake_up_after(when);
1970
37
                self.yamux
1971
37
                    .inner
1972
37
                    .substreams_wake_up
1973
37
                    .insert((Some(when.clone()), self.substream_id));
1974
37
                *substreams_wake_up_key = Some(Some(when.clone()));
1975
37
                true
1976
            }
1977
            _ => {
1978
                // Non-zero bytes read, or `when <= now`.
1979
                // Wake it up as soon as possible so it continues reading from its read buffer.
1980
170
                self.outer_read_write.wake_up_asap();
1981
170
                self.yamux
1982
170
                    .inner
1983
170
                    .substreams_wake_up
1984
170
                    .insert((None, self.substream_id));
1985
170
                *substreams_wake_up_key = Some(None);
1986
170
                true
1987
            }
1988
        };
1989
1990
        // Update the `local_write_close` state of the substream.
1991
329
        if matches!(*local_write_close, SubstreamStateLocalWrite::Open)
1992
319
            && self.inner_read_write.write_bytes_queueable.is_none()
1993
6
        {
1994
6
            *local_write_close = SubstreamStateLocalWrite::FinDesired;
1995
323
        }
1996
1997
        // Sanity check.
1998
329
        debug_assert!(matches!(
1999
329
            self.yamux.inner.outgoing,
2000
            Outgoing::WritingOut { .. }
2001
        ));
2002
2003
        // Process the writing side of the substream.
2004
329
        if self.inner_read_write.write_bytes_queued != self.write_buffers_len_before
2005
67
            && matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty())
2006
67
            && self
2007
67
                .inner_read_write
2008
67
                .write_bytes_queueable
2009
67
                .map_or(false, |n| n != 0)
_RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finishs_0Bd_
Line
Count
Source
2009
67
                .map_or(false, |n| n != 0)
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finishs_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finishs_0Bd_
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finishs_0CsiUjFBJteJ7x_17smoldot_full_node
2010
67
        {
2011
67
            // Substream has written out data, but might have more to write. Put back the write
2012
67
            // buffers for next time.
2013
67
            // Note that there's no need to insert the substream in `substreams_write_read`, as
2014
67
            // as the `Outgoing::PreparingDataFrame` state guarantees that this substream will be
2015
67
            // processed again as soon as possible.
2016
67
            self.yamux.inner.outgoing = Outgoing::PreparingDataFrame {
2017
67
                substream_id: self.substream_id,
2018
67
                write_buffers: mem::take(&mut self.inner_read_write.write_buffers),
2019
67
            };
2020
67
            self.outer_read_write.wake_up_asap();
2021
262
        } else if self.inner_read_write.write_bytes_queued != 0
2022
196
            || matches!(*local_write_close, SubstreamStateLocalWrite::FinDesired)
2023
        {
2024
            // Substream hasn't written anything more. A data frame is ready. Flush its data.
2025
2026
            // The substream should only have been able to write data if we're not currently
2027
            // writing out. If this assertion fails, it indicates that the substream hasn't
2028
            // respected the `ReadWrite` contract.
2029
69
            debug_assert!(
2030
69
                matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty())
2031
            );
2032
2033
69
            let mut write_buffers = mem::take(&mut self.inner_read_write.write_buffers);
2034
69
2035
69
            // When preparing the inner `ReadWrite` object, the `write_buffers` are set to
2036
69
            // contain one empty entry of enough capacity to hold the header. There is a high
2037
69
            // chance that this empty entry is still there, but if it's not we add it now.
2038
69
            if write_buffers.first().map_or(true, |b| !b.is_empty()) {
_RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finishs0_0Bd_
Line
Count
Source
2038
69
            if write_buffers.first().map_or(true, |b| !b.is_empty()) {
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finishs0_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finishs0_0Bd_
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finishs0_0CsiUjFBJteJ7x_17smoldot_full_node
2039
0
                write_buffers.insert(0, Vec::with_capacity(12));
2040
69
            }
2041
2042
69
            write_buffers[0].extend_from_slice(&header::encode(
2043
69
                &header::DecodedYamuxHeader::Data {
2044
69
                    syn: !*first_message_queued && !*inbound,
2045
69
                    ack: !*first_message_queued && *inbound,
2046
69
                    fin: matches!(*local_write_close, SubstreamStateLocalWrite::FinDesired),
2047
                    rst: false,
2048
69
                    stream_id: self.substream_id,
2049
69
                    length: {
2050
69
                        // Because the number of queueable bytes is capped by the value in
2051
69
                        // `Config::max_out_data_frame_size`, we are guaranteed that the length
2052
69
                        // to write out fits in a `u32`.
2053
69
                        debug_assert!(self.yamux.inner.max_out_data_frame_size.get() <= u32::MAX);
2054
69
                        u32::try_from(self.inner_read_write.write_bytes_queued).unwrap()
2055
                    },
2056
                },
2057
            ));
2058
69
            if matches!(*local_write_close, SubstreamStateLocalWrite::FinDesired) {
2059
6
                *local_write_close = SubstreamStateLocalWrite::FinQueued;
2060
63
            }
2061
69
            *first_message_queued = true;
2062
69
2063
69
            self.yamux.inner.outgoing = Outgoing::WritingOut {
2064
69
                buffers: write_buffers,
2065
69
            };
2066
69
2067
69
            self.outer_read_write.wake_up_asap();
2068
69
2069
69
            // Re-schedule the substream for writing, as it may have been waiting for the queue to
2070
69
            // be flushed before writing more data.
2071
69
            self.yamux
2072
69
                .inner
2073
69
                .substreams_write_ready
2074
69
                .insert(self.substream_id);
2075
193
        } else if self.inner_read_write.write_bytes_queueable == Some(0) {
2076
0
            // Substream hasn't written anything because it wasn't able to write anything.
2077
0
            // Re-schedule the substream for when it is possible to write data out.
2078
0
            self.yamux
2079
0
                .inner
2080
0
                .substreams_write_ready
2081
0
                .insert(self.substream_id);
2082
193
        } else {
2083
193
            // Substream has nothing to write.
2084
193
        }
2085
2086
        // Mark the substream as dead if it won't ever wake up again.
2087
329
        if matches!(local_write_close, SubstreamStateLocalWrite::FinQueued)
2088
16
            && *remote_write_closed
2089
3
            && !will_wake_up_read_again
2090
2
            && !self
2091
2
                .yamux
2092
2
                .inner
2093
2
                .substreams_write_ready
2094
2
                .contains(&self.substream_id)
2095
0
            && !matches!(self.yamux.inner.outgoing,  Outgoing::PreparingDataFrame {
2096
0
                substream_id,
2097
                ..
2098
0
            } if substream_id == self.substream_id)
2099
        {
2100
1
            let _was_inserted = self.yamux.inner.dead_substreams.insert(self.substream_id);
2101
1
            debug_assert!(_was_inserted);
2102
1
            debug_assert!(!self
2103
1
                .yamux
2104
1
                .inner
2105
1
                .substreams_wake_up
2106
1
                .iter()
2107
1
                .any(|(_, s)| *s == self.substream_id));
Unexecuted instantiation: _RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finishs1_0Bd_
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finishs1_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finishs1_0Bd_
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finishs1_0CsiUjFBJteJ7x_17smoldot_full_node
2108
1
            debug_assert!(!self
2109
1
                .yamux
2110
1
                .inner
2111
1
                .substreams_write_ready
2112
1
                .contains(&self.substream_id));
2113
1
            debug_assert!(!self
2114
1
                .yamux
2115
1
                .inner
2116
1
                .window_frames_to_send
2117
1
                .contains_key(&self.substream_id));
2118
328
        }
2119
2120
329
        self.yamux
2121
329
    }
_RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE6finishBb_
Line
Count
Source
1898
329
    pub fn finish(mut self) -> Yamux<TNow, TSub> {
1899
        let Substream {
1900
329
            inbound,
1901
329
            state:
1902
329
                SubstreamState::Healthy {
1903
329
                    first_message_queued,
1904
329
                    remote_allowed_window,
1905
329
                    local_write_close,
1906
329
                    remote_write_closed,
1907
329
                    read_buffer,
1908
329
                    expected_incoming_bytes,
1909
329
                    substreams_wake_up_key,
1910
                    ..
1911
                },
1912
            ..
1913
329
        } = &mut self
1914
329
            .yamux
1915
329
            .inner
1916
329
            .substreams
1917
329
            .get_mut(&self.substream_id)
1918
329
            .unwrap()
1919
        else {
1920
0
            unreachable!()
1921
        };
1922
1923
        // Update the reading part of the substream's internal state.
1924
329
        *read_buffer = mem::take(&mut self.inner_read_write.incoming_buffer);
1925
329
        *expected_incoming_bytes = Some(self.inner_read_write.expected_incoming_bytes.unwrap_or(0));
1926
1927
        // If the substream requests more data than the remote is allowed to send, send out a
1928
        // window frame. This ensures that the reading can never stall due to window frames issues.
1929
329
        if let Some(mut missing_window_size) = NonZeroUsize::new(
1930
329
            expected_incoming_bytes
1931
329
                .unwrap()
1932
329
                .saturating_sub(usize::try_from(*remote_allowed_window).unwrap_or(usize::MAX)),
1933
329
        ) {
1934
            // Don't send super tiny window frames.
1935
0
            if missing_window_size.get() < 1024 {
1936
0
                missing_window_size = NonZeroUsize::new(1024).unwrap();
1937
0
            }
1938
1939
0
            let missing_window_size =
1940
0
                NonZeroU64::new(u64::try_from(missing_window_size.get()).unwrap_or(u64::MAX))
1941
0
                    .unwrap();
1942
0
1943
0
            self.yamux
1944
0
                .inner
1945
0
                .window_frames_to_send
1946
0
                .entry(self.substream_id)
1947
0
                .and_modify(|v| {
1948
                    if *v < missing_window_size {
1949
                        *v = missing_window_size;
1950
                    }
1951
0
                })
1952
0
                .or_insert(missing_window_size);
1953
0
1954
0
            self.outer_read_write.wake_up_asap();
1955
329
        }
1956
1957
        // When to wake up the substream for reading again.
1958
329
        debug_assert!(substreams_wake_up_key.is_none());
1959
329
        let will_wake_up_read_again = match (
1960
329
            self.inner_read_write.read_bytes,
1961
329
            &self.inner_read_write.wake_up_after,
1962
        ) {
1963
            (0, None) => {
1964
                // Don't wake it up for reading.
1965
122
                false
1966
            }
1967
39
            (0, Some(when)) if *when > self.outer_read_write.now => {
1968
37
                // Wake it up at `when`.
1969
37
                self.outer_read_write.wake_up_after(when);
1970
37
                self.yamux
1971
37
                    .inner
1972
37
                    .substreams_wake_up
1973
37
                    .insert((Some(when.clone()), self.substream_id));
1974
37
                *substreams_wake_up_key = Some(Some(when.clone()));
1975
37
                true
1976
            }
1977
            _ => {
1978
                // Non-zero bytes read, or `when <= now`.
1979
                // Wake it up as soon as possible so it continues reading from its read buffer.
1980
170
                self.outer_read_write.wake_up_asap();
1981
170
                self.yamux
1982
170
                    .inner
1983
170
                    .substreams_wake_up
1984
170
                    .insert((None, self.substream_id));
1985
170
                *substreams_wake_up_key = Some(None);
1986
170
                true
1987
            }
1988
        };
1989
1990
        // Update the `local_write_close` state of the substream.
1991
329
        if matches!(*local_write_close, SubstreamStateLocalWrite::Open)
1992
319
            && self.inner_read_write.write_bytes_queueable.is_none()
1993
6
        {
1994
6
            *local_write_close = SubstreamStateLocalWrite::FinDesired;
1995
323
        }
1996
1997
        // Sanity check.
1998
329
        debug_assert!(matches!(
1999
329
            self.yamux.inner.outgoing,
2000
            Outgoing::WritingOut { .. }
2001
        ));
2002
2003
        // Process the writing side of the substream.
2004
329
        if self.inner_read_write.write_bytes_queued != self.write_buffers_len_before
2005
67
            && matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty())
2006
67
            && self
2007
67
                .inner_read_write
2008
67
                .write_bytes_queueable
2009
67
                .map_or(false, |n| n != 0)
2010
67
        {
2011
67
            // Substream has written out data, but might have more to write. Put back the write
2012
67
            // buffers for next time.
2013
67
            // Note that there's no need to insert the substream in `substreams_write_ready`,
2014
67
            // as the `Outgoing::PreparingDataFrame` state guarantees that this substream will be
2015
67
            // processed again as soon as possible.
2016
67
            self.yamux.inner.outgoing = Outgoing::PreparingDataFrame {
2017
67
                substream_id: self.substream_id,
2018
67
                write_buffers: mem::take(&mut self.inner_read_write.write_buffers),
2019
67
            };
2020
67
            self.outer_read_write.wake_up_asap();
2021
262
        } else if self.inner_read_write.write_bytes_queued != 0
2022
196
            || matches!(*local_write_close, SubstreamStateLocalWrite::FinDesired)
2023
        {
2024
            // Substream hasn't written anything more. A data frame is ready. Flush its data.
2025
2026
            // The substream should only have been able to write data if we're not currently
2027
            // writing out. If this assertion fails, it indicates that the substream hasn't
2028
            // respected the `ReadWrite` contract.
2029
69
            debug_assert!(
2030
69
                matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty())
2031
            );
2032
2033
69
            let mut write_buffers = mem::take(&mut self.inner_read_write.write_buffers);
2034
69
2035
69
            // When preparing the inner `ReadWrite` object, the `write_buffers` are set to
2036
69
            // contain one empty entry of enough capacity to hold the header. There is a high
2037
69
            // chance that this empty entry is still there, but if it's not we add it now.
2038
69
            if write_buffers.first().map_or(true, |b| !b.is_empty()) {
2039
0
                write_buffers.insert(0, Vec::with_capacity(12));
2040
69
            }
2041
2042
69
            write_buffers[0].extend_from_slice(&header::encode(
2043
69
                &header::DecodedYamuxHeader::Data {
2044
69
                    syn: !*first_message_queued && !*inbound,
2045
69
                    ack: !*first_message_queued && *inbound,
2046
69
                    fin: matches!(*local_write_close, SubstreamStateLocalWrite::FinDesired),
2047
                    rst: false,
2048
69
                    stream_id: self.substream_id,
2049
69
                    length: {
2050
69
                        // Because the number of queueable bytes is capped by the value in
2051
69
                        // `Config::max_out_data_frame_size`, we are guaranteed that the length
2052
69
                        // to write out fits in a `u32`.
2053
69
                        debug_assert!(self.yamux.inner.max_out_data_frame_size.get() <= u32::MAX);
2054
69
                        u32::try_from(self.inner_read_write.write_bytes_queued).unwrap()
2055
                    },
2056
                },
2057
            ));
2058
69
            if matches!(*local_write_close, SubstreamStateLocalWrite::FinDesired) {
2059
6
                *local_write_close = SubstreamStateLocalWrite::FinQueued;
2060
63
            }
2061
69
            *first_message_queued = true;
2062
69
2063
69
            self.yamux.inner.outgoing = Outgoing::WritingOut {
2064
69
                buffers: write_buffers,
2065
69
            };
2066
69
2067
69
            self.outer_read_write.wake_up_asap();
2068
69
2069
69
            // Re-schedule the substream for writing, as it may have been waiting for the queue to
2070
69
            // be flushed before writing more data.
2071
69
            self.yamux
2072
69
                .inner
2073
69
                .substreams_write_ready
2074
69
                .insert(self.substream_id);
2075
193
        } else if self.inner_read_write.write_bytes_queueable == Some(0) {
2076
0
            // Substream hasn't written anything because it wasn't able to write anything.
2077
0
            // Re-schedule the substream for when it is possible to write data out.
2078
0
            self.yamux
2079
0
                .inner
2080
0
                .substreams_write_ready
2081
0
                .insert(self.substream_id);
2082
193
        } else {
2083
193
            // Substream has nothing to write.
2084
193
        }
2085
2086
        // Mark the substream as dead if it won't ever wake up again.
2087
329
        if matches!(local_write_close, SubstreamStateLocalWrite::FinQueued)
2088
16
            && *remote_write_closed
2089
3
            && !will_wake_up_read_again
2090
2
            && !self
2091
2
                .yamux
2092
2
                .inner
2093
2
                .substreams_write_ready
2094
2
                .contains(&self.substream_id)
2095
0
            && !matches!(self.yamux.inner.outgoing,  Outgoing::PreparingDataFrame {
2096
0
                substream_id,
2097
                ..
2098
0
            } if substream_id == self.substream_id)
2099
        {
2100
1
            let _was_inserted = self.yamux.inner.dead_substreams.insert(self.substream_id);
2101
1
            debug_assert!(_was_inserted);
2102
1
            debug_assert!(!self
2103
1
                .yamux
2104
1
                .inner
2105
1
                .substreams_wake_up
2106
1
                .iter()
2107
1
                .any(|(_, s)| *s == self.substream_id));
2108
1
            debug_assert!(!self
2109
1
                .yamux
2110
1
                .inner
2111
1
                .substreams_write_ready
2112
1
                .contains(&self.substream_id));
2113
1
            debug_assert!(!self
2114
1
                .yamux
2115
1
                .inner
2116
1
                .window_frames_to_send
2117
1
                .contains_key(&self.substream_id));
2118
328
        }
2119
2120
329
        self.yamux
2121
329
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE6finishCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE6finishBb_
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE6finishCsiUjFBJteJ7x_17smoldot_full_node
2122
2123
    /// Resets the substream being processed and returns the underlying [`Yamux`] object.
2124
5
    pub fn reset(self) -> Yamux<TNow, TSub> {
2125
5
        let substream_id = self.substream_id();
2126
5
        let mut yamux = self.finish();
2127
5
        match yamux.reset(substream_id) {
2128
4
            Ok(()) => {}
2129
1
            Err(ResetError::AlreadyClosed) => {}
2130
0
            Err(ResetError::AlreadyReset) => debug_assert!(false),
2131
        }
2132
5
        yamux
2133
5
    }
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE5resetCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE5resetBb_
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE5resetCsiUjFBJteJ7x_17smoldot_full_node
2134
}
2135
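`finish` and `reset` are the two ways of giving a `SubstreamReadWrite` back to the multiplexer: `finish` commits the I/O that was performed, while `reset` additionally queues a RST frame for the substream. A hedged usage sketch (how the `SubstreamReadWrite` is obtained in the first place is outside this excerpt, and `should_reset` is a hypothetical application-level decision):

    // Hands the substream back after I/O, either gracefully or with a RST.
    fn hand_back<TNow: Clone + Ord, TSub>(
        substream: SubstreamReadWrite<'_, TNow, TSub>,
        should_reset: bool,
    ) -> Yamux<TNow, TSub> {
        if should_reset {
            // Queues a RST frame; as shown above, `AlreadyClosed` is
            // silently absorbed and `AlreadyReset` cannot happen here.
            substream.reset()
        } else {
            // Commits the reads/writes performed on the substream.
            substream.finish()
        }
    }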
2136
impl<'a, TNow, TSub> fmt::Debug for SubstreamReadWrite<'a, TNow, TSub>
2137
where
2138
    TNow: Clone + cmp::Ord,
2139
{
2140
0
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2141
0
        f.debug_struct("SubstreamReadWrite")
2142
0
            .field("substream_id", &self.substream_id)
2143
0
            .finish()
2144
0
    }
Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxs5_0ppEINtB5_18SubstreamReadWriteppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs5_0ppEINtB5_18SubstreamReadWriteppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_
2145
}
2146
2147
/// Error potentially returned by [`Yamux::open_substream`].
2148
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsj_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_18OpenSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsj_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_18OpenSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2149
pub enum OpenSubstreamError {
2150
    /// A `GoAway` frame has been received in the past.
2151
    GoAwayReceived,
2152
    /// Impossible to allocate a new substream because no free substream ID is available.
2153
    NoFreeSubstreamId,
2154
}
2155
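A hedged sketch of handling these two errors at the call site; the exact signature of `open_substream` (assumed here to take the substream's user data and return the new substream's ID) is an assumption:

    // Tries to open an outbound substream; on failure, only a brand-new
    // connection can help.
    fn open_or_reconnect<TNow: Clone + Ord, TSub>(
        yamux: &mut Yamux<TNow, TSub>,
        user_data: TSub,
    ) {
        match yamux.open_substream(user_data) {
            Ok(_substream_id) => {
                // Proceed to negotiate a protocol on the new substream.
            }
            Err(OpenSubstreamError::GoAwayReceived) => {
                // The remote has announced that it won't accept any new
                // substreams on this connection.
            }
            Err(OpenSubstreamError::NoFreeSubstreamId) => {
                // The local side's stream ID space (roughly 2^31 IDs, since
                // each side uses every other ID) is exhausted.
            }
        }
    }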
2156
/// Error potentially returned by [`Yamux::reset`].
2157
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsl_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_10ResetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsl_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_10ResetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2158
pub enum ResetError {
2159
    /// Substream was already reset.
2160
    AlreadyReset,
2161
    /// Substream was already closed.
2162
    AlreadyClosed,
2163
}
2164
2165
/// Error potentially returned by [`Yamux::send_goaway`].
2166
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsn_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_15SendGoAwayErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsn_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_15SendGoAwayErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2167
pub enum SendGoAwayError {
2168
    /// A `GoAway` has already been sent.
2169
    AlreadySent,
2170
}
2171
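A hedged sketch; `GoAwayErrorCode::NormalTermination` is assumed to be one of the re-exported error codes, and `send_goaway` is assumed to take it directly:

    // Announces that no new inbound substream will be accepted.
    fn shut_down_gracefully<TNow: Clone + Ord, TSub>(yamux: &mut Yamux<TNow, TSub>) {
        match yamux.send_goaway(GoAwayErrorCode::NormalTermination) {
            Ok(_) => {
                // A GoAway frame is now queued; substreams that are already
                // open keep working.
            }
            Err(SendGoAwayError::AlreadySent) => {
                // At most one GoAway can be sent per connection; trying to
                // send a second one is a bug in the caller.
            }
        }
    }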
2172
/// Error potentially returned by [`Yamux::accept_pending_substream`] or
2173
/// [`Yamux::reject_pending_substream`].
2174
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsp_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_21PendingSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsp_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_21PendingSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2175
pub enum PendingSubstreamError {
2176
    /// No substream is pending.
2177
    NoPendingSubstream,
2178
}
2179
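A hedged sketch of answering a remotely-opened substream; both signatures (accept taking the user data to associate with the new substream, reject taking nothing) are assumptions:

    // Accepts or rejects the currently pending inbound substream.
    fn answer_inbound<TNow: Clone + Ord, TSub>(
        yamux: &mut Yamux<TNow, TSub>,
        accept: bool,
        user_data: TSub,
    ) {
        if accept {
            match yamux.accept_pending_substream(user_data) {
                Ok(_) => { /* the substream is now live */ }
                Err(PendingSubstreamError::NoPendingSubstream) => { /* caller bug */ }
            }
        } else {
            match yamux.reject_pending_substream() {
                Ok(_) => { /* a RST will be sent for the rejected substream */ }
                Err(PendingSubstreamError::NoPendingSubstream) => { /* caller bug */ }
            }
        }
    }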
2180
/// Error while decoding the Yamux stream.
2181
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsr_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsr_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2182
pub enum Error {
2183
    /// Failed to decode an incoming Yamux header.
2184
    HeaderDecode(header::YamuxHeaderDecodeError),
2185
    /// Received a SYN flag with a substream ID that belongs to the local side, even though
    /// inbound substream IDs must come from the remote's side.
2186
    InvalidInboundStreamId(NonZeroU32),
2187
    /// Received a SYN flag with a known substream ID.
2188
    #[display(fmt = "Received a SYN flag with a known substream ID")]
2189
    UnexpectedSyn(NonZeroU32),
2190
    /// Remote tried to send more data than it was allowed to.
2191
    CreditsExceeded,
2192
    /// Number of credits allocated to the local node has overflowed.
2193
    LocalCreditsOverflow,
2194
    /// Remote sent additional data on a substream after having sent the FIN flag.
2195
    WriteAfterFin,
2196
    /// Remote has sent a data frame containing data at the same time as a `RST` flag.
2197
    DataWithRst,
2198
    /// Remote has sent a ping response, but its opaque data didn't match any of the pings that
2199
    /// have been sent out in the past.
2200
    PingResponseNotMatching,
2201
    /// Maximum number of simultaneous RST frames to send out has been exceeded.
2202
    MaxSimultaneousRstSubstreamsExceeded,
2203
    /// Maximum number of simultaneous PONG frames to send out has been exceeded.
2204
    MaxSimultaneousPingsExceeded,
2205
    /// The remote should have sent an ACK flag but didn't.
2206
    ExpectedAck,
2207
    /// The remote sent an ACK flag but shouldn't have.
2208
    UnexpectedAck,
2209
    /// Received multiple `GoAway` frames.
2210
    MultipleGoAways,
2211
}
2212
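Every variant of this enum indicates either a protocol violation by the remote or a local limit being exceeded, and none of them is recoverable for the connection. A hedged sketch of how a caller might categorize them before tearing the connection down:

    // Maps a yamux decoding error to a human-readable cause; whatever the
    // variant, the connection itself must be abandoned afterwards.
    fn describe(error: &Error) -> &'static str {
        match error {
            Error::HeaderDecode(_) => "malformed yamux header",
            Error::CreditsExceeded | Error::WriteAfterFin | Error::DataWithRst => {
                "remote violated the framing or flow-control rules"
            }
            Error::MaxSimultaneousRstSubstreamsExceeded
            | Error::MaxSimultaneousPingsExceeded
            | Error::LocalCreditsOverflow => "a local limit was exceeded",
            _ => "other protocol violation",
        }
    }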
2213
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
2214
pub enum DeadSubstreamTy {
2215
    ClosedGracefully,
2216
    Reset,
2217
}
2218
2219
/// By default, all new substreams have this implicit window size.
2220
pub const NEW_SUBSTREAMS_FRAME_SIZE: u64 = 256 * 1024;
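For reference, `256 * 1024` is 262,144 bytes, matching the 256 KiB initial receive window that the yamux specification assigns to every new stream. A compile-time check making the arithmetic explicit:

    // 256 KiB initial window for every new substream.
    const _: () = assert!(NEW_SUBSTREAMS_FRAME_SIZE == 262_144);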