/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/yamux.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | //! Yamux multiplexing protocol. |
19 | | //! |
20 | | //! The Yamux protocol is a multiplexing protocol. As such, it allows dividing a single stream of |
21 | | //! data, typically a TCP socket, into multiple individual parallel substreams. The data sent and |
22 | | //! received over that single stream is divided into frames which, with the exception of `ping` |
23 | | //! and `goaway` frames, belong to a specific substream. In other words, the data transmitted |
24 | | //! over the substreams is interleaved. |
25 | | //! |
26 | | //! Specification available at <https://github.com/hashicorp/yamux/blob/master/spec.md> |
27 | | //! |
28 | | //! # Usage |
29 | | //! |
30 | | //! The [`Yamux`] object holds the state of all yamux-specific information, and the list of |
31 | | //! all currently-open substreams. |
32 | | //! |
33 | | //! The generic parameter of [`Yamux`] is an opaque "user data" associated to each substream. |
34 | | //! |
35 | | |
36 | | // TODO: more documentation |
37 | | |
38 | | use crate::{ |
39 | | libp2p::read_write::{self, ReadWrite}, |
40 | | util::SipHasherBuild, |
41 | | }; |
42 | | |
43 | | use alloc::{ |
44 | | boxed::Box, |
45 | | collections::{BTreeSet, VecDeque}, |
46 | | vec::Vec, |
47 | | }; |
48 | | use core::{ |
49 | | cmp, fmt, mem, |
50 | | num::{NonZeroU32, NonZeroU64, NonZeroUsize}, |
51 | | ops, |
52 | | }; |
53 | | use rand::seq::IteratorRandom as _; |
54 | | use rand_chacha::{ |
55 | | rand_core::{RngCore as _, SeedableRng as _}, |
56 | | ChaCha20Rng, |
57 | | }; |
58 | | |
59 | | pub use header::GoAwayErrorCode; |
60 | | |
61 | | mod header; |
62 | | mod tests; |
63 | | |
64 | | /// Name of the protocol, typically used when negotiated it using *multistream-select*. |
65 | | pub const PROTOCOL_NAME: &str = "/yamux/1.0.0"; |
66 | | |
67 | | /// Configuration for a new [`Yamux`]. |
68 | | #[derive(Debug)] |
69 | | pub struct Config { |
70 | | /// `true` if the local machine has initiated the connection. Otherwise, `false`. |
71 | | pub is_initiator: bool, |
72 | | |
73 | | /// Expected number of substreams simultaneously open, both inbound and outbound substreams |
74 | | /// combined. |
75 | | pub capacity: usize, |
76 | | |
77 | | /// Seed used for the randomness. Used to avoid HashDoS attack and determines the order in |
78 | | /// which the data on substreams is sent out. |
79 | | pub randomness_seed: [u8; 32], |
80 | | |
81 | | /// Maximum size of data frames to send out. |
82 | | /// |
83 | | /// A higher value increases the variance of the latency of the data sent on the substreams, |
84 | | /// which is undesirable. A lower value increases the overhead of the Yamux protocol. This |
85 | | /// overhead is equal to `1200 / (max_out_data_frame_size + 12)` per cent, for example setting |
86 | | /// `max_out_data_frame_size` to 24 incurs a `33%` overhead. |
87 | | /// |
88 | | /// The "best" value depends on the bandwidth speed of the underlying connection, and is thus |
89 | | /// impossible to tell. |
90 | | /// |
91 | | /// A typical value is `8192`. |
92 | | pub max_out_data_frame_size: NonZeroU32, |
93 | | |
94 | | /// When the remote sends a ping, we need to send out a pong. However, the remote could refuse |
95 | | /// to read any additional data from the socket and continue sending pings, thus increasing |
96 | | /// the local buffer size indefinitely. In order to protect against this attack, there exists |
97 | | /// a maximum number of queued pongs, after which the connection will be shut down abruptly. |
98 | | pub max_simultaneous_queued_pongs: NonZeroUsize, |
99 | | |
100 | | /// When the remote sends a substream, and this substream gets rejected by the API user, some |
101 | | /// data needs to be sent out. However, the remote could refuse reading any additional data |
102 | | /// and continue sending new substream requests, thus increasing the local buffer size |
103 | | /// indefinitely. In order to protect against this attack, there exists a maximum number of |
104 | | /// queued substream rejections after which the connection will be shut down abruptly. |
105 | | pub max_simultaneous_rst_substreams: NonZeroUsize, |
106 | | } |
107 | | |
108 | | /// Yamux state machine. See [the module-level documentation](..) for more information. |
109 | | pub struct Yamux<TNow, TSub> { |
110 | | /// The actual fields are wrapped in a `Box` because the `Yamux` object is moved around pretty |
111 | | /// often. |
112 | | inner: Box<YamuxInner<TNow, TSub>>, |
113 | | } |
114 | | |
115 | | struct YamuxInner<TNow, TSub> { |
116 | | /// List of substreams currently open in the Yamux state machine. |
117 | | /// |
118 | | /// A `SipHasher` is used in order to avoid hash collision attacks on substream IDs. |
119 | | substreams: hashbrown::HashMap<NonZeroU32, Substream<TNow, TSub>, SipHasherBuild>, |
120 | | |
121 | | /// Subset of the content of [`YamuxInner::substreams`] that is considered "dead", meaning |
122 | | /// that it is returned by [`Yamux::dead_substreams`]. |
123 | | dead_substreams: hashbrown::HashSet<NonZeroU32, SipHasherBuild>, |
124 | | |
125 | | /// Subset of the content of [`YamuxInner::substreams`] that requires some process because |
126 | | /// they have data in their read buffer or their `wake_up_after` value is reached. |
127 | | /// |
128 | | /// All the substreams are always in the "healthy" state. |
129 | | /// |
130 | | /// Keys are the time after which this substream should be processed, which can be inferior or |
131 | | /// equal to "now" for an immediate wake up. A key equal to `None` means "right now". |
132 | | substreams_wake_up: BTreeSet<(Option<TNow>, NonZeroU32)>, |
133 | | |
134 | | /// List of substreams that might want to write out additional data. Processed when it is |
135 | | /// possible to send out data. |
136 | | /// |
137 | | /// All the substreams are always in the "healthy" state. |
138 | | /// |
139 | | /// Contrary to [`YamuxInner::substreams_wake_up`], the substreams in this list are processed |
140 | | /// only if it is possible to queue out more data for sending. |
141 | | substreams_write_ready: hashbrown::HashSet<NonZeroU32, SipHasherBuild>, |
142 | | |
143 | | /// List of window frames to send to the remote. For each substream, the amount of bytes to |
144 | | /// add to the window. |
145 | | window_frames_to_send: hashbrown::HashMap<NonZeroU32, NonZeroU64, SipHasherBuild>, |
146 | | |
147 | | /// Number of substreams within [`YamuxInner::substreams`] whose [`Substream::inbound`] is |
148 | | /// `true`. |
149 | | num_inbound: usize, |
150 | | |
151 | | /// `Some` if a `GoAway` frame has been received in the past. |
152 | | received_goaway: Option<GoAwayErrorCode>, |
153 | | |
154 | | /// Whether to send out a `GoAway` frame. |
155 | | outgoing_goaway: OutgoingGoAway, |
156 | | |
157 | | /// What kind of data is expected on the socket next. |
158 | | incoming: Incoming, |
159 | | |
160 | | /// What is currently being written out. |
161 | | outgoing: Outgoing, |
162 | | |
163 | | /// See [`Config::max_out_data_frame_size`]. |
164 | | max_out_data_frame_size: NonZeroU32, |
165 | | |
166 | | /// Id of the next outgoing substream to open. |
167 | | /// This implementation allocates identifiers linearly. Every time a substream is open, its |
168 | | /// value is incremented by two. |
169 | | next_outbound_substream: NonZeroU32, |
170 | | |
171 | | /// Number of pings to send out that haven't been queued yet. |
172 | | pings_to_send: usize, |
173 | | |
174 | | /// List of pings that have been sent out but haven't been replied yet. |
175 | | pings_waiting_reply: VecDeque<u32>, |
176 | | |
177 | | /// List of opaque values corresponding to ping requests sent by the remote. For each entry, |
178 | | /// a PONG header should be sent to the remote. |
179 | | pongs_to_send: VecDeque<u32>, |
180 | | |
181 | | /// See [`Config::max_simultaneous_queued_pongs`]. |
182 | | max_simultaneous_queued_pongs: NonZeroUsize, |
183 | | |
184 | | /// List of substream IDs that have been reset locally. For each entry, a RST header should |
185 | | /// be sent to the remote. |
186 | | rsts_to_send: VecDeque<NonZeroU32>, |
187 | | |
188 | | /// See [`Config::max_simultaneous_rst_substreams`]. |
189 | | max_simultaneous_rst_substreams: NonZeroUsize, |
190 | | |
191 | | /// Source of randomness used for various purposes. |
192 | | randomness: ChaCha20Rng, |
193 | | } |
194 | | |
195 | | struct Substream<TNow, TSub> { |
196 | | /// State of the substream. |
197 | | state: SubstreamState<TNow>, |
198 | | /// `true` if the substream has been opened by the remote. |
199 | | inbound: bool, |
200 | | /// Data chosen by the user. |
201 | | user_data: TSub, |
202 | | } |
203 | | |
204 | | enum SubstreamState<TNow> { |
205 | | Healthy { |
206 | | /// True if a message on this substream has already been sent since it has been opened. The |
207 | | /// first message on a substream must contain either a SYN or `ACK` flag. |
208 | | first_message_queued: bool, |
209 | | /// True if the remote has sent a message on this substream and has thus acknowledged that |
210 | | /// this substream exists. |
211 | | remote_syn_acked: bool, |
212 | | /// Amount of data the remote is allowed to transmit to the local node. Does not take into |
213 | | /// account window frames that haven't been sent out yet. In other words, this value is |
214 | | /// increased when a window frame is sent out. |
215 | | remote_allowed_window: u64, |
216 | | /// Amount of data the local node is allowed to transmit to the remote. |
217 | | allowed_window: u64, |
218 | | /// State of the local writing side of this substream. |
219 | | local_write_close: SubstreamStateLocalWrite, |
220 | | /// True if the writing side of the remote node is closed for this substream. |
221 | | remote_write_closed: bool, |
222 | | /// Buffer of incoming data that hasn't been processed by the substream yet. |
223 | | read_buffer: Vec<u8>, |
224 | | /// Value of [`ReadWrite::expected_incoming_bytes`] previously yielded by the substream. |
225 | | /// `None` means "unknown". |
226 | | expected_incoming_bytes: Option<usize>, |
227 | | /// If this substream is currently in [`YamuxInner::substreams_wake_up`], this contains |
228 | | /// the key where it is currently inserted. |
229 | | substreams_wake_up_key: Option<Option<TNow>>, |
230 | | }, |
231 | | |
232 | | /// The substream has been reset, either locally or by the remote. Its entire purpose is to |
233 | | /// be removed by the API user. |
234 | | Reset, |
235 | | } |
236 | | |
237 | | enum SubstreamStateLocalWrite { |
238 | | Open, |
239 | | FinDesired, |
240 | | FinQueued, |
241 | | } |
242 | | |
243 | | enum Incoming { |
244 | | /// Expect a header. The field might contain some already-read bytes. |
245 | | Header, |
246 | | |
247 | | /// Expect the data of a previously-received data frame header. |
248 | | /// |
249 | | /// Note that the state of the reading side of the substream might already be closed. The |
250 | | /// implementation verifies that the remote is allowed to send data when processing the header, |
251 | | /// the immediately sets the reading side to "closed" if the FIN flag is set, even if there |
252 | | /// is still data to be received. |
253 | | DataFrame { |
254 | | /// Identifier of the substream the data belongs to. |
255 | | substream_id: NonZeroU32, |
256 | | /// Number of bytes of data remaining before the frame ends. |
257 | | remaining_bytes: u32, |
258 | | }, |
259 | | |
260 | | /// A header referring to a new substream has been received. The reception of any further data |
261 | | /// is blocked waiting for the API user to accept or reject this substream. |
262 | | PendingIncomingSubstream { |
263 | | /// Identifier of the pending substream. |
264 | | substream_id: NonZeroU32, |
265 | | /// Extra local window size to give to this substream. |
266 | | extra_window: u32, |
267 | | /// If non-zero, must transition to a [`Incoming::DataFrame`]. |
268 | | data_frame_size: u32, |
269 | | /// True if the remote writing side of the substream should immediately be closed. |
270 | | fin: bool, |
271 | | }, |
272 | | } |
273 | | |
274 | | enum Outgoing { |
275 | | WritingOut { |
276 | | /// Buffers of data to write out. |
277 | | buffers: Vec<Vec<u8>>, |
278 | | }, |
279 | | PreparingDataFrame { |
280 | | /// Substream concerned by the data frame. Always healthy. |
281 | | substream_id: NonZeroU32, |
282 | | /// Buffers of data that the substream is producing. Does not include the Yamux header. |
283 | | /// Must never be empty. |
284 | | write_buffers: Vec<Vec<u8>>, |
285 | | }, |
286 | | } |
287 | | |
288 | | enum OutgoingGoAway { |
289 | | /// No `GoAway` frame has been sent or requested. Normal mode of operations. |
290 | | NotRequired, |
291 | | |
292 | | /// API user has asked to send a `GoAway` frame. This frame hasn't been queued into |
293 | | /// [`YamuxInner::outgoing`] yet. |
294 | | Required(GoAwayErrorCode), |
295 | | |
296 | | /// A `GoAway` frame has been queued into [`YamuxInner::outgoing`] in the past. |
297 | | Queued, |
298 | | |
299 | | /// A `GoAway` frame has been extracted through [`Yamux::read_write`]. |
300 | | Sent, |
301 | | } |
302 | | |
303 | | /// Maximum number of simultaneous outgoing pings allowed. |
304 | | pub const MAX_PINGS: usize = 100000; |
305 | | |
306 | | impl<TNow, TSub> Yamux<TNow, TSub> { |
307 | | /// Initializes a new Yamux state machine. |
308 | 14 | pub fn new(config: Config) -> Yamux<TNow, TSub> { |
309 | 14 | let mut randomness = ChaCha20Rng::from_seed(config.randomness_seed); |
310 | 14 | |
311 | 14 | Yamux { |
312 | 14 | inner: Box::new(YamuxInner { |
313 | 14 | substreams: hashbrown::HashMap::with_capacity_and_hasher( |
314 | 14 | config.capacity, |
315 | 14 | SipHasherBuild::new({ |
316 | 14 | let mut seed = [0; 16]; |
317 | 14 | randomness.fill_bytes(&mut seed); |
318 | 14 | seed |
319 | 14 | }), |
320 | 14 | ), |
321 | 14 | dead_substreams: hashbrown::HashSet::with_capacity_and_hasher( |
322 | 14 | config.capacity, |
323 | 14 | SipHasherBuild::new({ |
324 | 14 | let mut seed = [0; 16]; |
325 | 14 | randomness.fill_bytes(&mut seed); |
326 | 14 | seed |
327 | 14 | }), |
328 | 14 | ), |
329 | 14 | substreams_wake_up: BTreeSet::new(), |
330 | 14 | substreams_write_ready: hashbrown::HashSet::with_capacity_and_hasher( |
331 | 14 | config.capacity, |
332 | 14 | SipHasherBuild::new({ |
333 | 14 | let mut seed = [0; 16]; |
334 | 14 | randomness.fill_bytes(&mut seed); |
335 | 14 | seed |
336 | 14 | }), |
337 | 14 | ), |
338 | 14 | window_frames_to_send: hashbrown::HashMap::with_capacity_and_hasher( |
339 | 14 | config.capacity, |
340 | 14 | SipHasherBuild::new({ |
341 | 14 | let mut seed = [0; 16]; |
342 | 14 | randomness.fill_bytes(&mut seed); |
343 | 14 | seed |
344 | 14 | }), |
345 | 14 | ), |
346 | 14 | num_inbound: 0, |
347 | 14 | received_goaway: None, |
348 | 14 | incoming: Incoming::Header, |
349 | 14 | outgoing: Outgoing::WritingOut { |
350 | 14 | // TODO: capacity? |
351 | 14 | buffers: Vec::with_capacity(16), |
352 | 14 | }, |
353 | 14 | outgoing_goaway: OutgoingGoAway::NotRequired, |
354 | 14 | max_out_data_frame_size: config.max_out_data_frame_size, |
355 | 14 | next_outbound_substream: if config.is_initiator { |
356 | 7 | NonZeroU32::new(1).unwrap() |
357 | | } else { |
358 | 7 | NonZeroU32::new(2).unwrap() |
359 | | }, |
360 | | pings_to_send: 0, |
361 | | // We leave the initial capacity at 0, as it is likely that no ping is sent at all. |
362 | 14 | pings_waiting_reply: VecDeque::with_capacity(0), |
363 | 14 | pongs_to_send: VecDeque::with_capacity(4), |
364 | 14 | max_simultaneous_queued_pongs: config.max_simultaneous_queued_pongs, |
365 | 14 | rsts_to_send: VecDeque::with_capacity(4), |
366 | 14 | max_simultaneous_rst_substreams: config.max_simultaneous_rst_substreams, |
367 | 14 | randomness, |
368 | 14 | }), |
369 | 14 | } |
370 | 14 | } _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB18_6option6OptionTINtNtNtB4_11established9substream9SubstreamB14_EIB1G_uEEEE3newB8_ Line | Count | Source | 308 | 14 | pub fn new(config: Config) -> Yamux<TNow, TSub> { | 309 | 14 | let mut randomness = ChaCha20Rng::from_seed(config.randomness_seed); | 310 | 14 | | 311 | 14 | Yamux { | 312 | 14 | inner: Box::new(YamuxInner { | 313 | 14 | substreams: hashbrown::HashMap::with_capacity_and_hasher( | 314 | 14 | config.capacity, | 315 | 14 | SipHasherBuild::new({ | 316 | 14 | let mut seed = [0; 16]; | 317 | 14 | randomness.fill_bytes(&mut seed); | 318 | 14 | seed | 319 | 14 | }), | 320 | 14 | ), | 321 | 14 | dead_substreams: hashbrown::HashSet::with_capacity_and_hasher( | 322 | 14 | config.capacity, | 323 | 14 | SipHasherBuild::new({ | 324 | 14 | let mut seed = [0; 16]; | 325 | 14 | randomness.fill_bytes(&mut seed); | 326 | 14 | seed | 327 | 14 | }), | 328 | 14 | ), | 329 | 14 | substreams_wake_up: BTreeSet::new(), | 330 | 14 | substreams_write_ready: hashbrown::HashSet::with_capacity_and_hasher( | 331 | 14 | config.capacity, | 332 | 14 | SipHasherBuild::new({ | 333 | 14 | let mut seed = [0; 16]; | 334 | 14 | randomness.fill_bytes(&mut seed); | 335 | 14 | seed | 336 | 14 | }), | 337 | 14 | ), | 338 | 14 | window_frames_to_send: hashbrown::HashMap::with_capacity_and_hasher( | 339 | 14 | config.capacity, | 340 | 14 | SipHasherBuild::new({ | 341 | 14 | let mut seed = [0; 16]; | 342 | 14 | randomness.fill_bytes(&mut seed); | 343 | 14 | seed | 344 | 14 | }), | 345 | 14 | ), | 346 | 14 | num_inbound: 0, | 347 | 14 | received_goaway: None, | 348 | 14 | incoming: Incoming::Header, | 349 | 14 | outgoing: Outgoing::WritingOut { | 350 | 14 | // TODO: capacity? | 351 | 14 | buffers: Vec::with_capacity(16), | 352 | 14 | }, | 353 | 14 | outgoing_goaway: OutgoingGoAway::NotRequired, | 354 | 14 | max_out_data_frame_size: config.max_out_data_frame_size, | 355 | 14 | next_outbound_substream: if config.is_initiator { | 356 | 7 | NonZeroU32::new(1).unwrap() | 357 | | } else { | 358 | 7 | NonZeroU32::new(2).unwrap() | 359 | | }, | 360 | | pings_to_send: 0, | 361 | | // We leave the initial capacity at 0, as it is likely that no ping is sent at all. | 362 | 14 | pings_waiting_reply: VecDeque::with_capacity(0), | 363 | 14 | pongs_to_send: VecDeque::with_capacity(4), | 364 | 14 | max_simultaneous_queued_pongs: config.max_simultaneous_queued_pongs, | 365 | 14 | rsts_to_send: VecDeque::with_capacity(4), | 366 | 14 | max_simultaneous_rst_substreams: config.max_simultaneous_rst_substreams, | 367 | 14 | randomness, | 368 | 14 | }), | 369 | 14 | } | 370 | 14 | } |
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB19_6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1H_IB1H_NtNtB6_10collection11SubstreamIdEEEEE3newCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE3newB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1F_IB1F_NtNtB6_10collection11SubstreamIdEEEEE3newCsiUjFBJteJ7x_17smoldot_full_node |
371 | | |
372 | | /// Returns `true` if there is no substream in the state machine. |
373 | | /// |
374 | | /// > **Note**: After a substream has been closed or reset, it must be removed using |
375 | | /// > [`Yamux::remove_dead_substream`] before this function can return `true`. |
376 | 0 | pub fn is_empty(&self) -> bool { |
377 | 0 | self.inner.substreams.is_empty() |
378 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE8is_emptyB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE8is_emptyB8_ |
379 | | |
380 | | /// Returns the number of substreams in the Yamux state machine. Includes substreams that are |
381 | | /// dead but haven't been removed yet. |
382 | 389 | pub fn len(&self) -> usize { |
383 | 389 | self.inner.substreams.len() |
384 | 389 | } _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB18_6option6OptionTINtNtNtB4_11established9substream9SubstreamB14_EIB1G_uEEEE3lenB8_ Line | Count | Source | 382 | 389 | pub fn len(&self) -> usize { | 383 | 389 | self.inner.substreams.len() | 384 | 389 | } |
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB19_6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1H_IB1H_NtNtB6_10collection11SubstreamIdEEEEE3lenCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE3lenB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1F_IB1F_NtNtB6_10collection11SubstreamIdEEEEE3lenCsiUjFBJteJ7x_17smoldot_full_node |
385 | | |
386 | | /// Returns the number of inbound substreams in the Yamux state machine. Includes substreams |
387 | | /// that are dead but haven't been removed yet. |
388 | 20 | pub fn num_inbound(&self) -> usize { |
389 | 20 | debug_assert_eq!( |
390 | | self.inner.num_inbound, |
391 | 26 | self.inner.substreams.values().filter(20 |s| s.inbound).count()20 _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE11num_inbound0Ba_ Line | Count | Source | 391 | 26 | self.inner.substreams.values().filter(|s| s.inbound).count() |
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE11num_inbound0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE11num_inbound0Ba_ Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE11num_inbound0CsiUjFBJteJ7x_17smoldot_full_node |
392 | | ); |
393 | | |
394 | 20 | self.inner.num_inbound |
395 | 20 | } _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB18_6option6OptionTINtNtNtB4_11established9substream9SubstreamB14_EIB1G_uEEEE11num_inboundB8_ Line | Count | Source | 388 | 20 | pub fn num_inbound(&self) -> usize { | 389 | 20 | debug_assert_eq!( | 390 | | self.inner.num_inbound, | 391 | 20 | self.inner.substreams.values().filter(|s| s.inbound).count() | 392 | | ); | 393 | | | 394 | 20 | self.inner.num_inbound | 395 | 20 | } |
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB19_6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1H_IB1H_NtNtB6_10collection11SubstreamIdEEEEE11num_inboundCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE11num_inboundB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1F_IB1F_NtNtB6_10collection11SubstreamIdEEEEE11num_inboundCsiUjFBJteJ7x_17smoldot_full_node |
396 | | |
397 | | /// Returns `Some` if a [`ReadWriteOutcome::GoAway`] event has been generated in the past, |
398 | | /// in which case the code is returned. |
399 | | /// |
400 | | /// If `Some` is returned, it is forbidden to open new outbound substreams. |
401 | 0 | pub fn received_goaway(&self) -> Option<GoAwayErrorCode> { |
402 | 0 | self.inner.received_goaway |
403 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB18_6option6OptionTINtNtNtB4_11established9substream9SubstreamB14_EIB1G_uEEEE15received_goawayB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB19_6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1H_IB1H_NtNtB6_10collection11SubstreamIdEEEEE15received_goawayCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE15received_goawayB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1F_IB1F_NtNtB6_10collection11SubstreamIdEEEEE15received_goawayCsiUjFBJteJ7x_17smoldot_full_node |
404 | | |
405 | | /// Returns an iterator to the list of all substream user datas. |
406 | 0 | pub fn user_datas(&self) -> impl ExactSizeIterator<Item = (SubstreamId, &TSub)> { |
407 | 0 | self.inner |
408 | 0 | .substreams |
409 | 0 | .iter() |
410 | 0 | .map(|(id, s)| (SubstreamId(*id), &s.user_data)) Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE10user_datas0Ba_ Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE10user_datas0Ba_ |
411 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE10user_datasB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE10user_datasB8_ |
412 | | |
413 | | /// Returns an iterator to the list of all substream user datas. |
414 | 0 | pub fn user_datas_mut(&mut self) -> impl ExactSizeIterator<Item = (SubstreamId, &mut TSub)> { |
415 | 0 | self.inner |
416 | 0 | .substreams |
417 | 0 | .iter_mut() |
418 | 0 | .map(|(id, s)| (SubstreamId(*id), &mut s.user_data)) Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE14user_datas_mut0Ba_ Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE14user_datas_mut0Ba_ |
419 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE14user_datas_mutB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE14user_datas_mutB8_ |
420 | | |
421 | | /// Returns `true` if the given [`SubstreamId`] exists. |
422 | | /// |
423 | | /// Also returns `true` if the substream is in a dead state. |
424 | 392 | pub fn has_substream(&self, substream_id: SubstreamId) -> bool { |
425 | 392 | self.inner.substreams.contains_key(&substream_id.0) |
426 | 392 | } _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB18_6option6OptionTINtNtNtB4_11established9substream9SubstreamB14_EIB1G_uEEEE13has_substreamB8_ Line | Count | Source | 424 | 392 | pub fn has_substream(&self, substream_id: SubstreamId) -> bool { | 425 | 392 | self.inner.substreams.contains_key(&substream_id.0) | 426 | 392 | } |
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB19_6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1H_IB1H_NtNtB6_10collection11SubstreamIdEEEEE13has_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE13has_substreamB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1F_IB1F_NtNtB6_10collection11SubstreamIdEEEEE13has_substreamCsiUjFBJteJ7x_17smoldot_full_node |
427 | | |
428 | | /// Returns `true` if [`Yamux::send_goaway`] has been called in the past. |
429 | | /// |
430 | | /// In other words, returns `true` if a `GoAway` frame has been either queued for sending |
431 | | /// (and is available through [`Yamux::read_write`]) or has already been sent out. |
432 | 20 | pub fn goaway_queued_or_sent(&self) -> bool { |
433 | 20 | !matches!0 (self.inner.outgoing_goaway, OutgoingGoAway::NotRequired) |
434 | 20 | } _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB18_6option6OptionTINtNtNtB4_11established9substream9SubstreamB14_EIB1G_uEEEE21goaway_queued_or_sentB8_ Line | Count | Source | 432 | 20 | pub fn goaway_queued_or_sent(&self) -> bool { | 433 | 20 | !matches!0 (self.inner.outgoing_goaway, OutgoingGoAway::NotRequired) | 434 | 20 | } |
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB19_6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1H_IB1H_NtNtB6_10collection11SubstreamIdEEEEE21goaway_queued_or_sentCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE21goaway_queued_or_sentB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1F_IB1F_NtNtB6_10collection11SubstreamIdEEEEE21goaway_queued_or_sentCsiUjFBJteJ7x_17smoldot_full_node |
435 | | |
436 | | /// Returns `true` if [`Yamux::send_goaway`] has been called in the past and that this |
437 | | /// `GoAway` frame has been extracted through [`Yamux::read_write`]. |
438 | 49 | pub fn goaway_sent(&self) -> bool { |
439 | 49 | matches!(self.inner.outgoing_goaway, OutgoingGoAway::Sent) |
440 | 49 | } _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB18_6option6OptionTINtNtNtB4_11established9substream9SubstreamB14_EIB1G_uEEEE11goaway_sentB8_ Line | Count | Source | 438 | 49 | pub fn goaway_sent(&self) -> bool { | 439 | 49 | matches!(self.inner.outgoing_goaway, OutgoingGoAway::Sent) | 440 | 49 | } |
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB19_6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1H_IB1H_NtNtB6_10collection11SubstreamIdEEEEE11goaway_sentCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxppE11goaway_sentB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB2_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB4_11established9substream9SubstreamB15_EIB1F_IB1F_NtNtB6_10collection11SubstreamIdEEEEE11goaway_sentCsiUjFBJteJ7x_17smoldot_full_node |
441 | | } |
442 | | |
443 | | impl<TNow, TSub> Yamux<TNow, TSub> |
444 | | where |
445 | | TNow: Clone + cmp::Ord, |
446 | | { |
447 | | /// Opens a new substream. |
448 | | /// |
449 | | /// This method only modifies the state of `self` and reserves an identifier. No message needs |
450 | | /// to be sent to the remote before data is actually being sent on the substream. |
451 | | /// |
452 | | /// The substream will be automatically processed by [`Yamux::read_write`] in the future. |
453 | | /// |
454 | | /// > **Note**: Importantly, the remote will not be notified of the substream being open |
455 | | /// > before the local side sends data on this substream. As such, protocols where |
456 | | /// > the remote is expected to send data in response to a substream being open, |
457 | | /// > without the local side first sending some data on that substream, will not |
458 | | /// > work. In practice, while this is technically out of concern of the Yamux |
459 | | /// > protocol, all substreams in the context of libp2p start with a |
460 | | /// > multistream-select negotiation, and this scenario can therefore never happen. |
461 | | /// |
462 | | /// Returns an error if a [`ReadWriteOutcome::GoAway`] event has been generated. This can |
463 | | /// also be checked by calling [`Yamux::received_goaway`]. |
464 | | /// |
465 | | /// Returns an error if all possible substream IDs are already taken. This happen if there |
466 | | /// exists more than approximately `2^31` substreams, which is very unlikely to happen unless |
467 | | /// there exists a bug in the code. |
468 | | /// |
469 | 20 | pub fn open_substream(&mut self, user_data: TSub) -> Result<SubstreamId, OpenSubstreamError> { |
470 | 20 | if self.inner.received_goaway.is_some() { |
471 | 0 | return Err(OpenSubstreamError::GoAwayReceived); |
472 | 20 | } |
473 | 20 | |
474 | 20 | let substream_id = self.inner.next_outbound_substream; |
475 | 20 | |
476 | 20 | self.inner.next_outbound_substream = match self.inner.next_outbound_substream.checked_add(2) |
477 | | { |
478 | 20 | Some(new_id) => new_id, |
479 | 0 | None => return Err(OpenSubstreamError::NoFreeSubstreamId), |
480 | | }; |
481 | | |
482 | 20 | let _prev_value = self.inner.substreams.insert( |
483 | 20 | substream_id, |
484 | 20 | Substream { |
485 | 20 | state: SubstreamState::Healthy { |
486 | 20 | first_message_queued: false, |
487 | 20 | remote_syn_acked: false, |
488 | 20 | remote_allowed_window: NEW_SUBSTREAMS_FRAME_SIZE, |
489 | 20 | allowed_window: NEW_SUBSTREAMS_FRAME_SIZE, |
490 | 20 | local_write_close: SubstreamStateLocalWrite::Open, |
491 | 20 | remote_write_closed: false, |
492 | 20 | expected_incoming_bytes: None, |
493 | 20 | read_buffer: Vec::new(), // TODO: capacity? |
494 | 20 | substreams_wake_up_key: Some(None), |
495 | 20 | }, |
496 | 20 | inbound: false, |
497 | 20 | user_data, |
498 | 20 | }, |
499 | 20 | ); |
500 | 20 | debug_assert!(_prev_value.is_none()); |
501 | | |
502 | | // The substream is added to `substreams_wake_up` rather than `substreams_write_ready`, |
503 | | // in case the substream processing does some magic such as generate events before having |
504 | | // even done anything. |
505 | 20 | self.inner.substreams_wake_up.insert((None, substream_id)); |
506 | 20 | |
507 | 20 | Ok(SubstreamId(substream_id)) |
508 | 20 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE14open_substreamBa_ Line | Count | Source | 469 | 20 | pub fn open_substream(&mut self, user_data: TSub) -> Result<SubstreamId, OpenSubstreamError> { | 470 | 20 | if self.inner.received_goaway.is_some() { | 471 | 0 | return Err(OpenSubstreamError::GoAwayReceived); | 472 | 20 | } | 473 | 20 | | 474 | 20 | let substream_id = self.inner.next_outbound_substream; | 475 | 20 | | 476 | 20 | self.inner.next_outbound_substream = match self.inner.next_outbound_substream.checked_add(2) | 477 | | { | 478 | 20 | Some(new_id) => new_id, | 479 | 0 | None => return Err(OpenSubstreamError::NoFreeSubstreamId), | 480 | | }; | 481 | | | 482 | 20 | let _prev_value = self.inner.substreams.insert( | 483 | 20 | substream_id, | 484 | 20 | Substream { | 485 | 20 | state: SubstreamState::Healthy { | 486 | 20 | first_message_queued: false, | 487 | 20 | remote_syn_acked: false, | 488 | 20 | remote_allowed_window: NEW_SUBSTREAMS_FRAME_SIZE, | 489 | 20 | allowed_window: NEW_SUBSTREAMS_FRAME_SIZE, | 490 | 20 | local_write_close: SubstreamStateLocalWrite::Open, | 491 | 20 | remote_write_closed: false, | 492 | 20 | expected_incoming_bytes: None, | 493 | 20 | read_buffer: Vec::new(), // TODO: capacity? | 494 | 20 | substreams_wake_up_key: Some(None), | 495 | 20 | }, | 496 | 20 | inbound: false, | 497 | 20 | user_data, | 498 | 20 | }, | 499 | 20 | ); | 500 | 20 | debug_assert!(_prev_value.is_none()); | 501 | | | 502 | | // The substream is added to `substreams_wake_up` rather than `substreams_write_ready`, | 503 | | // in case the substream processing does some magic such as generate events before having | 504 | | // even done anything. | 505 | 20 | self.inner.substreams_wake_up.insert((None, substream_id)); | 506 | 20 | | 507 | 20 | Ok(SubstreamId(substream_id)) | 508 | 20 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE14open_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE14open_substreamBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE14open_substreamCsiUjFBJteJ7x_17smoldot_full_node |
509 | | |
510 | | /// Marks the given substream as being ready to write out data. |
511 | | /// |
512 | | /// Calling this function is necessary in situations where a substream didn't write any data |
513 | | /// in the past, but some sort of manual state update will make it write data in the future. |
514 | | /// |
515 | | /// Has no effect if the substream has been reset or closed. |
516 | | /// |
517 | | /// # Panic |
518 | | /// |
519 | | /// Panics if the [`SubstreamId`] is invalid. |
520 | | /// |
521 | 28 | pub fn mark_substream_write_ready(&mut self, substream_id: SubstreamId) { |
522 | 28 | assert!(self.inner.substreams.contains_key(&substream_id.0)); |
523 | 28 | if !self.inner.dead_substreams.contains(&substream_id.0) { |
524 | 28 | self.inner.substreams_write_ready.insert(substream_id.0); |
525 | 28 | }0 |
526 | 28 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE26mark_substream_write_readyBa_ Line | Count | Source | 521 | 28 | pub fn mark_substream_write_ready(&mut self, substream_id: SubstreamId) { | 522 | 28 | assert!(self.inner.substreams.contains_key(&substream_id.0)); | 523 | 28 | if !self.inner.dead_substreams.contains(&substream_id.0) { | 524 | 28 | self.inner.substreams_write_ready.insert(substream_id.0); | 525 | 28 | }0 | 526 | 28 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE26mark_substream_write_readyCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE26mark_substream_write_readyBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE26mark_substream_write_readyCsiUjFBJteJ7x_17smoldot_full_node |
527 | | |
528 | | /// Feeds data coming from a socket and outputs data to write to the socket. |
529 | | /// |
530 | | /// Returns an object that implements `Deref<Target = ReadWrite>`. This object represents the |
531 | | /// decrypted stream of data. |
532 | | /// |
533 | | /// An error is returned if the protocol is being violated by the remote or if the nonce |
534 | | /// overflows. When that happens, the connection should be closed altogether. |
535 | 389 | pub fn read_write( |
536 | 389 | mut self, |
537 | 389 | outer_read_write: &mut ReadWrite<TNow>, |
538 | 389 | ) -> Result<ReadWriteOutcome<'_, TNow, TSub>, Error> { |
539 | | // Queue something for writing if necessary. |
540 | 389 | if let Outgoing::WritingOut { buffers323 } = &mut self.inner.outgoing { |
541 | | // Make sure that it's not just a list of empty buffers. |
542 | 323 | debug_assert_eq!( |
543 | 323 | buffers.is_empty(), |
544 | 323 | buffers.iter().fold(0, |sz, b| sz + b.len()201 ) == 0 _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes6_0Bc_ Line | Count | Source | 544 | 201 | buffers.iter().fold(0, |sz, b| sz + b.len()) == 0 |
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes6_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes6_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes6_0CsiUjFBJteJ7x_17smoldot_full_node |
545 | | ); |
546 | | |
547 | 323 | if buffers.is_empty() { |
548 | 254 | if let OutgoingGoAway::Queued = self.inner.outgoing_goaway { |
549 | 0 | self.inner.outgoing_goaway = OutgoingGoAway::Sent; |
550 | 254 | } |
551 | | |
552 | 254 | if let OutgoingGoAway::Required(error_code0 ) = self.inner.outgoing_goaway { |
553 | 0 | // Send a `GoAway` frame if demanded. |
554 | 0 | buffers.push( |
555 | 0 | header::encode(&header::DecodedYamuxHeader::GoAway { error_code }).to_vec(), |
556 | 0 | ); |
557 | 0 | self.inner.outgoing_goaway = OutgoingGoAway::Queued; |
558 | 254 | } else if let Some(substream_id0 ) = self.inner.rsts_to_send.pop_front() { |
559 | 0 | // Send RST frame. |
560 | 0 | buffers.push( |
561 | 0 | header::encode(&header::DecodedYamuxHeader::Window { |
562 | 0 | syn: false, |
563 | 0 | ack: false, |
564 | 0 | fin: false, |
565 | 0 | rst: true, |
566 | 0 | stream_id: substream_id, |
567 | 0 | length: 0, |
568 | 0 | }) |
569 | 0 | .to_vec(), |
570 | 0 | ); |
571 | 254 | } else if self.inner.pings_to_send > 0 { |
572 | | // Send outgoing pings. |
573 | 0 | self.inner.pings_to_send -= 1; |
574 | 0 | let opaque_value: u32 = self.inner.randomness.next_u32(); |
575 | 0 | self.inner.pings_waiting_reply.push_back(opaque_value); |
576 | 0 | buffers.push( |
577 | 0 | header::encode(&header::DecodedYamuxHeader::PingRequest { opaque_value }) |
578 | 0 | .to_vec(), |
579 | 0 | ); |
580 | 0 | debug_assert!(self.inner.pings_waiting_reply.len() <= MAX_PINGS); |
581 | 254 | } else if let Some(opaque_value0 ) = self.inner.pongs_to_send.pop_front() { |
582 | 0 | // Send outgoing pongs. |
583 | 0 | buffers.push( |
584 | 0 | header::encode(&header::DecodedYamuxHeader::PingResponse { opaque_value }) |
585 | 0 | .to_vec(), |
586 | 0 | ); |
587 | 254 | } else if let Some(substream_id0 ) = self |
588 | 254 | .inner |
589 | 254 | .window_frames_to_send |
590 | 254 | .keys() |
591 | 254 | .choose(&mut self.inner.randomness) |
592 | 254 | .copied() |
593 | | { |
594 | | // Send window frame. |
595 | | let Some(Substream { |
596 | 0 | inbound, |
597 | 0 | state: |
598 | 0 | SubstreamState::Healthy { |
599 | 0 | first_message_queued, |
600 | 0 | remote_allowed_window, |
601 | 0 | local_write_close, |
602 | | .. |
603 | | }, |
604 | | .. |
605 | 0 | }) = &mut self.inner.substreams.get_mut(&substream_id) |
606 | | else { |
607 | 0 | unreachable!() |
608 | | }; |
609 | | |
610 | 0 | let mut pending_window_increase = self |
611 | 0 | .inner |
612 | 0 | .window_frames_to_send |
613 | 0 | .remove(&substream_id) |
614 | 0 | .unwrap() |
615 | 0 | .get(); |
616 | 0 |
|
617 | 0 | let actual_window_update = |
618 | 0 | u32::try_from(pending_window_increase).unwrap_or(u32::MAX); |
619 | 0 | buffers.push( |
620 | 0 | header::encode(&header::DecodedYamuxHeader::Window { |
621 | 0 | syn: !*first_message_queued && !*inbound, |
622 | 0 | ack: !*first_message_queued && *inbound, |
623 | | // Note that it is unclear whether `fin` should be set if the local |
624 | | // writing state has been written before. |
625 | 0 | fin: matches!(local_write_close, SubstreamStateLocalWrite::FinDesired), |
626 | | rst: false, |
627 | 0 | stream_id: substream_id, |
628 | 0 | length: actual_window_update, |
629 | 0 | }) |
630 | 0 | .to_vec(), |
631 | 0 | ); |
632 | 0 |
|
633 | 0 | *remote_allowed_window = |
634 | 0 | remote_allowed_window.saturating_add(u64::from(actual_window_update)); |
635 | 0 | pending_window_increase -= u64::from(actual_window_update); |
636 | 0 | *first_message_queued = true; |
637 | 0 | if matches!(local_write_close, SubstreamStateLocalWrite::FinDesired) { |
638 | 0 | *local_write_close = SubstreamStateLocalWrite::FinQueued; |
639 | 0 | } |
640 | | |
641 | | // In the rare situation where the window update doesn't fit in a `u32`, we |
642 | | // have to send another window frame again later. |
643 | 0 | if let Some(pending_window_increase) = NonZeroU64::new(pending_window_increase) |
644 | 0 | { |
645 | 0 | self.inner |
646 | 0 | .window_frames_to_send |
647 | 0 | .insert(substream_id, pending_window_increase); |
648 | 0 | } |
649 | 254 | } |
650 | 69 | } |
651 | 66 | } |
652 | | |
653 | | // Try finish writing the data currently being written. |
654 | 389 | if let Outgoing::WritingOut { buffers323 } = &mut self.inner.outgoing { |
655 | 323 | let buffers_total_size = buffers.iter().fold(0, |count, buf| count + buf.len()201 ); _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_write0Bc_ Line | Count | Source | 655 | 201 | let buffers_total_size = buffers.iter().fold(0, |count, buf| count + buf.len()); |
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_write0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_write0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_write0CsiUjFBJteJ7x_17smoldot_full_node |
656 | 323 | |
657 | 323 | if buffers_total_size == 0 { |
658 | 254 | // Do nothing. |
659 | 254 | } else if outer_read_write |
660 | 69 | .write_bytes_queueable |
661 | 69 | .map_or(false, |queuable| buffers_total_size <= queuable) _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes_0Bc_ Line | Count | Source | 661 | 69 | .map_or(false, |queuable| buffers_total_size <= queuable) |
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes_0CsiUjFBJteJ7x_17smoldot_full_node |
662 | | { |
663 | | // We can directly push all the write buffers to the `ReadWrite`. |
664 | 69 | if outer_read_write.write_buffers.is_empty() { |
665 | 69 | debug_assert_eq!(outer_read_write.write_bytes_queued, 0); |
666 | 69 | outer_read_write.write_buffers = mem::take(buffers); |
667 | 0 | } else { |
668 | 0 | outer_read_write.write_buffers.append(buffers); |
669 | 0 | } |
670 | | |
671 | 69 | outer_read_write.write_bytes_queued += buffers_total_size; |
672 | 69 | *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_total_size; |
673 | 0 | } else if outer_read_write.write_buffers.is_empty() |
674 | 0 | && outer_read_write |
675 | 0 | .write_bytes_queueable |
676 | 0 | .map_or(false, |queueable| { |
677 | 0 | buffers.first().map_or(0, |b| b.len()) <= queueable Unexecuted instantiation: _RNCNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB8_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1e_6option6OptionTINtNtNtBa_11established9substream9SubstreamB1a_EIB1M_uEEEE10read_writes0_00Be_ Unexecuted instantiation: _RNCNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB8_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1f_6option6OptionTINtNtNtBa_11established9substream9SubstreamB1b_EIB1N_IB1N_NtNtBc_10collection11SubstreamIdEEEEE10read_writes0_00CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB8_5YamuxppE10read_writes0_00Be_ Unexecuted instantiation: _RNCNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB8_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtBa_11established9substream9SubstreamB1b_EIB1L_IB1L_NtNtBc_10collection11SubstreamIdEEEEE10read_writes0_00CsiUjFBJteJ7x_17smoldot_full_node |
678 | 0 | }) Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes0_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes0_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes0_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes0_0CsiUjFBJteJ7x_17smoldot_full_node |
679 | 0 | { |
680 | 0 | // Not enough space to push all the buffers at once, but enough space to push at |
681 | 0 | // least the first one. Push as many buffers as possible. |
682 | 0 | let limit = outer_read_write.write_bytes_queueable.unwrap_or(0); |
683 | 0 | let (num_buffers, buffers_size) = buffers |
684 | 0 | .iter() |
685 | 0 | .scan(0, |count, buf| { |
686 | 0 | *count += buf.len(); |
687 | 0 | Some(*count) |
688 | 0 | }) Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes1_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes1_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes1_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes1_0CsiUjFBJteJ7x_17smoldot_full_node |
689 | 0 | .enumerate() |
690 | 0 | .take_while(|(_, sz)| *sz <= limit) Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes2_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes2_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes2_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes2_0CsiUjFBJteJ7x_17smoldot_full_node |
691 | 0 | .last() |
692 | 0 | .unwrap(); |
693 | 0 |
|
694 | 0 | outer_read_write |
695 | 0 | .write_buffers |
696 | 0 | .extend(buffers.drain(..num_buffers)); |
697 | 0 | outer_read_write.write_bytes_queued += buffers_size; |
698 | 0 | *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_size; |
699 | 0 | } else if outer_read_write.write_buffers.is_empty() { |
700 | | // Not enough space to fully push even the first buffer. |
701 | 0 | if let Some(first) = buffers.first_mut() { |
702 | 0 | outer_read_write.write_from_vec(first); |
703 | 0 | } |
704 | 0 | } |
705 | 66 | } |
706 | | |
707 | | // Consume as much incoming data as possible until either the incoming data buffer is |
708 | | // empty or we reach a non-empty data frame. |
709 | 506 | loop { |
710 | 506 | match self.inner.incoming { |
711 | 0 | Incoming::PendingIncomingSubstream { .. } => break, |
712 | | |
713 | | Incoming::DataFrame { |
714 | | remaining_bytes: 0, .. |
715 | 62 | } => { |
716 | 62 | // Nothing more to do. |
717 | 62 | self.inner.incoming = Incoming::Header; |
718 | 62 | } |
719 | | |
720 | | Incoming::DataFrame { |
721 | 225 | substream_id, |
722 | 225 | ref mut remaining_bytes, |
723 | | .. |
724 | | } => { |
725 | | // It is possible that we are receiving data corresponding to a substream for |
726 | | // which a RST has been sent out by the local node. Since the |
727 | | // local state machine doesn't keep track of RST'ted substreams, any |
728 | | // frame concerning a substream that has been RST or doesn't exist is |
729 | | // discarded and doesn't result in an error, under the presumption that we |
730 | | // are in this situation. |
731 | | let Some(Substream { |
732 | | state: |
733 | | SubstreamState::Healthy { |
734 | 225 | expected_incoming_bytes, |
735 | 225 | read_buffer, |
736 | 225 | substreams_wake_up_key, |
737 | | .. |
738 | | }, |
739 | | .. |
740 | 225 | }) = self.inner.substreams.get_mut(&substream_id) |
741 | | else { |
742 | | // Substream doesn't exist (as it's likely been RST as explained above). |
743 | | // Discard the next `remaining_bytes`. |
744 | | // TODO: don't use `incoming_bytes_take` but instead simply discard as much as possible or something |
745 | 0 | match outer_read_write.incoming_bytes_take( |
746 | 0 | usize::try_from(*remaining_bytes).unwrap_or(usize::MAX), |
747 | 0 | ) { |
748 | 0 | Ok(Some(taken)) => { |
749 | 0 | debug_assert_eq!(taken.len() as u32, *remaining_bytes); |
750 | 0 | *remaining_bytes = 0; |
751 | 0 | continue; |
752 | | } |
753 | | // TODO: how to deal with the read closed? |
754 | 0 | Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break, |
755 | | } |
756 | | }; |
757 | | |
758 | | // We copy data from the main incoming buffer to the read buffer of that |
759 | | // substream. |
760 | | // If there isn't enough data in `outer_read_write.incoming_buffer` |
761 | | // compared to the substream has previously requested with |
762 | | // `expected_incoming_bytes`, then we request more data from the outside. |
763 | | // If instead there is enough data, we copy all the data immediately |
764 | | // in order to avoid the cost of splitting the buffer. |
765 | | |
766 | 225 | debug_assert_ne!(*remaining_bytes, 0); |
767 | 225 | let to_copy = if *expected_incoming_bytes == Some(0) { |
768 | | // If the substream refuses to process more incoming data right now, |
769 | | // we copy everything into its read buffer in order to free the outer |
770 | | // incoming buffer. |
771 | | // The window system of Yamux ensures that the read buffer size is capped. |
772 | 8 | usize::try_from(*remaining_bytes).unwrap_or(usize::MAX) |
773 | | } else { |
774 | 217 | cmp::min( |
775 | 217 | expected_incoming_bytes |
776 | 217 | .unwrap_or(0) |
777 | 217 | .saturating_sub(read_buffer.len()), |
778 | 217 | usize::try_from(*remaining_bytes).unwrap_or(usize::MAX), |
779 | 217 | ) |
780 | | }; |
781 | | |
782 | 225 | match outer_read_write.incoming_bytes_take(to_copy) { |
783 | 225 | Ok(Some(mut data)) => { |
784 | 225 | *remaining_bytes -= u32::try_from(data.len()).unwrap(); |
785 | 225 | if read_buffer.is_empty() { |
786 | 189 | *read_buffer = data; |
787 | 189 | } else { |
788 | 36 | read_buffer.append(&mut data); |
789 | 36 | } |
790 | | } |
791 | | // TODO: how to deal with the read closed? |
792 | 0 | Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break, |
793 | | } |
794 | | |
795 | | // If the substream has enough data to read, or has never been processed |
796 | | // before, make sure that it will wake up as soon as possible. |
797 | 225 | if expected_incoming_bytes.map_or(true, |expected| { |
798 | 198 | expected != 0 && read_buffer.len() >= expected190 |
799 | 225 | }198 ) { _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes3_0Bc_ Line | Count | Source | 797 | 198 | if expected_incoming_bytes.map_or(true, |expected| { | 798 | 198 | expected != 0 && read_buffer.len() >= expected190 | 799 | 198 | }) { |
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes3_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes3_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes3_0CsiUjFBJteJ7x_17smoldot_full_node |
800 | 14 | match substreams_wake_up_key { |
801 | 14 | Some(Some(when)) if *when <= outer_read_write.now => {}2 |
802 | 150 | Some(None) => {} |
803 | 12 | Some(Some(when)) => { |
804 | 12 | let _was_removed = self |
805 | 12 | .inner |
806 | 12 | .substreams_wake_up |
807 | 12 | .remove(&(Some(when.clone()), substream_id)); |
808 | 12 | debug_assert!(_was_removed); |
809 | 12 | self.inner.substreams_wake_up.insert((None, substream_id)); |
810 | 12 | *substreams_wake_up_key = Some(None); |
811 | | } |
812 | 53 | _ => { |
813 | 53 | self.inner.substreams_wake_up.insert((None, substream_id)); |
814 | 53 | *substreams_wake_up_key = Some(None); |
815 | 53 | } |
816 | | } |
817 | | |
818 | | // Also stop processing incoming data so that we can process the substream. |
819 | 217 | break; |
820 | 8 | } |
821 | 8 | |
822 | 8 | // Make sure that some progress was made, otherwise we might get into an |
823 | 8 | // infinite loop. |
824 | 8 | debug_assert_ne!(to_copy, 0); |
825 | | } |
826 | | |
827 | | Incoming::Header => { |
828 | | // Try to grab a header from the incoming buffer. |
829 | 219 | let header_bytes67 = match outer_read_write.incoming_bytes_take_array::<12>() { |
830 | 67 | Ok(Some(hdr)) => hdr, |
831 | 152 | Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break, |
832 | | }; |
833 | | |
834 | | // Decode the header in `header_bytes`. |
835 | 67 | let decoded_header = match header::decode_yamux_header(&header_bytes) { |
836 | 67 | Ok(h) => h, |
837 | 0 | Err(err) => return Err(Error::HeaderDecode(err)), |
838 | | }; |
839 | | |
840 | 67 | match decoded_header { |
841 | 0 | header::DecodedYamuxHeader::PingRequest { opaque_value } => { |
842 | 0 | if self.inner.pongs_to_send.len() |
843 | 0 | >= self.inner.max_simultaneous_queued_pongs.get() |
844 | | { |
845 | 0 | return Err(Error::MaxSimultaneousPingsExceeded); |
846 | 0 | } |
847 | 0 |
|
848 | 0 | self.inner.pongs_to_send.push_back(opaque_value); |
849 | 0 | self.inner.incoming = Incoming::Header; |
850 | | } |
851 | | |
852 | 0 | header::DecodedYamuxHeader::PingResponse { opaque_value } => { |
853 | 0 | if self.inner.pings_waiting_reply.pop_front() != Some(opaque_value) { |
854 | 0 | return Err(Error::PingResponseNotMatching); |
855 | 0 | } |
856 | 0 |
|
857 | 0 | self.inner.incoming = Incoming::Header; |
858 | 0 | return Ok(ReadWriteOutcome::PingResponse { yamux: self }); |
859 | | } |
860 | | |
861 | 0 | header::DecodedYamuxHeader::GoAway { error_code } => { |
862 | 0 | if self.inner.received_goaway.is_some() { |
863 | 0 | return Err(Error::MultipleGoAways); |
864 | 0 | } |
865 | 0 |
|
866 | 0 | self.inner.incoming = Incoming::Header; |
867 | 0 | self.inner.received_goaway = Some(error_code); |
868 | 0 |
|
869 | 0 | let mut reset_substreams = |
870 | 0 | Vec::with_capacity(self.inner.substreams.len()); |
871 | 0 | for (substream_id, substream) in self.inner.substreams.iter_mut() { |
872 | | let SubstreamState::Healthy { |
873 | | remote_syn_acked: false, |
874 | 0 | substreams_wake_up_key, |
875 | | .. |
876 | 0 | } = &mut substream.state |
877 | | else { |
878 | 0 | continue; |
879 | | }; |
880 | | |
881 | 0 | reset_substreams.push(SubstreamId(*substream_id)); |
882 | 0 |
|
883 | 0 | let _was_inserted = |
884 | 0 | self.inner.dead_substreams.insert(*substream_id); |
885 | 0 | debug_assert!(_was_inserted); |
886 | | |
887 | 0 | self.inner.substreams_write_ready.remove(substream_id); |
888 | 0 | self.inner.window_frames_to_send.remove(substream_id); |
889 | | |
890 | 0 | if let Some(k) = substreams_wake_up_key.take() { |
891 | 0 | let _was_removed = |
892 | 0 | self.inner.substreams_wake_up.remove(&(k, *substream_id)); |
893 | 0 | debug_assert!(_was_removed); |
894 | 0 | } |
895 | | |
896 | | if let Outgoing::PreparingDataFrame { |
897 | 0 | substream_id: s, |
898 | 0 | write_buffers, |
899 | 0 | } = &mut self.inner.outgoing |
900 | | { |
901 | 0 | if *substream_id == *s { |
902 | 0 | write_buffers.clear(); |
903 | 0 | self.inner.outgoing = Outgoing::WritingOut { |
904 | 0 | buffers: mem::take(write_buffers), |
905 | 0 | } |
906 | 0 | } |
907 | 0 | } |
908 | | |
909 | 0 | substream.state = SubstreamState::Reset; |
910 | | } |
911 | | |
912 | 0 | outer_read_write.wake_up_asap(); |
913 | 0 | return Ok(ReadWriteOutcome::GoAway { |
914 | 0 | yamux: self, |
915 | 0 | code: error_code, |
916 | 0 | reset_substreams, |
917 | 0 | }); |
918 | | } |
919 | | |
920 | | header::DecodedYamuxHeader::Data { |
921 | | rst: true, |
922 | 0 | ack, |
923 | 0 | stream_id, |
924 | 0 | length, |
925 | | .. |
926 | | } |
927 | | | header::DecodedYamuxHeader::Window { |
928 | | rst: true, |
929 | 0 | ack, |
930 | 0 | stream_id, |
931 | 0 | length, |
932 | | .. |
933 | | } => { |
934 | | // Frame with the `RST` flag set. Destroy the substream. |
935 | | |
936 | | // Sending a `RST` flag and data together is a weird corner case and |
937 | | // is difficult to handle. It is unclear whether it is allowed at all. |
938 | | // We thus consider it as invalid. |
939 | 0 | if matches!(decoded_header, header::DecodedYamuxHeader::Data { .. }) |
940 | 0 | && length != 0 |
941 | | { |
942 | 0 | return Err(Error::DataWithRst); |
943 | 0 | } |
944 | 0 |
|
945 | 0 | self.inner.incoming = Incoming::Header; |
946 | | |
947 | | // The remote might have sent a RST frame concerning a substream for |
948 | | // which we have sent a RST frame earlier. Considering that we don't |
949 | | // always keep traces of old substreams, we have no way to know whether |
950 | | // this is the case or not. |
951 | 0 | let Some(substream) = self.inner.substreams.get_mut(&stream_id) else { |
952 | 0 | continue; |
953 | | }; |
954 | | let SubstreamState::Healthy { |
955 | 0 | remote_syn_acked, |
956 | 0 | substreams_wake_up_key, |
957 | | .. |
958 | 0 | } = &mut substream.state |
959 | | else { |
960 | 0 | continue; |
961 | | }; |
962 | | |
963 | 0 | let _was_inserted = self.inner.dead_substreams.insert(stream_id); |
964 | 0 | debug_assert!(_was_inserted); |
965 | | |
966 | | // Check whether the remote has ACKed multiple times. |
967 | 0 | if *remote_syn_acked && ack { |
968 | 0 | return Err(Error::UnexpectedAck); |
969 | 0 | } |
970 | | |
971 | | if let Outgoing::PreparingDataFrame { |
972 | 0 | substream_id, |
973 | 0 | write_buffers, |
974 | 0 | } = &mut self.inner.outgoing |
975 | | { |
976 | 0 | if *substream_id == stream_id { |
977 | 0 | write_buffers.clear(); |
978 | 0 | self.inner.outgoing = Outgoing::WritingOut { |
979 | 0 | buffers: mem::take(write_buffers), |
980 | 0 | } |
981 | 0 | } |
982 | 0 | } |
983 | | |
984 | 0 | self.inner.window_frames_to_send.remove(&stream_id); |
985 | 0 | self.inner.substreams_write_ready.remove(&stream_id); |
986 | | |
987 | 0 | if let Some(k) = substreams_wake_up_key.take() { |
988 | 0 | let _was_removed = |
989 | 0 | self.inner.substreams_wake_up.remove(&(k, stream_id)); |
990 | 0 | debug_assert!(_was_removed); |
991 | 0 | } |
992 | | |
993 | 0 | substream.state = SubstreamState::Reset; |
994 | 0 |
|
995 | 0 | outer_read_write.wake_up_asap(); |
996 | 0 | return Ok(ReadWriteOutcome::StreamReset { |
997 | 0 | yamux: self, |
998 | 0 | substream_id: SubstreamId(stream_id), |
999 | 0 | }); |
1000 | | } |
1001 | | |
1002 | | header::DecodedYamuxHeader::Data { |
1003 | | syn: true, |
1004 | | ack: true, |
1005 | | .. |
1006 | | } |
1007 | | | header::DecodedYamuxHeader::Window { |
1008 | | syn: true, |
1009 | | ack: true, |
1010 | | .. |
1011 | | } => { |
1012 | | // You're never supposed to send a SYN and ACK at the same time. |
1013 | 0 | return Err(Error::UnexpectedAck); |
1014 | | } |
1015 | | |
1016 | | header::DecodedYamuxHeader::Data { |
1017 | | syn: true, |
1018 | 20 | fin, |
1019 | 20 | rst: false, |
1020 | 20 | stream_id, |
1021 | 20 | length, |
1022 | | .. |
1023 | | } |
1024 | | | header::DecodedYamuxHeader::Window { |
1025 | | syn: true, |
1026 | 0 | fin, |
1027 | 0 | rst: false, |
1028 | 0 | stream_id, |
1029 | 0 | length, |
1030 | | .. |
1031 | | } => { |
1032 | | // The initiator should only allocate uneven substream IDs, and the |
1033 | | // other side only even IDs. We don't know anymore whether we're |
1034 | | // initiator at this point, but we can compare with the even-ness of |
1035 | | // the IDs that we allocate locally. |
1036 | 20 | if (self.inner.next_outbound_substream.get() % 2) |
1037 | 20 | == (stream_id.get() % 2) |
1038 | | { |
1039 | 0 | return Err(Error::InvalidInboundStreamId(stream_id)); |
1040 | 20 | } |
1041 | 20 | |
1042 | 20 | // Remote has sent a SYN flag. A new substream is to be opened. |
1043 | 20 | match self.inner.substreams.get(&stream_id) { |
1044 | 20 | None => {} |
1045 | | Some(Substream { |
1046 | | state: |
1047 | | SubstreamState::Healthy { |
1048 | | local_write_close: SubstreamStateLocalWrite::FinQueued, |
1049 | | remote_write_closed: true, |
1050 | | .. |
1051 | | }, |
1052 | | .. |
1053 | | }) |
1054 | | | Some(Substream { |
1055 | | state: SubstreamState::Reset, |
1056 | | .. |
1057 | | }) => { |
1058 | | // Because we don't immediately destroy substreams, the remote |
1059 | | // might decide to re-use a substream ID that is still |
1060 | | // allocated locally. If that happens, we block the reading. |
1061 | | // It will be unblocked when the API user destroys the old |
1062 | | // substream. |
1063 | 0 | break; |
1064 | | } |
1065 | | Some(Substream { |
1066 | | state: SubstreamState::Healthy { .. }, |
1067 | | .. |
1068 | | }) => { |
1069 | 0 | return Err(Error::UnexpectedSyn(stream_id)); |
1070 | | } |
1071 | | } |
1072 | | |
1073 | | // When receiving a new substream, we might have to potentially queue |
1074 | | // a substream rejection message later. |
1075 | | // In order to ensure that there is enough space in `rsts_to_send`, |
1076 | | // we check it against the limit now. |
1077 | 20 | if self.inner.rsts_to_send.len() |
1078 | 20 | >= self.inner.max_simultaneous_rst_substreams.get() |
1079 | | { |
1080 | 0 | return Err(Error::MaxSimultaneousRstSubstreamsExceeded); |
1081 | 20 | } |
1082 | | |
1083 | 20 | let is_data = |
1084 | 20 | matches!0 (decoded_header, header::DecodedYamuxHeader::Data { .. }); |
1085 | | |
1086 | | // If we have queued or sent a GoAway frame, then the substream is |
1087 | | // ignored. The remote understands when receiving the GoAway that the |
1088 | | // substream has been rejected. |
1089 | 20 | if !matches!0 (self.inner.outgoing_goaway, OutgoingGoAway::NotRequired) { |
1090 | 0 | self.inner.incoming = if !is_data { |
1091 | 0 | Incoming::Header |
1092 | | } else { |
1093 | 0 | Incoming::DataFrame { |
1094 | 0 | substream_id: stream_id, |
1095 | 0 | remaining_bytes: length, |
1096 | 0 | } |
1097 | | }; |
1098 | | |
1099 | 0 | continue; |
1100 | 20 | } |
1101 | 20 | |
1102 | 20 | if is_data && u64::from(length) > NEW_SUBSTREAMS_FRAME_SIZE { |
1103 | 0 | return Err(Error::CreditsExceeded); |
1104 | 20 | } |
1105 | 20 | |
1106 | 20 | self.inner.incoming = Incoming::PendingIncomingSubstream { |
1107 | 20 | substream_id: stream_id, |
1108 | 20 | extra_window: if !is_data { length0 } else { 0 }, |
1109 | 20 | data_frame_size: if is_data { length } else { 00 }, |
1110 | 20 | fin, |
1111 | 20 | }; |
1112 | 20 | |
1113 | 20 | return Ok(ReadWriteOutcome::IncomingSubstream { yamux: self }); |
1114 | | } |
1115 | | |
1116 | | header::DecodedYamuxHeader::Data { |
1117 | | syn: false, |
1118 | | rst: false, |
1119 | 47 | stream_id, |
1120 | 47 | length, |
1121 | 47 | ack, |
1122 | 47 | fin, |
1123 | | .. |
1124 | | } => { |
1125 | | // Note that it is possible that the remote is referring to a substream |
1126 | | // for which a RST has been sent out by the local node. Since the |
1127 | | // local state machine doesn't keep track of RST'ted substreams, any |
1128 | | // frame concerning a substream that has been RST or with an unknown |
1129 | | // id is discarded and doesn't result in an error, under the |
1130 | | // presumption that we are in this situation. |
1131 | | if let Some(Substream { |
1132 | | state: |
1133 | | SubstreamState::Healthy { |
1134 | 47 | remote_write_closed, |
1135 | 47 | remote_allowed_window, |
1136 | 47 | remote_syn_acked, |
1137 | 47 | substreams_wake_up_key, |
1138 | | .. |
1139 | | }, |
1140 | | .. |
1141 | 47 | }) = self.inner.substreams.get_mut(&stream_id) |
1142 | | { |
1143 | 47 | match (ack, remote_syn_acked) { |
1144 | 28 | (false, true) => {} |
1145 | 19 | (true, acked @ false) => *acked = true, |
1146 | 0 | (true, true) => return Err(Error::UnexpectedAck), |
1147 | 0 | (false, false) => return Err(Error::ExpectedAck), |
1148 | | } |
1149 | | |
1150 | 47 | if *remote_write_closed { |
1151 | 0 | return Err(Error::WriteAfterFin); |
1152 | 47 | } |
1153 | 47 | |
1154 | 47 | if fin { |
1155 | 5 | *remote_write_closed = true; |
1156 | 5 | |
1157 | 5 | // No need to send window frames anymore if the remote has |
1158 | 5 | // sent a FIN. |
1159 | 5 | self.inner.window_frames_to_send.remove(&stream_id); |
1160 | | |
1161 | | // Wake up the substream. |
1162 | 0 | match substreams_wake_up_key { |
1163 | 0 | Some(Some(when)) if *when <= outer_read_write.now => {} |
1164 | 2 | Some(None) => {} |
1165 | 0 | Some(Some(when)) => { |
1166 | 0 | let _was_removed = self |
1167 | 0 | .inner |
1168 | 0 | .substreams_wake_up |
1169 | 0 | .remove(&(Some(when.clone()), stream_id)); |
1170 | 0 | debug_assert!(_was_removed); |
1171 | 0 | self.inner.substreams_wake_up.insert((None, stream_id)); |
1172 | 0 | *substreams_wake_up_key = Some(None); |
1173 | | } |
1174 | 3 | _ => { |
1175 | 3 | self.inner.substreams_wake_up.insert((None, stream_id)); |
1176 | 3 | *substreams_wake_up_key = Some(None); |
1177 | 3 | } |
1178 | | } |
1179 | 42 | } |
1180 | | |
1181 | | // Check whether the remote has the right to send that much data. |
1182 | | // Note that the credits aren't checked in the case of an unknown |
1183 | | // substream. |
1184 | 47 | *remote_allowed_window = remote_allowed_window |
1185 | 47 | .checked_sub(u64::from(length)) |
1186 | 47 | .ok_or(Error::CreditsExceeded)?0 ; |
1187 | 0 | } |
1188 | | |
1189 | | // Switch to the `DataFrame` state in order to process the frame, even |
1190 | | // if the substream no longer exists, in order to not duplicate code. |
1191 | 47 | self.inner.incoming = Incoming::DataFrame { |
1192 | 47 | substream_id: stream_id, |
1193 | 47 | remaining_bytes: length, |
1194 | 47 | }; |
1195 | | } |
1196 | | |
1197 | | header::DecodedYamuxHeader::Window { |
1198 | | syn: false, |
1199 | | rst: false, |
1200 | 0 | stream_id, |
1201 | 0 | length, |
1202 | 0 | ack, |
1203 | 0 | fin, |
1204 | | .. |
1205 | | } => { |
1206 | | // Note that it is possible that the remote is referring to a substream |
1207 | | // for which a RST has been sent out by the local node. Since the |
1208 | | // local state machine doesn't keep track of RST'ted substreams, any |
1209 | | // frame concerning a substream that has been RST or with an unknown |
1210 | | // id is discarded and doesn't result in an error, under the |
1211 | | // presumption that we are in this situation. |
1212 | | let Some(Substream { |
1213 | | state: |
1214 | | SubstreamState::Healthy { |
1215 | 0 | remote_syn_acked, |
1216 | 0 | allowed_window, |
1217 | 0 | remote_write_closed, |
1218 | | .. |
1219 | | }, |
1220 | | .. |
1221 | 0 | }) = self.inner.substreams.get_mut(&stream_id) |
1222 | | else { |
1223 | 0 | self.inner.incoming = Incoming::Header; |
1224 | 0 | continue; |
1225 | | }; |
1226 | | |
1227 | 0 | match (ack, remote_syn_acked) { |
1228 | 0 | (false, true) => {} |
1229 | 0 | (true, acked @ false) => *acked = true, |
1230 | 0 | (true, true) => return Err(Error::UnexpectedAck), |
1231 | 0 | (false, false) => return Err(Error::ExpectedAck), |
1232 | | } |
1233 | | |
1234 | | // Note that the spec is unclear about whether the remote can or |
1235 | | // should continue sending FIN flags on window size frames after |
1236 | | // their side of the substream has already been closed before. |
1237 | 0 | if fin { |
1238 | 0 | *remote_write_closed = true; |
1239 | 0 | } |
1240 | | |
1241 | | // If a substream was processed with a non-zero queuable bytes |
1242 | | // value (i.e. `allowed_window` non-zero) but yields empty write |
1243 | | // buffers, then we can assume that the substream has nothing to write, |
1244 | | // and thus that simply increasing the queueable bytes will not change |
1245 | | // the situation. After all, we have no idea whether the remote will |
1246 | | // increase the window size of this substream any further in the |
1247 | | // future, and thus we must write data out no matter what the window |
1248 | | // size is as long as it is non-zero. |
1249 | | // On the other hand, if the queueable bytes were 0, we take the guess |
1250 | | // that the substream might have more to write. |
1251 | 0 | if length != 0 && *allowed_window == 0 { |
1252 | 0 | self.inner.substreams_write_ready.insert(stream_id); |
1253 | 0 | } |
1254 | | |
1255 | 0 | *allowed_window = allowed_window |
1256 | 0 | .checked_add(u64::from(length)) |
1257 | 0 | .ok_or(Error::LocalCreditsOverflow)?; |
1258 | | |
1259 | 0 | self.inner.incoming = Incoming::Header; |
1260 | | } |
1261 | | } |
1262 | | } |
1263 | | } |
1264 | | } |
1265 | | |
1266 | | // Choose which substream to read/write (if any). |
1267 | 329 | let substream_id = match ( |
1268 | 369 | &self.inner.outgoing, |
1269 | 369 | self.inner.substreams_write_ready.iter().next().copied(), |
1270 | 369 | self.inner.substreams_wake_up.first(), |
1271 | | ) { |
1272 | | ( |
1273 | | Outgoing::PreparingDataFrame { |
1274 | 66 | substream_id, |
1275 | 66 | write_buffers, |
1276 | 66 | }, |
1277 | 66 | _, |
1278 | 66 | _, |
1279 | 66 | ) => { |
1280 | 66 | // Continue writing to the substream whose frame we're already preparing. |
1281 | 66 | debug_assert!(!write_buffers.is_empty()); |
1282 | 66 | self.inner.substreams_write_ready.remove(substream_id); |
1283 | 66 | *substream_id |
1284 | | } |
1285 | 92 | (Outgoing::WritingOut { buffers }, Some(substream_id), _) if buffers.is_empty() => { |
1286 | 92 | // Pull a substream from `substreams_write_ready`. |
1287 | 92 | self.inner.substreams_write_ready.remove(&substream_id); |
1288 | 92 | substream_id |
1289 | | } |
1290 | 195 | (_, _, Some((when, substream_id))) |
1291 | 195 | if when |
1292 | 195 | .as_ref() |
1293 | 195 | .map_or(true, |when| *when <= outer_read_write.now26 ) => _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes4_0Bc_ Line | Count | Source | 1293 | 26 | .map_or(true, |when| *when <= outer_read_write.now) => |
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes4_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes4_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes4_0CsiUjFBJteJ7x_17smoldot_full_node |
1294 | 171 | { |
1295 | 171 | *substream_id |
1296 | | } |
1297 | | _ => { |
1298 | | // No substream to read/write. |
1299 | 40 | return Ok(ReadWriteOutcome::Idle { yamux: self }); |
1300 | | } |
1301 | | }; |
1302 | | |
1303 | | // Extract some fields from the substream state. |
1304 | | let SubstreamState::Healthy { |
1305 | 329 | allowed_window, |
1306 | 329 | local_write_close, |
1307 | 329 | remote_write_closed, |
1308 | 329 | read_buffer, |
1309 | 329 | substreams_wake_up_key, |
1310 | | .. |
1311 | 329 | } = &mut self.inner.substreams.get_mut(&substream_id).unwrap().state |
1312 | | else { |
1313 | 0 | unreachable!() |
1314 | | }; |
1315 | | |
1316 | | // Remove the substream from `substreams_wake_up`, since we're processing it now. |
1317 | | // If the processing produces a `wake_up_after` value when being processed, it will be |
1318 | | // re-inserted. |
1319 | 329 | if let Some(substreams_wake_up_key273 ) = substreams_wake_up_key.take() { |
1320 | 273 | let _was_removed = self |
1321 | 273 | .inner |
1322 | 273 | .substreams_wake_up |
1323 | 273 | .remove(&(substreams_wake_up_key, substream_id)); |
1324 | 273 | debug_assert!(_was_removed); |
1325 | 56 | } |
1326 | | |
1327 | 329 | let (write_buffers, can_queue_data) = match &mut self.inner.outgoing66 { |
1328 | 263 | Outgoing::WritingOut { buffers } if buffers.is_empty() => { |
1329 | 263 | let mut buffers = mem::take(buffers); |
1330 | 263 | // As a small optimization, we push an empty buffer at the front where the header |
1331 | 263 | // might later get written. |
1332 | 263 | buffers.push(Vec::with_capacity(12)); |
1333 | 263 | (buffers, true) |
1334 | | } |
1335 | | Outgoing::PreparingDataFrame { |
1336 | 66 | substream_id: s, |
1337 | 66 | write_buffers, |
1338 | 66 | } if *s == substream_id => { |
1339 | 66 | let buffers = mem::take(write_buffers); |
1340 | 66 | self.inner.outgoing = Outgoing::WritingOut { |
1341 | 66 | buffers: Vec::new(), |
1342 | 66 | }; |
1343 | 66 | (buffers, true) |
1344 | | } |
1345 | 0 | _ => (Vec::new(), false), |
1346 | | }; |
1347 | 461 | let write_buffers_len_before = write_buffers.iter().fold(0, 329 |count, buf| count + buf.len()); _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE10read_writes5_0Bc_ Line | Count | Source | 1347 | 461 | let write_buffers_len_before = write_buffers.iter().fold(0, |count, buf| count + buf.len()); |
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE10read_writes5_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE10read_writes5_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE10read_writes5_0CsiUjFBJteJ7x_17smoldot_full_node |
1348 | 329 | |
1349 | 329 | Ok(ReadWriteOutcome::ProcessSubstream { |
1350 | 329 | substream_read_write: SubstreamReadWrite { |
1351 | 329 | substream_id, |
1352 | 329 | write_buffers_len_before, |
1353 | 329 | inner_read_write: ReadWrite { |
1354 | 329 | now: outer_read_write.now.clone(), |
1355 | 329 | incoming_buffer: mem::take(read_buffer), |
1356 | 329 | expected_incoming_bytes: if !*remote_write_closed { Some(0)320 } else { None9 }, |
1357 | | read_bytes: 0, |
1358 | 329 | write_buffers, |
1359 | 329 | write_bytes_queued: write_buffers_len_before, |
1360 | 10 | write_bytes_queueable: if matches!( |
1361 | 329 | local_write_close, |
1362 | | SubstreamStateLocalWrite::Open |
1363 | | ) { |
1364 | 319 | if can_queue_data { |
1365 | 319 | Some( |
1366 | 319 | usize::try_from(cmp::min( |
1367 | 319 | u64::from(self.inner.max_out_data_frame_size.get()), |
1368 | 319 | *allowed_window, |
1369 | 319 | )) |
1370 | 319 | .unwrap_or(usize::MAX) |
1371 | 319 | .saturating_sub(write_buffers_len_before), |
1372 | 319 | ) |
1373 | | } else { |
1374 | 0 | Some(0) |
1375 | | } |
1376 | | } else { |
1377 | 10 | None |
1378 | | }, |
1379 | 329 | wake_up_after: None, |
1380 | 329 | }, |
1381 | 329 | outer_read_write, |
1382 | 329 | yamux: self, |
1383 | | }, |
1384 | | }) |
1385 | 389 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE10read_writeBa_ Line | Count | Source | 535 | 389 | pub fn read_write( | 536 | 389 | mut self, | 537 | 389 | outer_read_write: &mut ReadWrite<TNow>, | 538 | 389 | ) -> Result<ReadWriteOutcome<'_, TNow, TSub>, Error> { | 539 | | // Queue something for writing if necessary. | 540 | 389 | if let Outgoing::WritingOut { buffers323 } = &mut self.inner.outgoing { | 541 | | // Make sure that it's not just a list of empty buffers. | 542 | 323 | debug_assert_eq!( | 543 | 323 | buffers.is_empty(), | 544 | 323 | buffers.iter().fold(0, |sz, b| sz + b.len()) == 0 | 545 | | ); | 546 | | | 547 | 323 | if buffers.is_empty() { | 548 | 254 | if let OutgoingGoAway::Queued = self.inner.outgoing_goaway { | 549 | 0 | self.inner.outgoing_goaway = OutgoingGoAway::Sent; | 550 | 254 | } | 551 | | | 552 | 254 | if let OutgoingGoAway::Required(error_code0 ) = self.inner.outgoing_goaway { | 553 | 0 | // Send a `GoAway` frame if demanded. | 554 | 0 | buffers.push( | 555 | 0 | header::encode(&header::DecodedYamuxHeader::GoAway { error_code }).to_vec(), | 556 | 0 | ); | 557 | 0 | self.inner.outgoing_goaway = OutgoingGoAway::Queued; | 558 | 254 | } else if let Some(substream_id0 ) = self.inner.rsts_to_send.pop_front() { | 559 | 0 | // Send RST frame. | 560 | 0 | buffers.push( | 561 | 0 | header::encode(&header::DecodedYamuxHeader::Window { | 562 | 0 | syn: false, | 563 | 0 | ack: false, | 564 | 0 | fin: false, | 565 | 0 | rst: true, | 566 | 0 | stream_id: substream_id, | 567 | 0 | length: 0, | 568 | 0 | }) | 569 | 0 | .to_vec(), | 570 | 0 | ); | 571 | 254 | } else if self.inner.pings_to_send > 0 { | 572 | | // Send outgoing pings. | 573 | 0 | self.inner.pings_to_send -= 1; | 574 | 0 | let opaque_value: u32 = self.inner.randomness.next_u32(); | 575 | 0 | self.inner.pings_waiting_reply.push_back(opaque_value); | 576 | 0 | buffers.push( | 577 | 0 | header::encode(&header::DecodedYamuxHeader::PingRequest { opaque_value }) | 578 | 0 | .to_vec(), | 579 | 0 | ); | 580 | 0 | debug_assert!(self.inner.pings_waiting_reply.len() <= MAX_PINGS); | 581 | 254 | } else if let Some(opaque_value0 ) = self.inner.pongs_to_send.pop_front() { | 582 | 0 | // Send outgoing pongs. | 583 | 0 | buffers.push( | 584 | 0 | header::encode(&header::DecodedYamuxHeader::PingResponse { opaque_value }) | 585 | 0 | .to_vec(), | 586 | 0 | ); | 587 | 254 | } else if let Some(substream_id0 ) = self | 588 | 254 | .inner | 589 | 254 | .window_frames_to_send | 590 | 254 | .keys() | 591 | 254 | .choose(&mut self.inner.randomness) | 592 | 254 | .copied() | 593 | | { | 594 | | // Send window frame. | 595 | | let Some(Substream { | 596 | 0 | inbound, | 597 | 0 | state: | 598 | 0 | SubstreamState::Healthy { | 599 | 0 | first_message_queued, | 600 | 0 | remote_allowed_window, | 601 | 0 | local_write_close, | 602 | | .. | 603 | | }, | 604 | | .. | 605 | 0 | }) = &mut self.inner.substreams.get_mut(&substream_id) | 606 | | else { | 607 | 0 | unreachable!() | 608 | | }; | 609 | | | 610 | 0 | let mut pending_window_increase = self | 611 | 0 | .inner | 612 | 0 | .window_frames_to_send | 613 | 0 | .remove(&substream_id) | 614 | 0 | .unwrap() | 615 | 0 | .get(); | 616 | 0 |
| 617 | 0 | let actual_window_update = | 618 | 0 | u32::try_from(pending_window_increase).unwrap_or(u32::MAX); | 619 | 0 | buffers.push( | 620 | 0 | header::encode(&header::DecodedYamuxHeader::Window { | 621 | 0 | syn: !*first_message_queued && !*inbound, | 622 | 0 | ack: !*first_message_queued && *inbound, | 623 | | // Note that it is unclear whether `fin` should be set if the local | 624 | | // writing state has been written before. | 625 | 0 | fin: matches!(local_write_close, SubstreamStateLocalWrite::FinDesired), | 626 | | rst: false, | 627 | 0 | stream_id: substream_id, | 628 | 0 | length: actual_window_update, | 629 | 0 | }) | 630 | 0 | .to_vec(), | 631 | 0 | ); | 632 | 0 |
| 633 | 0 | *remote_allowed_window = | 634 | 0 | remote_allowed_window.saturating_add(u64::from(actual_window_update)); | 635 | 0 | pending_window_increase -= u64::from(actual_window_update); | 636 | 0 | *first_message_queued = true; | 637 | 0 | if matches!(local_write_close, SubstreamStateLocalWrite::FinDesired) { | 638 | 0 | *local_write_close = SubstreamStateLocalWrite::FinQueued; | 639 | 0 | } | 640 | | | 641 | | // In the rare situation where the window update doesn't fit in a `u32`, we | 642 | | // have to send another window frame again later. | 643 | 0 | if let Some(pending_window_increase) = NonZeroU64::new(pending_window_increase) | 644 | 0 | { | 645 | 0 | self.inner | 646 | 0 | .window_frames_to_send | 647 | 0 | .insert(substream_id, pending_window_increase); | 648 | 0 | } | 649 | 254 | } | 650 | 69 | } | 651 | 66 | } | 652 | | | 653 | | // Try finish writing the data currently being written. | 654 | 389 | if let Outgoing::WritingOut { buffers323 } = &mut self.inner.outgoing { | 655 | 323 | let buffers_total_size = buffers.iter().fold(0, |count, buf| count + buf.len()); | 656 | 323 | | 657 | 323 | if buffers_total_size == 0 { | 658 | 254 | // Do nothing. | 659 | 254 | } else if outer_read_write | 660 | 69 | .write_bytes_queueable | 661 | 69 | .map_or(false, |queuable| buffers_total_size <= queuable) | 662 | | { | 663 | | // We can directly push all the write buffers to the `ReadWrite`. | 664 | 69 | if outer_read_write.write_buffers.is_empty() { | 665 | 69 | debug_assert_eq!(outer_read_write.write_bytes_queued, 0); | 666 | 69 | outer_read_write.write_buffers = mem::take(buffers); | 667 | 0 | } else { | 668 | 0 | outer_read_write.write_buffers.append(buffers); | 669 | 0 | } | 670 | | | 671 | 69 | outer_read_write.write_bytes_queued += buffers_total_size; | 672 | 69 | *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_total_size; | 673 | 0 | } else if outer_read_write.write_buffers.is_empty() | 674 | 0 | && outer_read_write | 675 | 0 | .write_bytes_queueable | 676 | 0 | .map_or(false, |queueable| { | 677 | | buffers.first().map_or(0, |b| b.len()) <= queueable | 678 | 0 | }) | 679 | 0 | { | 680 | 0 | // Not enough space to push all the buffers at once, but enough space to push at | 681 | 0 | // least the first one. Push as many buffers as possible. | 682 | 0 | let limit = outer_read_write.write_bytes_queueable.unwrap_or(0); | 683 | 0 | let (num_buffers, buffers_size) = buffers | 684 | 0 | .iter() | 685 | 0 | .scan(0, |count, buf| { | 686 | | *count += buf.len(); | 687 | | Some(*count) | 688 | 0 | }) | 689 | 0 | .enumerate() | 690 | 0 | .take_while(|(_, sz)| *sz <= limit) | 691 | 0 | .last() | 692 | 0 | .unwrap(); | 693 | 0 |
| 694 | 0 | outer_read_write | 695 | 0 | .write_buffers | 696 | 0 | .extend(buffers.drain(..num_buffers)); | 697 | 0 | outer_read_write.write_bytes_queued += buffers_size; | 698 | 0 | *outer_read_write.write_bytes_queueable.as_mut().unwrap() -= buffers_size; | 699 | 0 | } else if outer_read_write.write_buffers.is_empty() { | 700 | | // Not enough space to fully push even the first buffer. | 701 | 0 | if let Some(first) = buffers.first_mut() { | 702 | 0 | outer_read_write.write_from_vec(first); | 703 | 0 | } | 704 | 0 | } | 705 | 66 | } | 706 | | | 707 | | // Consume as much incoming data as possible until either the incoming data buffer is | 708 | | // empty or we reach a non-empty data frame. | 709 | 506 | loop { | 710 | 506 | match self.inner.incoming { | 711 | 0 | Incoming::PendingIncomingSubstream { .. } => break, | 712 | | | 713 | | Incoming::DataFrame { | 714 | | remaining_bytes: 0, .. | 715 | 62 | } => { | 716 | 62 | // Nothing more to do. | 717 | 62 | self.inner.incoming = Incoming::Header; | 718 | 62 | } | 719 | | | 720 | | Incoming::DataFrame { | 721 | 225 | substream_id, | 722 | 225 | ref mut remaining_bytes, | 723 | | .. | 724 | | } => { | 725 | | // It is possible that we are receiving data corresponding to a substream for | 726 | | // which a RST has been sent out by the local node. Since the | 727 | | // local state machine doesn't keep track of RST'ted substreams, any | 728 | | // frame concerning a substream that has been RST or doesn't exist is | 729 | | // discarded and doesn't result in an error, under the presumption that we | 730 | | // are in this situation. | 731 | | let Some(Substream { | 732 | | state: | 733 | | SubstreamState::Healthy { | 734 | 225 | expected_incoming_bytes, | 735 | 225 | read_buffer, | 736 | 225 | substreams_wake_up_key, | 737 | | .. | 738 | | }, | 739 | | .. | 740 | 225 | }) = self.inner.substreams.get_mut(&substream_id) | 741 | | else { | 742 | | // Substream doesn't exist (as it's likely been RST as explained above). | 743 | | // Discard the next `remaining_bytes`. | 744 | | // TODO: don't use `incoming_bytes_take` but instead simply discard as much as possible or something | 745 | 0 | match outer_read_write.incoming_bytes_take( | 746 | 0 | usize::try_from(*remaining_bytes).unwrap_or(usize::MAX), | 747 | 0 | ) { | 748 | 0 | Ok(Some(taken)) => { | 749 | 0 | debug_assert_eq!(taken.len() as u32, *remaining_bytes); | 750 | 0 | *remaining_bytes = 0; | 751 | 0 | continue; | 752 | | } | 753 | | // TODO: how to deal with the read closed? | 754 | 0 | Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break, | 755 | | } | 756 | | }; | 757 | | | 758 | | // We copy data from the main incoming buffer to the read buffer of that | 759 | | // substream. | 760 | | // If there isn't enough data in `outer_read_write.incoming_buffer` | 761 | | // compared to the substream has previously requested with | 762 | | // `expected_incoming_bytes`, then we request more data from the outside. | 763 | | // If instead there is enough data, we copy all the data immediately | 764 | | // in order to avoid the cost of splitting the buffer. | 765 | | | 766 | 225 | debug_assert_ne!(*remaining_bytes, 0); | 767 | 225 | let to_copy = if *expected_incoming_bytes == Some(0) { | 768 | | // If the substream refuses to process more incoming data right now, | 769 | | // we copy everything into its read buffer in order to free the outer | 770 | | // incoming buffer. | 771 | | // The window system of Yamux ensures that the read buffer size is capped. | 772 | 8 | usize::try_from(*remaining_bytes).unwrap_or(usize::MAX) | 773 | | } else { | 774 | 217 | cmp::min( | 775 | 217 | expected_incoming_bytes | 776 | 217 | .unwrap_or(0) | 777 | 217 | .saturating_sub(read_buffer.len()), | 778 | 217 | usize::try_from(*remaining_bytes).unwrap_or(usize::MAX), | 779 | 217 | ) | 780 | | }; | 781 | | | 782 | 225 | match outer_read_write.incoming_bytes_take(to_copy) { | 783 | 225 | Ok(Some(mut data)) => { | 784 | 225 | *remaining_bytes -= u32::try_from(data.len()).unwrap(); | 785 | 225 | if read_buffer.is_empty() { | 786 | 189 | *read_buffer = data; | 787 | 189 | } else { | 788 | 36 | read_buffer.append(&mut data); | 789 | 36 | } | 790 | | } | 791 | | // TODO: how to deal with the read closed? | 792 | 0 | Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break, | 793 | | } | 794 | | | 795 | | // If the substream has enough data to read, or has never been processed | 796 | | // before, make sure that it will wake up as soon as possible. | 797 | 225 | if expected_incoming_bytes.map_or(true, |expected| { | 798 | | expected != 0 && read_buffer.len() >= expected | 799 | 225 | }) { | 800 | 14 | match substreams_wake_up_key { | 801 | 14 | Some(Some(when)) if *when <= outer_read_write.now => {}2 | 802 | 150 | Some(None) => {} | 803 | 12 | Some(Some(when)) => { | 804 | 12 | let _was_removed = self | 805 | 12 | .inner | 806 | 12 | .substreams_wake_up | 807 | 12 | .remove(&(Some(when.clone()), substream_id)); | 808 | 12 | debug_assert!(_was_removed); | 809 | 12 | self.inner.substreams_wake_up.insert((None, substream_id)); | 810 | 12 | *substreams_wake_up_key = Some(None); | 811 | | } | 812 | 53 | _ => { | 813 | 53 | self.inner.substreams_wake_up.insert((None, substream_id)); | 814 | 53 | *substreams_wake_up_key = Some(None); | 815 | 53 | } | 816 | | } | 817 | | | 818 | | // Also stop processing incoming data so that we can process the substream. | 819 | 217 | break; | 820 | 8 | } | 821 | 8 | | 822 | 8 | // Make sure that some progress was made, otherwise we might get into an | 823 | 8 | // infinite loop. | 824 | 8 | debug_assert_ne!(to_copy, 0); | 825 | | } | 826 | | | 827 | | Incoming::Header => { | 828 | | // Try to grab a header from the incoming buffer. | 829 | 219 | let header_bytes67 = match outer_read_write.incoming_bytes_take_array::<12>() { | 830 | 67 | Ok(Some(hdr)) => hdr, | 831 | 152 | Ok(None) | Err(read_write::IncomingBytesTakeError::ReadClosed) => break, | 832 | | }; | 833 | | | 834 | | // Decode the header in `header_bytes`. | 835 | 67 | let decoded_header = match header::decode_yamux_header(&header_bytes) { | 836 | 67 | Ok(h) => h, | 837 | 0 | Err(err) => return Err(Error::HeaderDecode(err)), | 838 | | }; | 839 | | | 840 | 67 | match decoded_header { | 841 | 0 | header::DecodedYamuxHeader::PingRequest { opaque_value } => { | 842 | 0 | if self.inner.pongs_to_send.len() | 843 | 0 | >= self.inner.max_simultaneous_queued_pongs.get() | 844 | | { | 845 | 0 | return Err(Error::MaxSimultaneousPingsExceeded); | 846 | 0 | } | 847 | 0 |
| 848 | 0 | self.inner.pongs_to_send.push_back(opaque_value); | 849 | 0 | self.inner.incoming = Incoming::Header; | 850 | | } | 851 | | | 852 | 0 | header::DecodedYamuxHeader::PingResponse { opaque_value } => { | 853 | 0 | if self.inner.pings_waiting_reply.pop_front() != Some(opaque_value) { | 854 | 0 | return Err(Error::PingResponseNotMatching); | 855 | 0 | } | 856 | 0 |
| 857 | 0 | self.inner.incoming = Incoming::Header; | 858 | 0 | return Ok(ReadWriteOutcome::PingResponse { yamux: self }); | 859 | | } | 860 | | | 861 | 0 | header::DecodedYamuxHeader::GoAway { error_code } => { | 862 | 0 | if self.inner.received_goaway.is_some() { | 863 | 0 | return Err(Error::MultipleGoAways); | 864 | 0 | } | 865 | 0 |
| 866 | 0 | self.inner.incoming = Incoming::Header; | 867 | 0 | self.inner.received_goaway = Some(error_code); | 868 | 0 |
| 869 | 0 | let mut reset_substreams = | 870 | 0 | Vec::with_capacity(self.inner.substreams.len()); | 871 | 0 | for (substream_id, substream) in self.inner.substreams.iter_mut() { | 872 | | let SubstreamState::Healthy { | 873 | | remote_syn_acked: false, | 874 | 0 | substreams_wake_up_key, | 875 | | .. | 876 | 0 | } = &mut substream.state | 877 | | else { | 878 | 0 | continue; | 879 | | }; | 880 | | | 881 | 0 | reset_substreams.push(SubstreamId(*substream_id)); | 882 | 0 |
| 883 | 0 | let _was_inserted = | 884 | 0 | self.inner.dead_substreams.insert(*substream_id); | 885 | 0 | debug_assert!(_was_inserted); | 886 | | | 887 | 0 | self.inner.substreams_write_ready.remove(substream_id); | 888 | 0 | self.inner.window_frames_to_send.remove(substream_id); | 889 | | | 890 | 0 | if let Some(k) = substreams_wake_up_key.take() { | 891 | 0 | let _was_removed = | 892 | 0 | self.inner.substreams_wake_up.remove(&(k, *substream_id)); | 893 | 0 | debug_assert!(_was_removed); | 894 | 0 | } | 895 | | | 896 | | if let Outgoing::PreparingDataFrame { | 897 | 0 | substream_id: s, | 898 | 0 | write_buffers, | 899 | 0 | } = &mut self.inner.outgoing | 900 | | { | 901 | 0 | if *substream_id == *s { | 902 | 0 | write_buffers.clear(); | 903 | 0 | self.inner.outgoing = Outgoing::WritingOut { | 904 | 0 | buffers: mem::take(write_buffers), | 905 | 0 | } | 906 | 0 | } | 907 | 0 | } | 908 | | | 909 | 0 | substream.state = SubstreamState::Reset; | 910 | | } | 911 | | | 912 | 0 | outer_read_write.wake_up_asap(); | 913 | 0 | return Ok(ReadWriteOutcome::GoAway { | 914 | 0 | yamux: self, | 915 | 0 | code: error_code, | 916 | 0 | reset_substreams, | 917 | 0 | }); | 918 | | } | 919 | | | 920 | | header::DecodedYamuxHeader::Data { | 921 | | rst: true, | 922 | 0 | ack, | 923 | 0 | stream_id, | 924 | 0 | length, | 925 | | .. | 926 | | } | 927 | | | header::DecodedYamuxHeader::Window { | 928 | | rst: true, | 929 | 0 | ack, | 930 | 0 | stream_id, | 931 | 0 | length, | 932 | | .. | 933 | | } => { | 934 | | // Frame with the `RST` flag set. Destroy the substream. | 935 | | | 936 | | // Sending a `RST` flag and data together is a weird corner case and | 937 | | // is difficult to handle. It is unclear whether it is allowed at all. | 938 | | // We thus consider it as invalid. | 939 | 0 | if matches!(decoded_header, header::DecodedYamuxHeader::Data { .. }) | 940 | 0 | && length != 0 | 941 | | { | 942 | 0 | return Err(Error::DataWithRst); | 943 | 0 | } | 944 | 0 |
| 945 | 0 | self.inner.incoming = Incoming::Header; | 946 | | | 947 | | // The remote might have sent a RST frame concerning a substream for | 948 | | // which we have sent a RST frame earlier. Considering that we don't | 949 | | // always keep traces of old substreams, we have no way to know whether | 950 | | // this is the case or not. | 951 | 0 | let Some(substream) = self.inner.substreams.get_mut(&stream_id) else { | 952 | 0 | continue; | 953 | | }; | 954 | | let SubstreamState::Healthy { | 955 | 0 | remote_syn_acked, | 956 | 0 | substreams_wake_up_key, | 957 | | .. | 958 | 0 | } = &mut substream.state | 959 | | else { | 960 | 0 | continue; | 961 | | }; | 962 | | | 963 | 0 | let _was_inserted = self.inner.dead_substreams.insert(stream_id); | 964 | 0 | debug_assert!(_was_inserted); | 965 | | | 966 | | // Check whether the remote has ACKed multiple times. | 967 | 0 | if *remote_syn_acked && ack { | 968 | 0 | return Err(Error::UnexpectedAck); | 969 | 0 | } | 970 | | | 971 | | if let Outgoing::PreparingDataFrame { | 972 | 0 | substream_id, | 973 | 0 | write_buffers, | 974 | 0 | } = &mut self.inner.outgoing | 975 | | { | 976 | 0 | if *substream_id == stream_id { | 977 | 0 | write_buffers.clear(); | 978 | 0 | self.inner.outgoing = Outgoing::WritingOut { | 979 | 0 | buffers: mem::take(write_buffers), | 980 | 0 | } | 981 | 0 | } | 982 | 0 | } | 983 | | | 984 | 0 | self.inner.window_frames_to_send.remove(&stream_id); | 985 | 0 | self.inner.substreams_write_ready.remove(&stream_id); | 986 | | | 987 | 0 | if let Some(k) = substreams_wake_up_key.take() { | 988 | 0 | let _was_removed = | 989 | 0 | self.inner.substreams_wake_up.remove(&(k, stream_id)); | 990 | 0 | debug_assert!(_was_removed); | 991 | 0 | } | 992 | | | 993 | 0 | substream.state = SubstreamState::Reset; | 994 | 0 |
| 995 | 0 | outer_read_write.wake_up_asap(); | 996 | 0 | return Ok(ReadWriteOutcome::StreamReset { | 997 | 0 | yamux: self, | 998 | 0 | substream_id: SubstreamId(stream_id), | 999 | 0 | }); | 1000 | | } | 1001 | | | 1002 | | header::DecodedYamuxHeader::Data { | 1003 | | syn: true, | 1004 | | ack: true, | 1005 | | .. | 1006 | | } | 1007 | | | header::DecodedYamuxHeader::Window { | 1008 | | syn: true, | 1009 | | ack: true, | 1010 | | .. | 1011 | | } => { | 1012 | | // You're never supposed to send a SYN and ACK at the same time. | 1013 | 0 | return Err(Error::UnexpectedAck); | 1014 | | } | 1015 | | | 1016 | | header::DecodedYamuxHeader::Data { | 1017 | | syn: true, | 1018 | 20 | fin, | 1019 | 20 | rst: false, | 1020 | 20 | stream_id, | 1021 | 20 | length, | 1022 | | .. | 1023 | | } | 1024 | | | header::DecodedYamuxHeader::Window { | 1025 | | syn: true, | 1026 | 0 | fin, | 1027 | 0 | rst: false, | 1028 | 0 | stream_id, | 1029 | 0 | length, | 1030 | | .. | 1031 | | } => { | 1032 | | // The initiator should only allocate uneven substream IDs, and the | 1033 | | // other side only even IDs. We don't know anymore whether we're | 1034 | | // initiator at this point, but we can compare with the even-ness of | 1035 | | // the IDs that we allocate locally. | 1036 | 20 | if (self.inner.next_outbound_substream.get() % 2) | 1037 | 20 | == (stream_id.get() % 2) | 1038 | | { | 1039 | 0 | return Err(Error::InvalidInboundStreamId(stream_id)); | 1040 | 20 | } | 1041 | 20 | | 1042 | 20 | // Remote has sent a SYN flag. A new substream is to be opened. | 1043 | 20 | match self.inner.substreams.get(&stream_id) { | 1044 | 20 | None => {} | 1045 | | Some(Substream { | 1046 | | state: | 1047 | | SubstreamState::Healthy { | 1048 | | local_write_close: SubstreamStateLocalWrite::FinQueued, | 1049 | | remote_write_closed: true, | 1050 | | .. | 1051 | | }, | 1052 | | .. | 1053 | | }) | 1054 | | | Some(Substream { | 1055 | | state: SubstreamState::Reset, | 1056 | | .. | 1057 | | }) => { | 1058 | | // Because we don't immediately destroy substreams, the remote | 1059 | | // might decide to re-use a substream ID that is still | 1060 | | // allocated locally. If that happens, we block the reading. | 1061 | | // It will be unblocked when the API user destroys the old | 1062 | | // substream. | 1063 | 0 | break; | 1064 | | } | 1065 | | Some(Substream { | 1066 | | state: SubstreamState::Healthy { .. }, | 1067 | | .. | 1068 | | }) => { | 1069 | 0 | return Err(Error::UnexpectedSyn(stream_id)); | 1070 | | } | 1071 | | } | 1072 | | | 1073 | | // When receiving a new substream, we might have to potentially queue | 1074 | | // a substream rejection message later. | 1075 | | // In order to ensure that there is enough space in `rsts_to_send`, | 1076 | | // we check it against the limit now. | 1077 | 20 | if self.inner.rsts_to_send.len() | 1078 | 20 | >= self.inner.max_simultaneous_rst_substreams.get() | 1079 | | { | 1080 | 0 | return Err(Error::MaxSimultaneousRstSubstreamsExceeded); | 1081 | 20 | } | 1082 | | | 1083 | 20 | let is_data = | 1084 | 20 | matches!0 (decoded_header, header::DecodedYamuxHeader::Data { .. }); | 1085 | | | 1086 | | // If we have queued or sent a GoAway frame, then the substream is | 1087 | | // ignored. The remote understands when receiving the GoAway that the | 1088 | | // substream has been rejected. | 1089 | 20 | if !matches!0 (self.inner.outgoing_goaway, OutgoingGoAway::NotRequired) { | 1090 | 0 | self.inner.incoming = if !is_data { | 1091 | 0 | Incoming::Header | 1092 | | } else { | 1093 | 0 | Incoming::DataFrame { | 1094 | 0 | substream_id: stream_id, | 1095 | 0 | remaining_bytes: length, | 1096 | 0 | } | 1097 | | }; | 1098 | | | 1099 | 0 | continue; | 1100 | 20 | } | 1101 | 20 | | 1102 | 20 | if is_data && u64::from(length) > NEW_SUBSTREAMS_FRAME_SIZE { | 1103 | 0 | return Err(Error::CreditsExceeded); | 1104 | 20 | } | 1105 | 20 | | 1106 | 20 | self.inner.incoming = Incoming::PendingIncomingSubstream { | 1107 | 20 | substream_id: stream_id, | 1108 | 20 | extra_window: if !is_data { length0 } else { 0 }, | 1109 | 20 | data_frame_size: if is_data { length } else { 00 }, | 1110 | 20 | fin, | 1111 | 20 | }; | 1112 | 20 | | 1113 | 20 | return Ok(ReadWriteOutcome::IncomingSubstream { yamux: self }); | 1114 | | } | 1115 | | | 1116 | | header::DecodedYamuxHeader::Data { | 1117 | | syn: false, | 1118 | | rst: false, | 1119 | 47 | stream_id, | 1120 | 47 | length, | 1121 | 47 | ack, | 1122 | 47 | fin, | 1123 | | .. | 1124 | | } => { | 1125 | | // Note that it is possible that the remote is referring to a substream | 1126 | | // for which a RST has been sent out by the local node. Since the | 1127 | | // local state machine doesn't keep track of RST'ted substreams, any | 1128 | | // frame concerning a substream that has been RST or with an unknown | 1129 | | // id is discarded and doesn't result in an error, under the | 1130 | | // presumption that we are in this situation. | 1131 | | if let Some(Substream { | 1132 | | state: | 1133 | | SubstreamState::Healthy { | 1134 | 47 | remote_write_closed, | 1135 | 47 | remote_allowed_window, | 1136 | 47 | remote_syn_acked, | 1137 | 47 | substreams_wake_up_key, | 1138 | | .. | 1139 | | }, | 1140 | | .. | 1141 | 47 | }) = self.inner.substreams.get_mut(&stream_id) | 1142 | | { | 1143 | 47 | match (ack, remote_syn_acked) { | 1144 | 28 | (false, true) => {} | 1145 | 19 | (true, acked @ false) => *acked = true, | 1146 | 0 | (true, true) => return Err(Error::UnexpectedAck), | 1147 | 0 | (false, false) => return Err(Error::ExpectedAck), | 1148 | | } | 1149 | | | 1150 | 47 | if *remote_write_closed { | 1151 | 0 | return Err(Error::WriteAfterFin); | 1152 | 47 | } | 1153 | 47 | | 1154 | 47 | if fin { | 1155 | 5 | *remote_write_closed = true; | 1156 | 5 | | 1157 | 5 | // No need to send window frames anymore if the remote has | 1158 | 5 | // sent a FIN. | 1159 | 5 | self.inner.window_frames_to_send.remove(&stream_id); | 1160 | | | 1161 | | // Wake up the substream. | 1162 | 0 | match substreams_wake_up_key { | 1163 | 0 | Some(Some(when)) if *when <= outer_read_write.now => {} | 1164 | 2 | Some(None) => {} | 1165 | 0 | Some(Some(when)) => { | 1166 | 0 | let _was_removed = self | 1167 | 0 | .inner | 1168 | 0 | .substreams_wake_up | 1169 | 0 | .remove(&(Some(when.clone()), stream_id)); | 1170 | 0 | debug_assert!(_was_removed); | 1171 | 0 | self.inner.substreams_wake_up.insert((None, stream_id)); | 1172 | 0 | *substreams_wake_up_key = Some(None); | 1173 | | } | 1174 | 3 | _ => { | 1175 | 3 | self.inner.substreams_wake_up.insert((None, stream_id)); | 1176 | 3 | *substreams_wake_up_key = Some(None); | 1177 | 3 | } | 1178 | | } | 1179 | 42 | } | 1180 | | | 1181 | | // Check whether the remote has the right to send that much data. | 1182 | | // Note that the credits aren't checked in the case of an unknown | 1183 | | // substream. | 1184 | 47 | *remote_allowed_window = remote_allowed_window | 1185 | 47 | .checked_sub(u64::from(length)) | 1186 | 47 | .ok_or(Error::CreditsExceeded)?0 ; | 1187 | 0 | } | 1188 | | | 1189 | | // Switch to the `DataFrame` state in order to process the frame, even | 1190 | | // if the substream no longer exists, in order to not duplicate code. | 1191 | 47 | self.inner.incoming = Incoming::DataFrame { | 1192 | 47 | substream_id: stream_id, | 1193 | 47 | remaining_bytes: length, | 1194 | 47 | }; | 1195 | | } | 1196 | | | 1197 | | header::DecodedYamuxHeader::Window { | 1198 | | syn: false, | 1199 | | rst: false, | 1200 | 0 | stream_id, | 1201 | 0 | length, | 1202 | 0 | ack, | 1203 | 0 | fin, | 1204 | | .. | 1205 | | } => { | 1206 | | // Note that it is possible that the remote is referring to a substream | 1207 | | // for which a RST has been sent out by the local node. Since the | 1208 | | // local state machine doesn't keep track of RST'ted substreams, any | 1209 | | // frame concerning a substream that has been RST or with an unknown | 1210 | | // id is discarded and doesn't result in an error, under the | 1211 | | // presumption that we are in this situation. | 1212 | | let Some(Substream { | 1213 | | state: | 1214 | | SubstreamState::Healthy { | 1215 | 0 | remote_syn_acked, | 1216 | 0 | allowed_window, | 1217 | 0 | remote_write_closed, | 1218 | | .. | 1219 | | }, | 1220 | | .. | 1221 | 0 | }) = self.inner.substreams.get_mut(&stream_id) | 1222 | | else { | 1223 | 0 | self.inner.incoming = Incoming::Header; | 1224 | 0 | continue; | 1225 | | }; | 1226 | | | 1227 | 0 | match (ack, remote_syn_acked) { | 1228 | 0 | (false, true) => {} | 1229 | 0 | (true, acked @ false) => *acked = true, | 1230 | 0 | (true, true) => return Err(Error::UnexpectedAck), | 1231 | 0 | (false, false) => return Err(Error::ExpectedAck), | 1232 | | } | 1233 | | | 1234 | | // Note that the spec is unclear about whether the remote can or | 1235 | | // should continue sending FIN flags on window size frames after | 1236 | | // their side of the substream has already been closed before. | 1237 | 0 | if fin { | 1238 | 0 | *remote_write_closed = true; | 1239 | 0 | } | 1240 | | | 1241 | | // If a substream was processed with a non-zero queuable bytes | 1242 | | // value (i.e. `allowed_window` non-zero) but yields empty write | 1243 | | // buffers, then we can assume that the substream has nothing to write, | 1244 | | // and thus that simply increasing the queueable bytes will not change | 1245 | | // the situation. After all, we have no idea whether the remote will | 1246 | | // increase the window size of this substream any further in the | 1247 | | // future, and thus we must write data out no matter what the window | 1248 | | // size is as long as it is non-zero. | 1249 | | // On the other hand, if the queueable bytes were 0, we take the guess | 1250 | | // that the substream might have more to write. | 1251 | 0 | if length != 0 && *allowed_window == 0 { | 1252 | 0 | self.inner.substreams_write_ready.insert(stream_id); | 1253 | 0 | } | 1254 | | | 1255 | 0 | *allowed_window = allowed_window | 1256 | 0 | .checked_add(u64::from(length)) | 1257 | 0 | .ok_or(Error::LocalCreditsOverflow)?; | 1258 | | | 1259 | 0 | self.inner.incoming = Incoming::Header; | 1260 | | } | 1261 | | } | 1262 | | } | 1263 | | } | 1264 | | } | 1265 | | | 1266 | | // Choose which substream to read/write (if any). | 1267 | 329 | let substream_id = match ( | 1268 | 369 | &self.inner.outgoing, | 1269 | 369 | self.inner.substreams_write_ready.iter().next().copied(), | 1270 | 369 | self.inner.substreams_wake_up.first(), | 1271 | | ) { | 1272 | | ( | 1273 | | Outgoing::PreparingDataFrame { | 1274 | 66 | substream_id, | 1275 | 66 | write_buffers, | 1276 | 66 | }, | 1277 | 66 | _, | 1278 | 66 | _, | 1279 | 66 | ) => { | 1280 | 66 | // Continue writing to the substream whose frame we're already preparing. | 1281 | 66 | debug_assert!(!write_buffers.is_empty()); | 1282 | 66 | self.inner.substreams_write_ready.remove(substream_id); | 1283 | 66 | *substream_id | 1284 | | } | 1285 | 92 | (Outgoing::WritingOut { buffers }, Some(substream_id), _) if buffers.is_empty() => { | 1286 | 92 | // Pull a substream from `substreams_write_ready`. | 1287 | 92 | self.inner.substreams_write_ready.remove(&substream_id); | 1288 | 92 | substream_id | 1289 | | } | 1290 | 195 | (_, _, Some((when, substream_id))) | 1291 | 195 | if when | 1292 | 195 | .as_ref() | 1293 | 195 | .map_or(true, |when| *when <= outer_read_write.now) => | 1294 | 171 | { | 1295 | 171 | *substream_id | 1296 | | } | 1297 | | _ => { | 1298 | | // No substream to read/write. | 1299 | 40 | return Ok(ReadWriteOutcome::Idle { yamux: self }); | 1300 | | } | 1301 | | }; | 1302 | | | 1303 | | // Extract some fields from the substream state. | 1304 | | let SubstreamState::Healthy { | 1305 | 329 | allowed_window, | 1306 | 329 | local_write_close, | 1307 | 329 | remote_write_closed, | 1308 | 329 | read_buffer, | 1309 | 329 | substreams_wake_up_key, | 1310 | | .. | 1311 | 329 | } = &mut self.inner.substreams.get_mut(&substream_id).unwrap().state | 1312 | | else { | 1313 | 0 | unreachable!() | 1314 | | }; | 1315 | | | 1316 | | // Remove the substream from `substreams_wake_up`, since we're processing it now. | 1317 | | // If the processing produces a `wake_up_after` value when being processed, it will be | 1318 | | // re-inserted. | 1319 | 329 | if let Some(substreams_wake_up_key273 ) = substreams_wake_up_key.take() { | 1320 | 273 | let _was_removed = self | 1321 | 273 | .inner | 1322 | 273 | .substreams_wake_up | 1323 | 273 | .remove(&(substreams_wake_up_key, substream_id)); | 1324 | 273 | debug_assert!(_was_removed); | 1325 | 56 | } | 1326 | | | 1327 | 329 | let (write_buffers, can_queue_data) = match &mut self.inner.outgoing66 { | 1328 | 263 | Outgoing::WritingOut { buffers } if buffers.is_empty() => { | 1329 | 263 | let mut buffers = mem::take(buffers); | 1330 | 263 | // As a small optimization, we push an empty buffer at the front where the header | 1331 | 263 | // might later get written. | 1332 | 263 | buffers.push(Vec::with_capacity(12)); | 1333 | 263 | (buffers, true) | 1334 | | } | 1335 | | Outgoing::PreparingDataFrame { | 1336 | 66 | substream_id: s, | 1337 | 66 | write_buffers, | 1338 | 66 | } if *s == substream_id => { | 1339 | 66 | let buffers = mem::take(write_buffers); | 1340 | 66 | self.inner.outgoing = Outgoing::WritingOut { | 1341 | 66 | buffers: Vec::new(), | 1342 | 66 | }; | 1343 | 66 | (buffers, true) | 1344 | | } | 1345 | 0 | _ => (Vec::new(), false), | 1346 | | }; | 1347 | 329 | let write_buffers_len_before = write_buffers.iter().fold(0, |count, buf| count + buf.len()); | 1348 | 329 | | 1349 | 329 | Ok(ReadWriteOutcome::ProcessSubstream { | 1350 | 329 | substream_read_write: SubstreamReadWrite { | 1351 | 329 | substream_id, | 1352 | 329 | write_buffers_len_before, | 1353 | 329 | inner_read_write: ReadWrite { | 1354 | 329 | now: outer_read_write.now.clone(), | 1355 | 329 | incoming_buffer: mem::take(read_buffer), | 1356 | 329 | expected_incoming_bytes: if !*remote_write_closed { Some(0)320 } else { None9 }, | 1357 | | read_bytes: 0, | 1358 | 329 | write_buffers, | 1359 | 329 | write_bytes_queued: write_buffers_len_before, | 1360 | 10 | write_bytes_queueable: if matches!( | 1361 | 329 | local_write_close, | 1362 | | SubstreamStateLocalWrite::Open | 1363 | | ) { | 1364 | 319 | if can_queue_data { | 1365 | 319 | Some( | 1366 | 319 | usize::try_from(cmp::min( | 1367 | 319 | u64::from(self.inner.max_out_data_frame_size.get()), | 1368 | 319 | *allowed_window, | 1369 | 319 | )) | 1370 | 319 | .unwrap_or(usize::MAX) | 1371 | 319 | .saturating_sub(write_buffers_len_before), | 1372 | 319 | ) | 1373 | | } else { | 1374 | 0 | Some(0) | 1375 | | } | 1376 | | } else { | 1377 | 10 | None | 1378 | | }, | 1379 | 329 | wake_up_after: None, | 1380 | 329 | }, | 1381 | 329 | outer_read_write, | 1382 | 329 | yamux: self, | 1383 | | }, | 1384 | | }) | 1385 | 389 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE10read_writeCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE10read_writeBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE10read_writeCsiUjFBJteJ7x_17smoldot_full_node |
1386 | | |
1387 | | /// Adds `bytes` to the number of bytes the remote is allowed to send at once in the next |
1388 | | /// packet. |
1389 | | /// |
1390 | | /// > **Note**: The [`SubstreamReadWrite`] object ensures that the remote window is big enough |
1391 | | /// > to receive the amount of bytes requested by the substream. Calling |
1392 | | /// > [`Yamux::add_remote_window_saturating`] is therefore not mandatory, but only an |
1393 | | /// > optimization to reduce the number of networking round trips. |
1394 | | /// |
1395 | | /// The counter saturates if its maximum is reached. This could cause stalls if the |
1396 | | /// remote sends a ton of data. However, given that the number of bytes is stored in a `u64`, |
1397 | | /// the remote would have to send at least `2^64` bytes in order to reach this situation, |
1398 | | /// making it basically impossible. |
1399 | | /// |
1400 | | /// It is, furthermore, a bad idea to increase this counter by an immense number ahead of |
1401 | | /// time, as the remote can shut down the connection if its own counter overflows. The way |
1402 | | /// this counter is supposed to be used in a "streaming" way. |
1403 | | /// |
1404 | | /// Increasing the window of a substream by `bytes` requires sending out `12 * bytes / 2^32` |
1405 | | /// bytes. For example, increasing the window by `2^64` adds an overhead of 48 GiB. Please |
1406 | | /// don't do that. |
1407 | | /// |
1408 | | /// > **Note**: When a substream has just been opened or accepted, it starts with an initial |
1409 | | /// > window of [`NEW_SUBSTREAMS_FRAME_SIZE`]. |
1410 | | /// |
1411 | | /// > **Note**: It is only possible to add more bytes to the window and not set or reduce this |
1412 | | /// > number of bytes, and it is also not possible to obtain the number of bytes the |
1413 | | /// > remote is allowed. That's because it would be ambiguous whether bytes possibly |
1414 | | /// > in the send or receive queue should be counted or not. |
1415 | | /// |
1416 | | /// Has no effect if the remote has already closed their writing side. |
1417 | | /// |
1418 | | /// # Panic |
1419 | | /// |
1420 | | /// Panics if the [`SubstreamId`] is invalid. |
1421 | | /// |
1422 | 3 | pub fn add_remote_window_saturating(&mut self, substream_id: SubstreamId, bytes: u64) { |
1423 | 3 | if let SubstreamState::Healthy { |
1424 | 3 | remote_write_closed: false, |
1425 | 3 | .. |
1426 | 3 | } = &mut self |
1427 | 3 | .inner |
1428 | 3 | .substreams |
1429 | 3 | .get_mut(&substream_id.0) |
1430 | 3 | .unwrap_or_else(|| panic!()0 ) Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE28add_remote_window_saturating0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE28add_remote_window_saturating0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE28add_remote_window_saturating0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE28add_remote_window_saturating0CsiUjFBJteJ7x_17smoldot_full_node |
1431 | 3 | .state |
1432 | 0 | { |
1433 | 3 | let Some(bytes) = NonZeroU64::new(bytes) else { |
1434 | 3 | return; |
1435 | | }; |
1436 | | |
1437 | 0 | self.inner |
1438 | 0 | .window_frames_to_send |
1439 | 0 | .entry(substream_id.0) |
1440 | 0 | .and_modify(|window| *window = window.saturating_add(bytes.get())) Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE28add_remote_window_saturatings_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE28add_remote_window_saturatings_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE28add_remote_window_saturatings_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE28add_remote_window_saturatings_0CsiUjFBJteJ7x_17smoldot_full_node |
1441 | 0 | .or_insert(bytes); |
1442 | 0 | } |
1443 | 3 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE28add_remote_window_saturatingBa_ Line | Count | Source | 1422 | 3 | pub fn add_remote_window_saturating(&mut self, substream_id: SubstreamId, bytes: u64) { | 1423 | 3 | if let SubstreamState::Healthy { | 1424 | 3 | remote_write_closed: false, | 1425 | 3 | .. | 1426 | 3 | } = &mut self | 1427 | 3 | .inner | 1428 | 3 | .substreams | 1429 | 3 | .get_mut(&substream_id.0) | 1430 | 3 | .unwrap_or_else(|| panic!()) | 1431 | 3 | .state | 1432 | 0 | { | 1433 | 3 | let Some(bytes) = NonZeroU64::new(bytes) else { | 1434 | 3 | return; | 1435 | | }; | 1436 | | | 1437 | 0 | self.inner | 1438 | 0 | .window_frames_to_send | 1439 | 0 | .entry(substream_id.0) | 1440 | 0 | .and_modify(|window| *window = window.saturating_add(bytes.get())) | 1441 | 0 | .or_insert(bytes); | 1442 | 0 | } | 1443 | 3 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE28add_remote_window_saturatingCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE28add_remote_window_saturatingBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE28add_remote_window_saturatingCsiUjFBJteJ7x_17smoldot_full_node |
1444 | | |
1445 | | /// Abruptly shuts down the substream. Sends a frame with the `RST` flag to the remote. |
1446 | | /// |
1447 | | /// Use this method when a protocol error happens on a substream. |
1448 | | /// |
1449 | | /// Returns an error if [`Yamux::reset`] has already been called on this substream, or if the |
1450 | | /// remote has reset the substream in the past, or if the substream was closed. |
1451 | | /// |
1452 | | /// # Panic |
1453 | | /// |
1454 | | /// Panics if the [`SubstreamId`] is invalid. |
1455 | | /// |
1456 | 5 | pub fn reset(&mut self, substream_id: SubstreamId) -> Result<(), ResetError> { |
1457 | | // Add an entry to the list of RST headers to send to the remote. |
1458 | | let SubstreamState::Healthy { |
1459 | 5 | substreams_wake_up_key, |
1460 | | .. |
1461 | 5 | } = &mut self |
1462 | 5 | .inner |
1463 | 5 | .substreams |
1464 | 5 | .get_mut(&substream_id.0) |
1465 | 5 | .unwrap_or_else(|| panic!()0 ) Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE5reset0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE5reset0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE5reset0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE5reset0CsiUjFBJteJ7x_17smoldot_full_node |
1466 | 5 | .state |
1467 | | else { |
1468 | 0 | return Err(ResetError::AlreadyReset); |
1469 | | }; |
1470 | | |
1471 | 5 | if !self.inner.dead_substreams.insert(substream_id.0) { |
1472 | 1 | return Err(ResetError::AlreadyClosed); |
1473 | 4 | } |
1474 | 4 | |
1475 | 4 | self.inner.window_frames_to_send.remove(&substream_id.0); |
1476 | 4 | self.inner.substreams_write_ready.remove(&substream_id.0); |
1477 | | |
1478 | 4 | if let Some(key2 ) = substreams_wake_up_key.take() { |
1479 | 2 | let _was_removed = self.inner.substreams_wake_up.remove(&(key, substream_id.0)); |
1480 | 2 | debug_assert!(_was_removed); |
1481 | 2 | } |
1482 | | |
1483 | | if let Outgoing::PreparingDataFrame { |
1484 | 0 | substream_id: id, |
1485 | 0 | write_buffers, |
1486 | 4 | } = &mut self.inner.outgoing |
1487 | | { |
1488 | 0 | if *id == substream_id.0 { |
1489 | 0 | write_buffers.clear(); |
1490 | 0 | self.inner.outgoing = Outgoing::WritingOut { |
1491 | 0 | buffers: mem::take(write_buffers), |
1492 | 0 | } |
1493 | 0 | } |
1494 | 4 | } |
1495 | | |
1496 | | // Note that we intentionally don't check the size against |
1497 | | // `max_simultaneous_rst_substreams`, as locally-emitted RST frames aren't the |
1498 | | // remote's fault. |
1499 | 4 | self.inner.rsts_to_send.push_back(substream_id.0); |
1500 | 4 | |
1501 | 4 | self.inner |
1502 | 4 | .substreams |
1503 | 4 | .get_mut(&substream_id.0) |
1504 | 4 | .unwrap_or_else(|| panic!()0 ) Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE5resets_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE5resets_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE5resets_0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE5resets_0CsiUjFBJteJ7x_17smoldot_full_node |
1505 | 4 | .state = SubstreamState::Reset; |
1506 | 4 | |
1507 | 4 | Ok(()) |
1508 | 5 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE5resetBa_ Line | Count | Source | 1456 | 5 | pub fn reset(&mut self, substream_id: SubstreamId) -> Result<(), ResetError> { | 1457 | | // Add an entry to the list of RST headers to send to the remote. | 1458 | | let SubstreamState::Healthy { | 1459 | 5 | substreams_wake_up_key, | 1460 | | .. | 1461 | 5 | } = &mut self | 1462 | 5 | .inner | 1463 | 5 | .substreams | 1464 | 5 | .get_mut(&substream_id.0) | 1465 | 5 | .unwrap_or_else(|| panic!()) | 1466 | 5 | .state | 1467 | | else { | 1468 | 0 | return Err(ResetError::AlreadyReset); | 1469 | | }; | 1470 | | | 1471 | 5 | if !self.inner.dead_substreams.insert(substream_id.0) { | 1472 | 1 | return Err(ResetError::AlreadyClosed); | 1473 | 4 | } | 1474 | 4 | | 1475 | 4 | self.inner.window_frames_to_send.remove(&substream_id.0); | 1476 | 4 | self.inner.substreams_write_ready.remove(&substream_id.0); | 1477 | | | 1478 | 4 | if let Some(key2 ) = substreams_wake_up_key.take() { | 1479 | 2 | let _was_removed = self.inner.substreams_wake_up.remove(&(key, substream_id.0)); | 1480 | 2 | debug_assert!(_was_removed); | 1481 | 2 | } | 1482 | | | 1483 | | if let Outgoing::PreparingDataFrame { | 1484 | 0 | substream_id: id, | 1485 | 0 | write_buffers, | 1486 | 4 | } = &mut self.inner.outgoing | 1487 | | { | 1488 | 0 | if *id == substream_id.0 { | 1489 | 0 | write_buffers.clear(); | 1490 | 0 | self.inner.outgoing = Outgoing::WritingOut { | 1491 | 0 | buffers: mem::take(write_buffers), | 1492 | 0 | } | 1493 | 0 | } | 1494 | 4 | } | 1495 | | | 1496 | | // Note that we intentionally don't check the size against | 1497 | | // `max_simultaneous_rst_substreams`, as locally-emitted RST frames aren't the | 1498 | | // remote's fault. | 1499 | 4 | self.inner.rsts_to_send.push_back(substream_id.0); | 1500 | 4 | | 1501 | 4 | self.inner | 1502 | 4 | .substreams | 1503 | 4 | .get_mut(&substream_id.0) | 1504 | 4 | .unwrap_or_else(|| panic!()) | 1505 | 4 | .state = SubstreamState::Reset; | 1506 | 4 | | 1507 | 4 | Ok(()) | 1508 | 5 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE5resetCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE5resetBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE5resetCsiUjFBJteJ7x_17smoldot_full_node |
1509 | | |
1510 | | /// Queues sending out a ping to the remote. |
1511 | | /// |
1512 | | /// # Panic |
1513 | | /// |
1514 | | /// Panics if there are already [`MAX_PINGS`] pings that have been queued and that the remote |
1515 | | /// hasn't answered yet. [`MAX_PINGS`] is pretty large, and unless there is a bug in the API |
1516 | | /// user's code causing pings to be allocated in a loop, this limit is not likely to ever be |
1517 | | /// reached. |
1518 | | /// |
1519 | 0 | pub fn queue_ping(&mut self) { |
1520 | 0 | // A maximum number of simultaneous pings (`MAX_PINGS`) is necessary because we don't |
1521 | 0 | // support sending multiple identical ping opaque values. Since the ping opaque values |
1522 | 0 | // are 32 bits, the actual maximum number of simultaneous pings is 2^32. But because we |
1523 | 0 | // allocate ping values by looping until we find a not-yet-allocated value, the arbitrary |
1524 | 0 | // self-enforced maximum needs to be way lower. |
1525 | 0 | assert!(self.inner.pings_to_send + self.inner.pings_waiting_reply.len() < MAX_PINGS); |
1526 | 0 | self.inner.pings_to_send += 1; |
1527 | 0 | } Unexecuted instantiation: _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE10queue_pingBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE10queue_pingBa_ |
1528 | | |
1529 | | /// Queues a `GoAway` frame, requesting the remote to no longer open any substream. |
1530 | | /// |
1531 | | /// If the state of [`Yamux`] is currently waiting for a confirmation to accept/reject a |
1532 | | /// substream, then this function automatically implies calling |
1533 | | /// [`Yamux::reject_pending_substream`]. |
1534 | | /// |
1535 | | /// All follow-up requests for new substreams from the remote are automatically rejected. |
1536 | | /// [`ReadWriteOutcome::IncomingSubstream`] events can no longer happen. |
1537 | | /// |
1538 | 0 | pub fn send_goaway(&mut self, code: GoAwayErrorCode) -> Result<(), SendGoAwayError> { |
1539 | 0 | match self.inner.outgoing_goaway { |
1540 | | OutgoingGoAway::NotRequired => { |
1541 | 0 | self.inner.outgoing_goaway = OutgoingGoAway::Required(code) |
1542 | | } |
1543 | 0 | _ => return Err(SendGoAwayError::AlreadySent), |
1544 | | } |
1545 | | |
1546 | | // If the remote is currently opening a substream, ignore it. The remote understands when |
1547 | | // receiving the GoAway that the substream has been rejected. |
1548 | | if let Incoming::PendingIncomingSubstream { |
1549 | 0 | substream_id, |
1550 | 0 | data_frame_size, |
1551 | | .. |
1552 | 0 | } = self.inner.incoming |
1553 | 0 | { |
1554 | 0 | self.inner.incoming = Incoming::DataFrame { |
1555 | 0 | substream_id, |
1556 | 0 | remaining_bytes: data_frame_size, |
1557 | 0 | }; |
1558 | 0 | } |
1559 | | |
1560 | 0 | Ok(()) |
1561 | 0 | } Unexecuted instantiation: _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE11send_goawayBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE11send_goawayBa_ |
1562 | | |
1563 | | /// Returns the list of all substreams that have been closed or reset. |
1564 | | /// |
1565 | | /// This function does not remove dead substreams from the state machine. In other words, if |
1566 | | /// this function is called multiple times in a row, it will always return the same |
1567 | | /// substreams. Use [`Yamux::remove_dead_substream`] to remove substreams. |
1568 | 295 | pub fn dead_substreams( |
1569 | 295 | &'_ self, |
1570 | 295 | ) -> impl Iterator<Item = (SubstreamId, DeadSubstreamTy, &'_ TSub)> + '_ { |
1571 | 295 | self.inner.dead_substreams.iter().map(|id| { |
1572 | 2 | let substream = self.inner.substreams.get(id).unwrap(); |
1573 | 2 | match &substream.state { |
1574 | 2 | SubstreamState::Reset => ( |
1575 | 2 | SubstreamId(*id), |
1576 | 2 | DeadSubstreamTy::Reset, |
1577 | 2 | &substream.user_data, |
1578 | 2 | ), |
1579 | | SubstreamState::Healthy { |
1580 | 0 | local_write_close, |
1581 | 0 | remote_write_closed, |
1582 | 0 | .. |
1583 | 0 | } => { |
1584 | 0 | debug_assert!( |
1585 | 0 | matches!(local_write_close, SubstreamStateLocalWrite::FinQueued) |
1586 | 0 | && *remote_write_closed |
1587 | | ); |
1588 | | |
1589 | 0 | ( |
1590 | 0 | SubstreamId(*id), |
1591 | 0 | DeadSubstreamTy::ClosedGracefully, |
1592 | 0 | &substream.user_data, |
1593 | 0 | ) |
1594 | | } |
1595 | | } |
1596 | 295 | }2 ) _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE15dead_substreams0Bc_ Line | Count | Source | 1571 | 2 | self.inner.dead_substreams.iter().map(|id| { | 1572 | 2 | let substream = self.inner.substreams.get(id).unwrap(); | 1573 | 2 | match &substream.state { | 1574 | 2 | SubstreamState::Reset => ( | 1575 | 2 | SubstreamId(*id), | 1576 | 2 | DeadSubstreamTy::Reset, | 1577 | 2 | &substream.user_data, | 1578 | 2 | ), | 1579 | | SubstreamState::Healthy { | 1580 | 0 | local_write_close, | 1581 | 0 | remote_write_closed, | 1582 | 0 | .. | 1583 | 0 | } => { | 1584 | 0 | debug_assert!( | 1585 | 0 | matches!(local_write_close, SubstreamStateLocalWrite::FinQueued) | 1586 | 0 | && *remote_write_closed | 1587 | | ); | 1588 | | | 1589 | 0 | ( | 1590 | 0 | SubstreamId(*id), | 1591 | 0 | DeadSubstreamTy::ClosedGracefully, | 1592 | 0 | &substream.user_data, | 1593 | 0 | ) | 1594 | | } | 1595 | | } | 1596 | 2 | }) |
Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE15dead_substreams0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE15dead_substreams0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE15dead_substreams0CsiUjFBJteJ7x_17smoldot_full_node |
1597 | 295 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE15dead_substreamsBa_ Line | Count | Source | 1568 | 295 | pub fn dead_substreams( | 1569 | 295 | &'_ self, | 1570 | 295 | ) -> impl Iterator<Item = (SubstreamId, DeadSubstreamTy, &'_ TSub)> + '_ { | 1571 | 295 | self.inner.dead_substreams.iter().map(|id| { | 1572 | | let substream = self.inner.substreams.get(id).unwrap(); | 1573 | | match &substream.state { | 1574 | | SubstreamState::Reset => ( | 1575 | | SubstreamId(*id), | 1576 | | DeadSubstreamTy::Reset, | 1577 | | &substream.user_data, | 1578 | | ), | 1579 | | SubstreamState::Healthy { | 1580 | | local_write_close, | 1581 | | remote_write_closed, | 1582 | | .. | 1583 | | } => { | 1584 | | debug_assert!( | 1585 | | matches!(local_write_close, SubstreamStateLocalWrite::FinQueued) | 1586 | | && *remote_write_closed | 1587 | | ); | 1588 | | | 1589 | | ( | 1590 | | SubstreamId(*id), | 1591 | | DeadSubstreamTy::ClosedGracefully, | 1592 | | &substream.user_data, | 1593 | | ) | 1594 | | } | 1595 | | } | 1596 | 295 | }) | 1597 | 295 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE15dead_substreamsCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE15dead_substreamsBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE15dead_substreamsCsiUjFBJteJ7x_17smoldot_full_node |
1598 | | |
1599 | | /// Removes a dead substream from the state machine. |
1600 | | /// |
1601 | | /// # Panic |
1602 | | /// |
1603 | | /// Panics if the substream with that id doesn't exist or isn't dead. |
1604 | | /// |
1605 | 2 | pub fn remove_dead_substream(&mut self, id: SubstreamId) -> TSub { |
1606 | 2 | let was_in = self.inner.dead_substreams.remove(&id.0); |
1607 | 2 | if !was_in { |
1608 | 0 | panic!() |
1609 | 2 | } |
1610 | 2 | |
1611 | 2 | debug_assert!(!self |
1612 | 2 | .inner |
1613 | 2 | .substreams_wake_up |
1614 | 2 | .iter() |
1615 | 2 | .any(|(_, s)| s == &id.00 )); Unexecuted instantiation: _RNCNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB8_11established9substream9SubstreamB18_EIB1K_uEEEE21remove_dead_substream0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1L_IB1L_NtNtBa_10collection11SubstreamIdEEEEE21remove_dead_substream0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxppE21remove_dead_substream0Bc_ Unexecuted instantiation: _RNCNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB6_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB8_11established9substream9SubstreamB19_EIB1J_IB1J_NtNtBa_10collection11SubstreamIdEEEEE21remove_dead_substream0CsiUjFBJteJ7x_17smoldot_full_node |
1616 | 2 | debug_assert!(!self.inner.window_frames_to_send.contains_key(&id.0)); |
1617 | 2 | debug_assert!(!self.inner.substreams_write_ready.contains(&id.0)); |
1618 | | |
1619 | 2 | let substream = self.inner.substreams.remove(&id.0).unwrap(); |
1620 | 2 | |
1621 | 2 | if substream.inbound { |
1622 | 2 | self.inner.num_inbound -= 1; |
1623 | 2 | }0 |
1624 | | |
1625 | 2 | substream.user_data |
1626 | 2 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE21remove_dead_substreamBa_ Line | Count | Source | 1605 | 2 | pub fn remove_dead_substream(&mut self, id: SubstreamId) -> TSub { | 1606 | 2 | let was_in = self.inner.dead_substreams.remove(&id.0); | 1607 | 2 | if !was_in { | 1608 | 0 | panic!() | 1609 | 2 | } | 1610 | 2 | | 1611 | 2 | debug_assert!(!self | 1612 | 2 | .inner | 1613 | 2 | .substreams_wake_up | 1614 | 2 | .iter() | 1615 | 2 | .any(|(_, s)| s == &id.0)); | 1616 | 2 | debug_assert!(!self.inner.window_frames_to_send.contains_key(&id.0)); | 1617 | 2 | debug_assert!(!self.inner.substreams_write_ready.contains(&id.0)); | 1618 | | | 1619 | 2 | let substream = self.inner.substreams.remove(&id.0).unwrap(); | 1620 | 2 | | 1621 | 2 | if substream.inbound { | 1622 | 2 | self.inner.num_inbound -= 1; | 1623 | 2 | }0 | 1624 | | | 1625 | 2 | substream.user_data | 1626 | 2 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE21remove_dead_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE21remove_dead_substreamBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE21remove_dead_substreamCsiUjFBJteJ7x_17smoldot_full_node |
1627 | | |
1628 | | /// Accepts an incoming substream. |
1629 | | /// |
1630 | | /// Either [`Yamux::accept_pending_substream`] or [`Yamux::reject_pending_substream`] must be |
1631 | | /// called after [`ReadWriteOutcome::IncomingSubstream`] is returned. |
1632 | | /// |
1633 | | /// Note that there is no expiration window after [`ReadWriteOutcome::IncomingSubstream`] |
1634 | | /// is returned until the substream is no longer valid. However, reading will be blocked until |
1635 | | /// the substream is either accepted or rejected. This function should thus be called as |
1636 | | /// soon as possible. |
1637 | | /// |
1638 | | /// Returns an error if no incoming substream is currently pending. |
1639 | | /// |
1640 | 20 | pub fn accept_pending_substream( |
1641 | 20 | &mut self, |
1642 | 20 | user_data: TSub, |
1643 | 20 | ) -> Result<SubstreamId, PendingSubstreamError> { |
1644 | | let Incoming::PendingIncomingSubstream { |
1645 | 20 | substream_id, |
1646 | 20 | extra_window, |
1647 | 20 | data_frame_size, |
1648 | 20 | fin, |
1649 | 20 | } = self.inner.incoming |
1650 | | else { |
1651 | 0 | return Err(PendingSubstreamError::NoPendingSubstream); |
1652 | | }; |
1653 | | |
1654 | 20 | debug_assert!(u64::from(data_frame_size) <= NEW_SUBSTREAMS_FRAME_SIZE); |
1655 | | |
1656 | 20 | let _was_before = self.inner.substreams.insert( |
1657 | 20 | substream_id, |
1658 | 20 | Substream { |
1659 | 20 | state: SubstreamState::Healthy { |
1660 | 20 | first_message_queued: false, |
1661 | 20 | remote_syn_acked: true, |
1662 | 20 | remote_allowed_window: NEW_SUBSTREAMS_FRAME_SIZE - u64::from(data_frame_size), |
1663 | 20 | allowed_window: NEW_SUBSTREAMS_FRAME_SIZE + u64::from(extra_window), |
1664 | 20 | local_write_close: SubstreamStateLocalWrite::Open, |
1665 | 20 | remote_write_closed: fin, |
1666 | 20 | expected_incoming_bytes: None, |
1667 | 20 | read_buffer: Vec::new(), // TODO: capacity? |
1668 | 20 | // There is no need to insert the substream in `substreams_wake_up`, as we |
1669 | 20 | // switch `self.inner.incoming` to a data frame below, which will cause the |
1670 | 20 | // substream to be processed. |
1671 | 20 | substreams_wake_up_key: None, |
1672 | 20 | }, |
1673 | 20 | inbound: true, |
1674 | 20 | user_data, |
1675 | 20 | }, |
1676 | 20 | ); |
1677 | 20 | debug_assert!(_was_before.is_none()); |
1678 | | |
1679 | 20 | self.inner.num_inbound += 1; |
1680 | 20 | |
1681 | 20 | self.inner.incoming = Incoming::DataFrame { |
1682 | 20 | substream_id, |
1683 | 20 | remaining_bytes: data_frame_size, |
1684 | 20 | }; |
1685 | 20 | |
1686 | 20 | Ok(SubstreamId(substream_id)) |
1687 | 20 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE24accept_pending_substreamBa_ Line | Count | Source | 1640 | 20 | pub fn accept_pending_substream( | 1641 | 20 | &mut self, | 1642 | 20 | user_data: TSub, | 1643 | 20 | ) -> Result<SubstreamId, PendingSubstreamError> { | 1644 | | let Incoming::PendingIncomingSubstream { | 1645 | 20 | substream_id, | 1646 | 20 | extra_window, | 1647 | 20 | data_frame_size, | 1648 | 20 | fin, | 1649 | 20 | } = self.inner.incoming | 1650 | | else { | 1651 | 0 | return Err(PendingSubstreamError::NoPendingSubstream); | 1652 | | }; | 1653 | | | 1654 | 20 | debug_assert!(u64::from(data_frame_size) <= NEW_SUBSTREAMS_FRAME_SIZE); | 1655 | | | 1656 | 20 | let _was_before = self.inner.substreams.insert( | 1657 | 20 | substream_id, | 1658 | 20 | Substream { | 1659 | 20 | state: SubstreamState::Healthy { | 1660 | 20 | first_message_queued: false, | 1661 | 20 | remote_syn_acked: true, | 1662 | 20 | remote_allowed_window: NEW_SUBSTREAMS_FRAME_SIZE - u64::from(data_frame_size), | 1663 | 20 | allowed_window: NEW_SUBSTREAMS_FRAME_SIZE + u64::from(extra_window), | 1664 | 20 | local_write_close: SubstreamStateLocalWrite::Open, | 1665 | 20 | remote_write_closed: fin, | 1666 | 20 | expected_incoming_bytes: None, | 1667 | 20 | read_buffer: Vec::new(), // TODO: capacity? | 1668 | 20 | // There is no need to insert the substream in `substreams_wake_up`, as we | 1669 | 20 | // switch `self.inner.incoming` to a data frame below, which will cause the | 1670 | 20 | // substream to be processed. | 1671 | 20 | substreams_wake_up_key: None, | 1672 | 20 | }, | 1673 | 20 | inbound: true, | 1674 | 20 | user_data, | 1675 | 20 | }, | 1676 | 20 | ); | 1677 | 20 | debug_assert!(_was_before.is_none()); | 1678 | | | 1679 | 20 | self.inner.num_inbound += 1; | 1680 | 20 | | 1681 | 20 | self.inner.incoming = Incoming::DataFrame { | 1682 | 20 | substream_id, | 1683 | 20 | remaining_bytes: data_frame_size, | 1684 | 20 | }; | 1685 | 20 | | 1686 | 20 | Ok(SubstreamId(substream_id)) | 1687 | 20 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE24accept_pending_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE24accept_pending_substreamBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE24accept_pending_substreamCsiUjFBJteJ7x_17smoldot_full_node |
1688 | | |
1689 | | /// Rejects an incoming substream. |
1690 | | /// |
1691 | | /// Either [`Yamux::accept_pending_substream`] or [`Yamux::reject_pending_substream`] must be |
1692 | | /// called after [`ReadWriteOutcome::IncomingSubstream`] is returned. |
1693 | | /// |
1694 | | /// Note that there is no expiration window after [`ReadWriteOutcome::IncomingSubstream`] |
1695 | | /// is returned until the substream is no longer valid. However, reading will be blocked until |
1696 | | /// the substream is either accepted or rejected. This function should thus be called as |
1697 | | /// soon as possible. |
1698 | | /// |
1699 | | /// Returns an error if no incoming substream is currently pending. |
1700 | | /// |
1701 | 0 | pub fn reject_pending_substream(&mut self) -> Result<(), PendingSubstreamError> { |
1702 | | let Incoming::PendingIncomingSubstream { |
1703 | 0 | substream_id, |
1704 | 0 | data_frame_size, |
1705 | | .. |
1706 | 0 | } = self.inner.incoming |
1707 | | else { |
1708 | 0 | return Err(PendingSubstreamError::NoPendingSubstream); |
1709 | | }; |
1710 | | |
1711 | 0 | self.inner.rsts_to_send.push_back(substream_id); |
1712 | 0 | self.inner.incoming = Incoming::DataFrame { |
1713 | 0 | substream_id, |
1714 | 0 | remaining_bytes: data_frame_size, |
1715 | 0 | }; |
1716 | 0 | Ok(()) |
1717 | 0 | } Unexecuted instantiation: _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1a_6option6OptionTINtNtNtB6_11established9substream9SubstreamB16_EIB1I_uEEEE24reject_pending_substreamBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1J_IB1J_NtNtB8_10collection11SubstreamIdEEEEE24reject_pending_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxppE24reject_pending_substreamBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB4_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB6_11established9substream9SubstreamB17_EIB1H_IB1H_NtNtB8_10collection11SubstreamIdEEEEE24reject_pending_substreamCsiUjFBJteJ7x_17smoldot_full_node |
1718 | | } |
1719 | | |
1720 | | impl<TNow, TSub> ops::Index<SubstreamId> for Yamux<TNow, TSub> { |
1721 | | type Output = TSub; |
1722 | | |
1723 | 0 | fn index(&self, substream_id: SubstreamId) -> &TSub { |
1724 | 0 | &self |
1725 | 0 | .inner |
1726 | 0 | .substreams |
1727 | 0 | .get(&substream_id.0) |
1728 | 0 | .unwrap() |
1729 | 0 | .user_data |
1730 | 0 | } Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxs0_0ppEINtB5_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_11SubstreamIdE5indexBb_ Unexecuted instantiation: _RNvXs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1K_IB1K_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1c_3ops5index5IndexNtB5_11SubstreamIdE5indexCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs0_0ppEINtB5_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_11SubstreamIdE5indexBb_ Unexecuted instantiation: _RNvXs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1I_IB1I_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1M_3ops5index5IndexNtB5_11SubstreamIdE5indexCsiUjFBJteJ7x_17smoldot_full_node |
1731 | | } |
1732 | | |
1733 | | impl<TNow, TSub> ops::IndexMut<SubstreamId> for Yamux<TNow, TSub> { |
1734 | 28 | fn index_mut(&mut self, substream_id: SubstreamId) -> &mut TSub { |
1735 | 28 | &mut self |
1736 | 28 | .inner |
1737 | 28 | .substreams |
1738 | 28 | .get_mut(&substream_id.0) |
1739 | 28 | .unwrap_or_else(|| panic!()0 ) Unexecuted instantiation: _RNCNvXs1_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1d_6option6OptionTINtNtNtB9_11established9substream9SubstreamB19_EIB1L_uEEEEINtNtNtB1d_3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0Bd_ Unexecuted instantiation: _RNCNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1e_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1a_EIB1M_IB1M_NtNtBb_10collection11SubstreamIdEEEEEINtNtNtB1e_3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs1_0ppEINtB7_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0Bd_ Unexecuted instantiation: _RNCNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1a_EIB1K_IB1K_NtNtBb_10collection11SubstreamIdEEEEEINtNtNtB1O_3ops5index8IndexMutNtB7_11SubstreamIdE9index_mut0CsiUjFBJteJ7x_17smoldot_full_node |
1740 | 28 | .user_data |
1741 | 28 | } _RNvXs1_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1b_6option6OptionTINtNtNtB7_11established9substream9SubstreamB17_EIB1J_uEEEEINtNtNtB1b_3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutBb_ Line | Count | Source | 1734 | 28 | fn index_mut(&mut self, substream_id: SubstreamId) -> &mut TSub { | 1735 | 28 | &mut self | 1736 | 28 | .inner | 1737 | 28 | .substreams | 1738 | 28 | .get_mut(&substream_id.0) | 1739 | 28 | .unwrap_or_else(|| panic!()) | 1740 | 28 | .user_data | 1741 | 28 | } |
Unexecuted instantiation: _RNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1c_6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1K_IB1K_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1c_3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs1_0ppEINtB5_5YamuxppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutBb_ Unexecuted instantiation: _RNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_5YamuxNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB18_EIB1I_IB1I_NtNtB9_10collection11SubstreamIdEEEEEINtNtNtB1M_3ops5index8IndexMutNtB5_11SubstreamIdE9index_mutCsiUjFBJteJ7x_17smoldot_full_node |
1742 | | } |
1743 | | |
1744 | | impl<TNow, TSub> fmt::Debug for Yamux<TNow, TSub> |
1745 | | where |
1746 | | TSub: fmt::Debug, |
1747 | | { |
1748 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1749 | 0 | struct List<'a, TNow, TSub>(&'a Yamux<TNow, TSub>); |
1750 | 0 | impl<'a, TNow, TSub> fmt::Debug for List<'a, TNow, TSub> |
1751 | 0 | where |
1752 | 0 | TSub: fmt::Debug, |
1753 | 0 | { |
1754 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
1755 | 0 | f.debug_list() |
1756 | 0 | .entries(self.0.inner.substreams.values().map(|v| &v.user_data)) Unexecuted instantiation: _RNCNvXININvXs2_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtBd_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB7_4ListppEB1i_3fmt0Bj_ Unexecuted instantiation: _RNCNvXININvXs2_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtBd_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB7_4ListppEB1j_3fmt0Bj_ |
1757 | 0 | .finish() |
1758 | 0 | } Unexecuted instantiation: _RNvXININvXs2_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtBb_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB5_4ListppEB1g_3fmtBh_ Unexecuted instantiation: _RNvXININvXs2_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtBb_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmt0ppEINtB5_4ListppEB1h_3fmtBh_ |
1759 | 0 | } |
1760 | 0 |
|
1761 | 0 | f.debug_struct("Yamux") |
1762 | 0 | .field("substreams", &List(self)) |
1763 | 0 | .finish() |
1764 | 0 | } Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxs2_0ppEINtB5_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_ Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs2_0ppEINtB5_5YamuxppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_ |
1765 | | } |
1766 | | |
1767 | | /// Identifier of a substream in the context of a connection. |
1768 | | #[derive(Debug, Copy, Clone, Ord, PartialOrd, Eq, PartialEq, Hash, derive_more::From)] |
1769 | | pub struct SubstreamId(NonZeroU32); |
1770 | | |
1771 | | impl SubstreamId { |
1772 | | /// Value that compares inferior or equal to all possible values. |
1773 | | pub const MIN: Self = Self(match NonZeroU32::new(1) { |
1774 | | Some(v) => v, |
1775 | | None => unreachable!(), |
1776 | | }); |
1777 | | |
1778 | | /// Value that compares superior or equal to all possible values. |
1779 | | pub const MAX: Self = Self(match NonZeroU32::new(u32::MAX) { |
1780 | | Some(v) => v, |
1781 | | None => unreachable!(), |
1782 | | }); |
1783 | | } |
1784 | | |
1785 | | /// Details about the incoming data. |
1786 | | #[must_use] |
1787 | | #[derive(Debug)] |
1788 | | pub enum ReadWriteOutcome<'a, TNow, TSub> |
1789 | | where |
1790 | | TNow: Clone + cmp::Ord, |
1791 | | { |
1792 | | /// Nothing in particular happened. |
1793 | | Idle { |
1794 | | /// The [`Yamux`] state machine yielded back. |
1795 | | yamux: Yamux<TNow, TSub>, |
1796 | | }, |
1797 | | |
1798 | | /// Remote has requested to open a new substream. |
1799 | | /// |
1800 | | /// After this has been received, either [`Yamux::accept_pending_substream`] or |
1801 | | /// [`Yamux::reject_pending_substream`] needs to be called in order to accept or reject |
1802 | | /// this substream. [`Yamux::read_write`] will stop reading incoming data before this is done. |
1803 | | /// |
1804 | | /// Note that this can never happen after [`Yamux::send_goaway`] has been called, as all |
1805 | | /// substreams are then automatically rejected. |
1806 | | IncomingSubstream { |
1807 | | /// The [`Yamux`] state machine yielded back. |
1808 | | // TODO: use an accept/reject wrapper instead |
1809 | | yamux: Yamux<TNow, TSub>, |
1810 | | }, |
1811 | | |
1812 | | /// Received data corresponding to a substream. |
1813 | | ProcessSubstream { |
1814 | | /// Object allowing reading and writing data from/to the given substream. |
1815 | | substream_read_write: SubstreamReadWrite<'a, TNow, TSub>, |
1816 | | }, |
1817 | | |
1818 | | /// Remote has asked to reset a substream. |
1819 | | StreamReset { |
1820 | | /// The [`Yamux`] state machine yielded back. |
1821 | | yamux: Yamux<TNow, TSub>, |
1822 | | /// Substream that has been reset. |
1823 | | substream_id: SubstreamId, |
1824 | | }, |
1825 | | |
1826 | | /// Received a "go away" request. This means that it is now forbidden to open new outbound |
1827 | | /// substreams. It is still allowed to send and receive data on existing substreams, and the |
1828 | | /// remote is still allowed to open substreams. |
1829 | | GoAway { |
1830 | | /// The [`Yamux`] state machine yielded back. |
1831 | | yamux: Yamux<TNow, TSub>, |
1832 | | /// Error code sent by the remote. |
1833 | | code: GoAwayErrorCode, |
1834 | | /// List of all outgoing substreams that haven't been acknowledged by the remote yet. |
1835 | | /// These substreams are considered as reset, similar to |
1836 | | /// [`ReadWriteOutcome::StreamReset`]. |
1837 | | reset_substreams: Vec<SubstreamId>, |
1838 | | }, |
1839 | | |
1840 | | /// Received a response to a ping that has been sent out earlier. |
1841 | | /// |
1842 | | /// If multiple pings have been sent out simultaneously, they are always answered in the same |
1843 | | /// order as they have been sent out. |
1844 | | PingResponse { |
1845 | | /// The [`Yamux`] state machine yielded back. |
1846 | | yamux: Yamux<TNow, TSub>, |
1847 | | }, |
1848 | | } |
1849 | | |
1850 | | pub struct SubstreamReadWrite<'a, TNow, TSub> |
1851 | | where |
1852 | | TNow: Clone + cmp::Ord, |
1853 | | { |
1854 | | outer_read_write: &'a mut ReadWrite<TNow>, |
1855 | | inner_read_write: ReadWrite<TNow>, |
1856 | | yamux: Yamux<TNow, TSub>, |
1857 | | substream_id: NonZeroU32, |
1858 | | |
1859 | | /// Size of the write buffers of the substream prior to its processing. |
1860 | | write_buffers_len_before: usize, |
1861 | | } |
1862 | | |
1863 | | impl<'a, TNow, TSub> SubstreamReadWrite<'a, TNow, TSub> |
1864 | | where |
1865 | | TNow: Clone + cmp::Ord, |
1866 | | { |
1867 | | /// Returns the identifier of the substream being read/written. |
1868 | 39 | pub fn substream_id(&self) -> SubstreamId { |
1869 | 39 | SubstreamId(self.substream_id) |
1870 | 39 | } _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE12substream_idBb_ Line | Count | Source | 1868 | 39 | pub fn substream_id(&self) -> SubstreamId { | 1869 | 39 | SubstreamId(self.substream_id) | 1870 | 39 | } |
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE12substream_idCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE12substream_idBb_ Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE12substream_idCsiUjFBJteJ7x_17smoldot_full_node |
1871 | | |
1872 | 329 | pub fn read_write(&mut self) -> &mut ReadWrite<TNow> { |
1873 | 329 | &mut self.inner_read_write |
1874 | 329 | } _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE10read_writeBb_ Line | Count | Source | 1872 | 329 | pub fn read_write(&mut self) -> &mut ReadWrite<TNow> { | 1873 | 329 | &mut self.inner_read_write | 1874 | 329 | } |
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE10read_writeCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE10read_writeBb_ Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE10read_writeCsiUjFBJteJ7x_17smoldot_full_node |
1875 | | |
1876 | | /// Returns the user data associated to the substream being read/written. |
1877 | 0 | pub fn user_data(&self) -> &TSub { |
1878 | 0 | &self |
1879 | 0 | .yamux |
1880 | 0 | .inner |
1881 | 0 | .substreams |
1882 | 0 | .get(&self.substream_id) |
1883 | 0 | .unwrap() |
1884 | 0 | .user_data |
1885 | 0 | } Unexecuted instantiation: _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE9user_dataBb_ Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE9user_dataBb_ |
1886 | | |
1887 | | /// Returns the user data associated to the substream being read/written. |
1888 | 653 | pub fn user_data_mut(&mut self) -> &mut TSub { |
1889 | 653 | &mut self |
1890 | 653 | .yamux |
1891 | 653 | .inner |
1892 | 653 | .substreams |
1893 | 653 | .get_mut(&self.substream_id) |
1894 | 653 | .unwrap() |
1895 | 653 | .user_data |
1896 | 653 | } _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE13user_data_mutBb_ Line | Count | Source | 1888 | 653 | pub fn user_data_mut(&mut self) -> &mut TSub { | 1889 | 653 | &mut self | 1890 | 653 | .yamux | 1891 | 653 | .inner | 1892 | 653 | .substreams | 1893 | 653 | .get_mut(&self.substream_id) | 1894 | 653 | .unwrap() | 1895 | 653 | .user_data | 1896 | 653 | } |
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE13user_data_mutCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE13user_data_mutBb_ Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE13user_data_mutCsiUjFBJteJ7x_17smoldot_full_node |
1897 | | |
1898 | 329 | pub fn finish(mut self) -> Yamux<TNow, TSub> { |
1899 | | let Substream { |
1900 | 329 | inbound, |
1901 | 329 | state: |
1902 | 329 | SubstreamState::Healthy { |
1903 | 329 | first_message_queued, |
1904 | 329 | remote_allowed_window, |
1905 | 329 | local_write_close, |
1906 | 329 | remote_write_closed, |
1907 | 329 | read_buffer, |
1908 | 329 | expected_incoming_bytes, |
1909 | 329 | substreams_wake_up_key, |
1910 | | .. |
1911 | | }, |
1912 | | .. |
1913 | 329 | } = &mut self |
1914 | 329 | .yamux |
1915 | 329 | .inner |
1916 | 329 | .substreams |
1917 | 329 | .get_mut(&self.substream_id) |
1918 | 329 | .unwrap() |
1919 | | else { |
1920 | 0 | unreachable!() |
1921 | | }; |
1922 | | |
1923 | | // Update the reading part of the substream's internal state. |
1924 | 329 | *read_buffer = mem::take(&mut self.inner_read_write.incoming_buffer); |
1925 | 329 | *expected_incoming_bytes = Some(self.inner_read_write.expected_incoming_bytes.unwrap_or(0)); |
1926 | | |
1927 | | // If the substream requests more data than the remote is allowed to send, send out a |
1928 | | // window frame. This ensures that the reading can never stall due to window frames issues. |
1929 | 329 | if let Some(mut missing_window_size0 ) = NonZeroUsize::new( |
1930 | 329 | expected_incoming_bytes |
1931 | 329 | .unwrap() |
1932 | 329 | .saturating_sub(usize::try_from(*remote_allowed_window).unwrap_or(usize::MAX)), |
1933 | 329 | ) { |
1934 | | // Don't send super tiny window frames. |
1935 | 0 | if missing_window_size.get() < 1024 { |
1936 | 0 | missing_window_size = NonZeroUsize::new(1024).unwrap(); |
1937 | 0 | } |
1938 | | |
1939 | 0 | let missing_window_size = |
1940 | 0 | NonZeroU64::new(u64::try_from(missing_window_size.get()).unwrap_or(u64::MAX)) |
1941 | 0 | .unwrap(); |
1942 | 0 |
|
1943 | 0 | self.yamux |
1944 | 0 | .inner |
1945 | 0 | .window_frames_to_send |
1946 | 0 | .entry(self.substream_id) |
1947 | 0 | .and_modify(|v| { |
1948 | 0 | if *v < missing_window_size { |
1949 | 0 | *v = missing_window_size; |
1950 | 0 | } |
1951 | 0 | }) Unexecuted instantiation: _RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finish0Bd_ Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finish0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finish0Bd_ Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finish0CsiUjFBJteJ7x_17smoldot_full_node |
1952 | 0 | .or_insert(missing_window_size); |
1953 | 0 |
|
1954 | 0 | self.outer_read_write.wake_up_asap(); |
1955 | 329 | } |
1956 | | |
1957 | | // When to wake up the substream for reading again. |
1958 | 329 | debug_assert!(substreams_wake_up_key.is_none()); |
1959 | 329 | let will_wake_up_read_again = match ( |
1960 | 329 | self.inner_read_write.read_bytes, |
1961 | 329 | &self.inner_read_write.wake_up_after, |
1962 | | ) { |
1963 | | (0, None) => { |
1964 | | // Don't wake it up for reading. |
1965 | 122 | false |
1966 | | } |
1967 | 39 | (0, Some(when)) if *when > self.outer_read_write.now => { |
1968 | 37 | // Wake it up at `when`. |
1969 | 37 | self.outer_read_write.wake_up_after(when); |
1970 | 37 | self.yamux |
1971 | 37 | .inner |
1972 | 37 | .substreams_wake_up |
1973 | 37 | .insert((Some(when.clone()), self.substream_id)); |
1974 | 37 | *substreams_wake_up_key = Some(Some(when.clone())); |
1975 | 37 | true |
1976 | | } |
1977 | | _ => { |
1978 | | // Non-zero bytes written or `when <= now`. |
1979 | | // Wake it up as soon as possible so it continues reading from its read buffer. |
1980 | 170 | self.outer_read_write.wake_up_asap(); |
1981 | 170 | self.yamux |
1982 | 170 | .inner |
1983 | 170 | .substreams_wake_up |
1984 | 170 | .insert((None, self.substream_id)); |
1985 | 170 | *substreams_wake_up_key = Some(None); |
1986 | 170 | true |
1987 | | } |
1988 | | }; |
1989 | | |
1990 | | // Update the `local_write_close` state of the substream. |
1991 | 329 | if matches!10 (*local_write_close, SubstreamStateLocalWrite::Open) |
1992 | 319 | && self.inner_read_write.write_bytes_queueable.is_none() |
1993 | 6 | { |
1994 | 6 | *local_write_close = SubstreamStateLocalWrite::FinDesired; |
1995 | 323 | } |
1996 | | |
1997 | | // Sanity check. |
1998 | 329 | debug_assert!(matches!0 ( |
1999 | 329 | self.yamux.inner.outgoing, |
2000 | | Outgoing::WritingOut { .. } |
2001 | | )); |
2002 | | |
2003 | | // Process the writing side of the substream. |
2004 | 329 | if self.inner_read_write.write_bytes_queued != self.write_buffers_len_before |
2005 | 67 | && matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty()) |
2006 | 67 | && self |
2007 | 67 | .inner_read_write |
2008 | 67 | .write_bytes_queueable |
2009 | 67 | .map_or(false, |n| n != 0) _RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finishs_0Bd_ Line | Count | Source | 2009 | 67 | .map_or(false, |n| n != 0) |
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finishs_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finishs_0Bd_ Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finishs_0CsiUjFBJteJ7x_17smoldot_full_node |
2010 | 67 | { |
2011 | 67 | // Substream has written out data, but might have more to write. Put back the write |
2012 | 67 | // buffers for next time. |
2013 | 67 | // Note that there's no need to insert the substream in `substreams_write_read`, as |
2014 | 67 | // as the `Outgoing::PreparingDataFrame` state guarantees that this substream will be |
2015 | 67 | // processed again as soon as possible. |
2016 | 67 | self.yamux.inner.outgoing = Outgoing::PreparingDataFrame { |
2017 | 67 | substream_id: self.substream_id, |
2018 | 67 | write_buffers: mem::take(&mut self.inner_read_write.write_buffers), |
2019 | 67 | }; |
2020 | 67 | self.outer_read_write.wake_up_asap(); |
2021 | 262 | } else if self.inner_read_write.write_bytes_queued != 0 |
2022 | 196 | || matches!193 (*local_write_close, SubstreamStateLocalWrite::FinDesired) |
2023 | | { |
2024 | | // Substream hasn't written anything more. A data frame is ready. Flush its data. |
2025 | | |
2026 | | // The substream should only have been able to write data if we're not currently |
2027 | | // writing out. If this assertion fails, it indicates that the substream hasn't |
2028 | | // respected the `ReadWrite` contract. |
2029 | 69 | debug_assert!( |
2030 | 69 | matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty()) |
2031 | | ); |
2032 | | |
2033 | 69 | let mut write_buffers = mem::take(&mut self.inner_read_write.write_buffers); |
2034 | 69 | |
2035 | 69 | // When preparing the inner `ReadWrite` object, the `write_buffers` are set to |
2036 | 69 | // contain one empty entry of enough capacity to hold the header. There is a high |
2037 | 69 | // chance that this empty entry is still there, but if it's not we add it now. |
2038 | 69 | if write_buffers.first().map_or(true, |b| !b.is_empty()) { _RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finishs0_0Bd_ Line | Count | Source | 2038 | 69 | if write_buffers.first().map_or(true, |b| !b.is_empty()) { |
Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finishs0_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finishs0_0Bd_ Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finishs0_0CsiUjFBJteJ7x_17smoldot_full_node |
2039 | 0 | write_buffers.insert(0, Vec::with_capacity(12)); |
2040 | 69 | } |
2041 | | |
2042 | 69 | write_buffers[0].extend_from_slice(&header::encode( |
2043 | 69 | &header::DecodedYamuxHeader::Data { |
2044 | 69 | syn: !*first_message_queued && !*inbound40 , |
2045 | 69 | ack: !*first_message_queued && *inbound40 , |
2046 | 69 | fin: matches!63 (*local_write_close, SubstreamStateLocalWrite::FinDesired), |
2047 | | rst: false, |
2048 | 69 | stream_id: self.substream_id, |
2049 | 69 | length: { |
2050 | 69 | // Because the number of queuable bytes is capped by the value in |
2051 | 69 | // `Config::max_out_data_frame_size`, we are guaranteed that the length |
2052 | 69 | // to write out fits in a `u32`. |
2053 | 69 | debug_assert!(self.yamux.inner.max_out_data_frame_size.get() <= u32::MAX); |
2054 | 69 | u32::try_from(self.inner_read_write.write_bytes_queued).unwrap() |
2055 | | }, |
2056 | | }, |
2057 | | )); |
2058 | 69 | if matches!63 (*local_write_close, SubstreamStateLocalWrite::FinDesired) { |
2059 | 6 | *local_write_close = SubstreamStateLocalWrite::FinQueued; |
2060 | 63 | } |
2061 | 69 | *first_message_queued = true; |
2062 | 69 | |
2063 | 69 | self.yamux.inner.outgoing = Outgoing::WritingOut { |
2064 | 69 | buffers: write_buffers, |
2065 | 69 | }; |
2066 | 69 | |
2067 | 69 | self.outer_read_write.wake_up_asap(); |
2068 | 69 | |
2069 | 69 | // Re-schedule the substream for writing, as it was maybe waiting for the queue to |
2070 | 69 | // be flushed before writing more data. |
2071 | 69 | self.yamux |
2072 | 69 | .inner |
2073 | 69 | .substreams_write_ready |
2074 | 69 | .insert(self.substream_id); |
2075 | 193 | } else if self.inner_read_write.write_bytes_queueable == Some(0) { |
2076 | 0 | // Substream hasn't written anything because it wasn't able to write anything. |
2077 | 0 | // Re-schedule the substream for when it is possible to write data out. |
2078 | 0 | self.yamux |
2079 | 0 | .inner |
2080 | 0 | .substreams_write_ready |
2081 | 0 | .insert(self.substream_id); |
2082 | 193 | } else { |
2083 | 193 | // Substream has nothing to write. |
2084 | 193 | } |
2085 | | |
2086 | | // Mark the substream as dead if it won't ever wake up again. |
2087 | 329 | if matches!313 (local_write_close, SubstreamStateLocalWrite::FinQueued) |
2088 | 16 | && *remote_write_closed |
2089 | 3 | && !will_wake_up_read_again |
2090 | 2 | && !self |
2091 | 2 | .yamux |
2092 | 2 | .inner |
2093 | 2 | .substreams_write_ready |
2094 | 2 | .contains(&self.substream_id) |
2095 | 0 | && !matches!(self.yamux.inner.outgoing, Outgoing::PreparingDataFrame { |
2096 | 0 | substream_id, |
2097 | | .. |
2098 | 0 | } if substream_id == self.substream_id) |
2099 | | { |
2100 | 1 | let _was_inserted = self.yamux.inner.dead_substreams.insert(self.substream_id); |
2101 | 1 | debug_assert!(_was_inserted); |
2102 | 1 | debug_assert!(!self |
2103 | 1 | .yamux |
2104 | 1 | .inner |
2105 | 1 | .substreams_wake_up |
2106 | 1 | .iter() |
2107 | 1 | .any(|(_, s)| *s == self.substream_id0 )); Unexecuted instantiation: _RNCNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1r_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1n_EIB1Z_uEEEE6finishs1_0Bd_ Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1s_6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB20_IB20_NtNtBb_10collection11SubstreamIdEEEEE6finishs1_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteppE6finishs1_0Bd_ Unexecuted instantiation: _RNCNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB7_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB9_11established9substream9SubstreamB1o_EIB1Y_IB1Y_NtNtBb_10collection11SubstreamIdEEEEE6finishs1_0CsiUjFBJteJ7x_17smoldot_full_node |
2108 | 1 | debug_assert!(!self |
2109 | 1 | .yamux |
2110 | 1 | .inner |
2111 | 1 | .substreams_write_ready |
2112 | 1 | .contains(&self.substream_id)); |
2113 | 1 | debug_assert!(!self |
2114 | 1 | .yamux |
2115 | 1 | .inner |
2116 | 1 | .window_frames_to_send |
2117 | 1 | .contains_key(&self.substream_id)); |
2118 | 328 | } |
2119 | | |
2120 | 329 | self.yamux |
2121 | 329 | } _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE6finishBb_ Line | Count | Source | 1898 | 329 | pub fn finish(mut self) -> Yamux<TNow, TSub> { | 1899 | | let Substream { | 1900 | 329 | inbound, | 1901 | 329 | state: | 1902 | 329 | SubstreamState::Healthy { | 1903 | 329 | first_message_queued, | 1904 | 329 | remote_allowed_window, | 1905 | 329 | local_write_close, | 1906 | 329 | remote_write_closed, | 1907 | 329 | read_buffer, | 1908 | 329 | expected_incoming_bytes, | 1909 | 329 | substreams_wake_up_key, | 1910 | | .. | 1911 | | }, | 1912 | | .. | 1913 | 329 | } = &mut self | 1914 | 329 | .yamux | 1915 | 329 | .inner | 1916 | 329 | .substreams | 1917 | 329 | .get_mut(&self.substream_id) | 1918 | 329 | .unwrap() | 1919 | | else { | 1920 | 0 | unreachable!() | 1921 | | }; | 1922 | | | 1923 | | // Update the reading part of the substream's internal state. | 1924 | 329 | *read_buffer = mem::take(&mut self.inner_read_write.incoming_buffer); | 1925 | 329 | *expected_incoming_bytes = Some(self.inner_read_write.expected_incoming_bytes.unwrap_or(0)); | 1926 | | | 1927 | | // If the substream requests more data than the remote is allowed to send, send out a | 1928 | | // window frame. This ensures that the reading can never stall due to window frames issues. | 1929 | 329 | if let Some(mut missing_window_size0 ) = NonZeroUsize::new( | 1930 | 329 | expected_incoming_bytes | 1931 | 329 | .unwrap() | 1932 | 329 | .saturating_sub(usize::try_from(*remote_allowed_window).unwrap_or(usize::MAX)), | 1933 | 329 | ) { | 1934 | | // Don't send super tiny window frames. | 1935 | 0 | if missing_window_size.get() < 1024 { | 1936 | 0 | missing_window_size = NonZeroUsize::new(1024).unwrap(); | 1937 | 0 | } | 1938 | | | 1939 | 0 | let missing_window_size = | 1940 | 0 | NonZeroU64::new(u64::try_from(missing_window_size.get()).unwrap_or(u64::MAX)) | 1941 | 0 | .unwrap(); | 1942 | 0 |
| 1943 | 0 | self.yamux | 1944 | 0 | .inner | 1945 | 0 | .window_frames_to_send | 1946 | 0 | .entry(self.substream_id) | 1947 | 0 | .and_modify(|v| { | 1948 | | if *v < missing_window_size { | 1949 | | *v = missing_window_size; | 1950 | | } | 1951 | 0 | }) | 1952 | 0 | .or_insert(missing_window_size); | 1953 | 0 |
| 1954 | 0 | self.outer_read_write.wake_up_asap(); | 1955 | 329 | } | 1956 | | | 1957 | | // When to wake up the substream for reading again. | 1958 | 329 | debug_assert!(substreams_wake_up_key.is_none()); | 1959 | 329 | let will_wake_up_read_again = match ( | 1960 | 329 | self.inner_read_write.read_bytes, | 1961 | 329 | &self.inner_read_write.wake_up_after, | 1962 | | ) { | 1963 | | (0, None) => { | 1964 | | // Don't wake it up for reading. | 1965 | 122 | false | 1966 | | } | 1967 | 39 | (0, Some(when)) if *when > self.outer_read_write.now => { | 1968 | 37 | // Wake it up at `when`. | 1969 | 37 | self.outer_read_write.wake_up_after(when); | 1970 | 37 | self.yamux | 1971 | 37 | .inner | 1972 | 37 | .substreams_wake_up | 1973 | 37 | .insert((Some(when.clone()), self.substream_id)); | 1974 | 37 | *substreams_wake_up_key = Some(Some(when.clone())); | 1975 | 37 | true | 1976 | | } | 1977 | | _ => { | 1978 | | // Non-zero bytes written or `when <= now`. | 1979 | | // Wake it up as soon as possible so it continues reading from its read buffer. | 1980 | 170 | self.outer_read_write.wake_up_asap(); | 1981 | 170 | self.yamux | 1982 | 170 | .inner | 1983 | 170 | .substreams_wake_up | 1984 | 170 | .insert((None, self.substream_id)); | 1985 | 170 | *substreams_wake_up_key = Some(None); | 1986 | 170 | true | 1987 | | } | 1988 | | }; | 1989 | | | 1990 | | // Update the `local_write_close` state of the substream. | 1991 | 329 | if matches!10 (*local_write_close, SubstreamStateLocalWrite::Open) | 1992 | 319 | && self.inner_read_write.write_bytes_queueable.is_none() | 1993 | 6 | { | 1994 | 6 | *local_write_close = SubstreamStateLocalWrite::FinDesired; | 1995 | 323 | } | 1996 | | | 1997 | | // Sanity check. | 1998 | 329 | debug_assert!(matches!0 ( | 1999 | 329 | self.yamux.inner.outgoing, | 2000 | | Outgoing::WritingOut { .. } | 2001 | | )); | 2002 | | | 2003 | | // Process the writing side of the substream. | 2004 | 329 | if self.inner_read_write.write_bytes_queued != self.write_buffers_len_before | 2005 | 67 | && matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty()) | 2006 | 67 | && self | 2007 | 67 | .inner_read_write | 2008 | 67 | .write_bytes_queueable | 2009 | 67 | .map_or(false, |n| n != 0) | 2010 | 67 | { | 2011 | 67 | // Substream has written out data, but might have more to write. Put back the write | 2012 | 67 | // buffers for next time. | 2013 | 67 | // Note that there's no need to insert the substream in `substreams_write_read`, as | 2014 | 67 | // as the `Outgoing::PreparingDataFrame` state guarantees that this substream will be | 2015 | 67 | // processed again as soon as possible. | 2016 | 67 | self.yamux.inner.outgoing = Outgoing::PreparingDataFrame { | 2017 | 67 | substream_id: self.substream_id, | 2018 | 67 | write_buffers: mem::take(&mut self.inner_read_write.write_buffers), | 2019 | 67 | }; | 2020 | 67 | self.outer_read_write.wake_up_asap(); | 2021 | 262 | } else if self.inner_read_write.write_bytes_queued != 0 | 2022 | 196 | || matches!193 (*local_write_close, SubstreamStateLocalWrite::FinDesired) | 2023 | | { | 2024 | | // Substream hasn't written anything more. A data frame is ready. Flush its data. | 2025 | | | 2026 | | // The substream should only have been able to write data if we're not currently | 2027 | | // writing out. If this assertion fails, it indicates that the substream hasn't | 2028 | | // respected the `ReadWrite` contract. | 2029 | 69 | debug_assert!( | 2030 | 69 | matches!(&self.yamux.inner.outgoing, Outgoing::WritingOut { buffers } if buffers.is_empty()) | 2031 | | ); | 2032 | | | 2033 | 69 | let mut write_buffers = mem::take(&mut self.inner_read_write.write_buffers); | 2034 | 69 | | 2035 | 69 | // When preparing the inner `ReadWrite` object, the `write_buffers` are set to | 2036 | 69 | // contain one empty entry of enough capacity to hold the header. There is a high | 2037 | 69 | // chance that this empty entry is still there, but if it's not we add it now. | 2038 | 69 | if write_buffers.first().map_or(true, |b| !b.is_empty()) { | 2039 | 0 | write_buffers.insert(0, Vec::with_capacity(12)); | 2040 | 69 | } | 2041 | | | 2042 | 69 | write_buffers[0].extend_from_slice(&header::encode( | 2043 | 69 | &header::DecodedYamuxHeader::Data { | 2044 | 69 | syn: !*first_message_queued && !*inbound40 , | 2045 | 69 | ack: !*first_message_queued && *inbound40 , | 2046 | 69 | fin: matches!63 (*local_write_close, SubstreamStateLocalWrite::FinDesired), | 2047 | | rst: false, | 2048 | 69 | stream_id: self.substream_id, | 2049 | 69 | length: { | 2050 | 69 | // Because the number of queuable bytes is capped by the value in | 2051 | 69 | // `Config::max_out_data_frame_size`, we are guaranteed that the length | 2052 | 69 | // to write out fits in a `u32`. | 2053 | 69 | debug_assert!(self.yamux.inner.max_out_data_frame_size.get() <= u32::MAX); | 2054 | 69 | u32::try_from(self.inner_read_write.write_bytes_queued).unwrap() | 2055 | | }, | 2056 | | }, | 2057 | | )); | 2058 | 69 | if matches!63 (*local_write_close, SubstreamStateLocalWrite::FinDesired) { | 2059 | 6 | *local_write_close = SubstreamStateLocalWrite::FinQueued; | 2060 | 63 | } | 2061 | 69 | *first_message_queued = true; | 2062 | 69 | | 2063 | 69 | self.yamux.inner.outgoing = Outgoing::WritingOut { | 2064 | 69 | buffers: write_buffers, | 2065 | 69 | }; | 2066 | 69 | | 2067 | 69 | self.outer_read_write.wake_up_asap(); | 2068 | 69 | | 2069 | 69 | // Re-schedule the substream for writing, as it was maybe waiting for the queue to | 2070 | 69 | // be flushed before writing more data. | 2071 | 69 | self.yamux | 2072 | 69 | .inner | 2073 | 69 | .substreams_write_ready | 2074 | 69 | .insert(self.substream_id); | 2075 | 193 | } else if self.inner_read_write.write_bytes_queueable == Some(0) { | 2076 | 0 | // Substream hasn't written anything because it wasn't able to write anything. | 2077 | 0 | // Re-schedule the substream for when it is possible to write data out. | 2078 | 0 | self.yamux | 2079 | 0 | .inner | 2080 | 0 | .substreams_write_ready | 2081 | 0 | .insert(self.substream_id); | 2082 | 193 | } else { | 2083 | 193 | // Substream has nothing to write. | 2084 | 193 | } | 2085 | | | 2086 | | // Mark the substream as dead if it won't ever wake up again. | 2087 | 329 | if matches!313 (local_write_close, SubstreamStateLocalWrite::FinQueued) | 2088 | 16 | && *remote_write_closed | 2089 | 3 | && !will_wake_up_read_again | 2090 | 2 | && !self | 2091 | 2 | .yamux | 2092 | 2 | .inner | 2093 | 2 | .substreams_write_ready | 2094 | 2 | .contains(&self.substream_id) | 2095 | 0 | && !matches!(self.yamux.inner.outgoing, Outgoing::PreparingDataFrame { | 2096 | 0 | substream_id, | 2097 | | .. | 2098 | 0 | } if substream_id == self.substream_id) | 2099 | | { | 2100 | 1 | let _was_inserted = self.yamux.inner.dead_substreams.insert(self.substream_id); | 2101 | 1 | debug_assert!(_was_inserted); | 2102 | 1 | debug_assert!(!self | 2103 | 1 | .yamux | 2104 | 1 | .inner | 2105 | 1 | .substreams_wake_up | 2106 | 1 | .iter() | 2107 | 1 | .any(|(_, s)| *s == self.substream_id)); | 2108 | 1 | debug_assert!(!self | 2109 | 1 | .yamux | 2110 | 1 | .inner | 2111 | 1 | .substreams_write_ready | 2112 | 1 | .contains(&self.substream_id)); | 2113 | 1 | debug_assert!(!self | 2114 | 1 | .yamux | 2115 | 1 | .inner | 2116 | 1 | .window_frames_to_send | 2117 | 1 | .contains_key(&self.substream_id)); | 2118 | 328 | } | 2119 | | | 2120 | 329 | self.yamux | 2121 | 329 | } |
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE6finishCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE6finishBb_ Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE6finishCsiUjFBJteJ7x_17smoldot_full_node |
2122 | | |
2123 | | /// Resets the substream being processed and returns the underlying [`Yamux`] object. |
2124 | 5 | pub fn reset(self) -> Yamux<TNow, TSub> { |
2125 | 5 | let substream_id = self.substream_id(); |
2126 | 5 | let mut yamux = self.finish(); |
2127 | 5 | match yamux.reset(substream_id) { |
2128 | 4 | Ok(()) => {} |
2129 | 1 | Err(ResetError::AlreadyClosed) => {} |
2130 | 0 | Err(ResetError::AlreadyReset) => debug_assert!(false), |
2131 | | } |
2132 | 5 | yamux |
2133 | 5 | } _RNvMs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1p_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1l_EIB1X_uEEEE5resetBb_ Line | Count | Source | 2124 | 5 | pub fn reset(self) -> Yamux<TNow, TSub> { | 2125 | 5 | let substream_id = self.substream_id(); | 2126 | 5 | let mut yamux = self.finish(); | 2127 | 5 | match yamux.reset(substream_id) { | 2128 | 4 | Ok(()) => {} | 2129 | 1 | Err(ResetError::AlreadyClosed) => {} | 2130 | 0 | Err(ResetError::AlreadyReset) => debug_assert!(false), | 2131 | | } | 2132 | 5 | yamux | 2133 | 5 | } |
Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1q_6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1Y_IB1Y_NtNtB9_10collection11SubstreamIdEEEEE5resetCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteppE5resetBb_ Unexecuted instantiation: _RNvMs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxINtB5_18SubstreamReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionTINtNtNtB7_11established9substream9SubstreamB1m_EIB1W_IB1W_NtNtB9_10collection11SubstreamIdEEEEE5resetCsiUjFBJteJ7x_17smoldot_full_node |
2134 | | } |
2135 | | |
2136 | | impl<'a, TNow, TSub> fmt::Debug for SubstreamReadWrite<'a, TNow, TSub> |
2137 | | where |
2138 | | TNow: Clone + cmp::Ord, |
2139 | | { |
2140 | 0 | fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { |
2141 | 0 | f.debug_struct("SubstreamReadWrite") |
2142 | 0 | .field("substream_id", &self.substream_id) |
2143 | 0 | .finish() |
2144 | 0 | } Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxs5_0ppEINtB5_18SubstreamReadWriteppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_ Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxs5_0ppEINtB5_18SubstreamReadWriteppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_ |
2145 | | } |
2146 | | |
2147 | | /// Error potentially returned by [`Yamux::open_substream`]. |
2148 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsj_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_18OpenSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsj_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_18OpenSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2149 | | pub enum OpenSubstreamError { |
2150 | | /// A `GoAway` frame has been received in the past. |
2151 | | GoAwayReceived, |
2152 | | /// Impossible to allocate a new substream. |
2153 | | NoFreeSubstreamId, |
2154 | | } |
2155 | | |
2156 | | /// Error potentially returned by [`Yamux::reset`]. |
2157 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsl_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_10ResetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsl_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_10ResetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2158 | | pub enum ResetError { |
2159 | | /// Substream was already reset. |
2160 | | AlreadyReset, |
2161 | | /// Substream was already closed. |
2162 | | AlreadyClosed, |
2163 | | } |
2164 | | |
2165 | | /// Error potentially returned by [`Yamux::send_goaway`]. |
2166 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsn_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_15SendGoAwayErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsn_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_15SendGoAwayErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2167 | | pub enum SendGoAwayError { |
2168 | | /// A `GoAway` has already been sent. |
2169 | | AlreadySent, |
2170 | | } |
2171 | | |
2172 | | /// Error potentially returned by [`Yamux::accept_pending_substream`] or |
2173 | | /// [`Yamux::reject_pending_substream`]. |
2174 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsp_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_21PendingSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsp_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_21PendingSubstreamErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2175 | | pub enum PendingSubstreamError { |
2176 | | /// No substream is pending. |
2177 | | NoPendingSubstream, |
2178 | | } |
2179 | | |
2180 | | /// Error while decoding the Yamux stream. |
2181 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsr_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection5yamuxNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsr_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection5yamuxNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2182 | | pub enum Error { |
2183 | | /// Failed to decode an incoming Yamux header. |
2184 | | HeaderDecode(header::YamuxHeaderDecodeError), |
2185 | | /// Received a SYN flag with a substream ID that is of the same side as the local side. |
2186 | | InvalidInboundStreamId(NonZeroU32), |
2187 | | /// Received a SYN flag with a known substream ID. |
2188 | | #[display(fmt = "Received a SYN flag with a known substream ID")] |
2189 | | UnexpectedSyn(NonZeroU32), |
2190 | | /// Remote tried to send more data than it was allowed to. |
2191 | | CreditsExceeded, |
2192 | | /// Number of credits allocated to the local node has overflowed. |
2193 | | LocalCreditsOverflow, |
2194 | | /// Remote sent additional data on a substream after having sent the FIN flag. |
2195 | | WriteAfterFin, |
2196 | | /// Remote has sent a data frame containing data at the same time as a `RST` flag. |
2197 | | DataWithRst, |
2198 | | /// Remote has sent a ping response, but its opaque data didn't match any of the ping that |
2199 | | /// have been sent out in the past. |
2200 | | PingResponseNotMatching, |
2201 | | /// Maximum number of simultaneous RST frames to send out has been exceeded. |
2202 | | MaxSimultaneousRstSubstreamsExceeded, |
2203 | | /// Maximum number of simultaneous PONG frames to send out has been exceeded. |
2204 | | MaxSimultaneousPingsExceeded, |
2205 | | /// The remote should have sent an ACK flag but didn't. |
2206 | | ExpectedAck, |
2207 | | /// The remote sent an ACK flag but shouldn't have. |
2208 | | UnexpectedAck, |
2209 | | /// Received multiple `GoAway` frames. |
2210 | | MultipleGoAways, |
2211 | | } |
2212 | | |
2213 | | #[derive(Debug, Copy, Clone, PartialEq, Eq)] |
2214 | | pub enum DeadSubstreamTy { |
2215 | | ClosedGracefully, |
2216 | | Reset, |
2217 | | } |
2218 | | |
2219 | | /// By default, all new substreams have this implicit window size. |
2220 | | pub const NEW_SUBSTREAMS_FRAME_SIZE: u64 = 256 * 1024; |