/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/single_stream.rs
Line | Count | Source |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | //! State machine handling a single TCP or WebSocket libp2p connection. |
19 | | //! |
20 | | //! # About resources allocation and back-pressure |
21 | | //! |
22 | | //! In order to avoid DoS attacks, it is important, in networking code, to make sure that the |
23 | | //! amount of memory allocated directly or indirectly by a connection stays bounded. |
24 | | //! |
25 | | //! The situations in the [`SingleStream`] that lead to an increase in memory consumption are: |
26 | | //! |
27 | | //! 1- When opening an incoming or outgoing substream. |
28 | | //! 2- When sending a request or receiving a response in a request-response protocol. |
29 | | //! 3- When sending a notification. |
30 | | //! 4- When receiving a request and sending back a response. |
31 | | //! 5- When receiving a notification. |
32 | | //! // TODO: 6- on Yamux ping frames |
33 | | //! |
34 | | //! In order to solve 1-, there exists a maximum number of simultaneous substreams allowed by the |
35 | | //! protocol, thereby guaranteeing that the memory consumption doesn't exceed a certain bound. |
36 | | //! Since receiving a request and a response is a one-time process that occupies an entire |
37 | | //! substream, allocations referenced by points 2- and 4- are also bounded thanks to this limit. |
38 | | //! Request-response protocols enforce a limit to the size of the request and response, again |
39 | | //! guaranteeing a bound on the memory consumption. |
40 | | //! |
41 | | //! In order to solve 3-, always use [`SingleStream::notification_substream_queued_bytes`] in order |
42 | | //! to check the current amount of buffered data before calling |
43 | | //! [`SingleStream::write_notification_unbounded`]. See the documentation of |
44 | | //! [`SingleStream::write_notification_unbounded`] for more details. |
45 | | //! |
46 | | //! In order to solve 5-, // TODO: . |
47 | | //! |
48 | | |
49 | | // TODO: expand docs ^ |
50 | | |
51 | | // TODO: consider implementing on top of multi_stream |
52 | | |
53 | | use super::{ |
54 | | super::{super::read_write::ReadWrite, noise, yamux}, |
55 | | substream::{self, RespondInRequestError}, |
56 | | Config, Event, SubstreamId, SubstreamIdInner, |
57 | | }; |
58 | | |
59 | | use alloc::{boxed::Box, string::String, vec::Vec}; |
60 | | use core::{ |
61 | | fmt, |
62 | | num::{NonZeroU32, NonZeroUsize}, |
63 | | ops::{Add, Index, IndexMut, Sub}, |
64 | | time::Duration, |
65 | | }; |
66 | | use rand_chacha::rand_core::{RngCore as _, SeedableRng as _}; |
67 | | |
68 | | pub use substream::InboundTy; |
69 | | |
70 | | /// State machine of a fully-established connection. |
71 | | pub struct SingleStream<TNow, TSubUd> { |
72 | | /// Encryption layer applied directly on top of the incoming data and outgoing data. |
73 | | encryption: noise::Noise, |
74 | | |
75 | | /// Extra fields. Segregated in order to solve borrowing questions. |
76 | | inner: Box<Inner<TNow, TSubUd>>, |
77 | | } |
78 | | |
79 | | /// Extra fields. Segregated in order to solve borrowing questions. |
80 | | struct Inner<TNow, TSubUd> { |
81 | | /// State of the various substreams of the connection. |
82 | | /// Consists of a collection of substreams, each of which holds a [`substream::Substream`] |
83 | | /// object, or `None` if the substream has been reset. |
84 | | yamux: yamux::Yamux<TNow, Option<(substream::Substream<TNow>, Option<TSubUd>)>>, |
85 | | |
86 | | /// Substream in [`Inner::yamux`] used for outgoing pings. |
87 | | /// |
88 | | /// Because of the API of [`substream::Substream`] concerning pings, there is no need to |
89 | | /// handle situations where the substream fails to negotiate, as this is handled by making |
90 | | /// outgoing pings error. This substream is therefore constant. |
91 | | /// |
92 | | /// It is possible, however, that the remote resets the ping substream. In other words, this |
93 | | /// substream might not be found in [`Inner::yamux`]. When that happens, all outgoing pings |
94 | | /// are immediately considered as failed. |
95 | | outgoing_pings: yamux::SubstreamId, |
96 | | /// When to start the next ping attempt. |
97 | | next_ping: TNow, |
98 | | /// Source of randomness to generate ping payloads. |
99 | | /// |
100 | | /// Note that we use ChaCha20 because the rest of the code base also uses ChaCha20. This avoids |
101 | | /// unnecessary code being included in the binary and reduces the binary size. |
102 | | ping_payload_randomness: rand_chacha::ChaCha20Rng, |
103 | | |
104 | | /// See [`Config::max_inbound_substreams`]. |
105 | | max_inbound_substreams: usize, |
106 | | /// See [`Config::max_protocol_name_len`]. |
107 | | max_protocol_name_len: usize, |
108 | | /// See [`Config::ping_interval`]. |
109 | | ping_interval: Duration, |
110 | | /// See [`Config::ping_timeout`]. |
111 | | ping_timeout: Duration, |
112 | | } |
113 | | |
114 | | impl<TNow, TSubUd> SingleStream<TNow, TSubUd> |
115 | | where |
116 | | TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord, |
117 | | { |
118 | | /// Reads data coming from the socket, updates the internal state machine, and writes data |
119 | | /// destined to the socket through the [`ReadWrite`]. |
120 | | /// |
121 | | /// In order to avoid unnecessary memory allocations, only one [`Event`] is returned at a time. |
122 | | /// Consequently, this method returns as soon as an event is available, even if the buffers |
123 | | /// haven't finished being read. Call this method in a loop until the number of bytes read and |
124 | | /// written are both 0, and the returned [`Event`] is `None`. |
125 | | /// |
126 | | /// If an error is returned, the socket should be entirely shut down. |
127 | | // TODO: consider exposing an API more similar to the one of substream::Substream::read_write? |
128 | 389 | pub fn read_write( |
129 | 389 | mut self, |
130 | 389 | read_write: &'_ mut ReadWrite<TNow>, |
131 | 389 | ) -> Result<(SingleStream<TNow, TSubUd>, Option<Event<TSubUd>>), Error> { |
132 | 389 | // Start any outgoing ping if necessary. |
133 | 389 | if read_write.now >= self.inner.next_ping { |
134 | 2 | self.inner.next_ping = read_write.now.clone() + self.inner.ping_interval; |
135 | 2 | |
136 | 2 | // It might be that the remote has reset the ping substream, in which case the outgoing ping |
137 | 2 | // substream no longer exists and we immediately consider the ping as failed. |
138 | 2 | if self.inner.yamux.has_substream(self.inner.outgoing_pings) { |
139 | 2 | let mut payload = [0u8; 32]; |
140 | 2 | self.inner.ping_payload_randomness.fill_bytes(&mut payload); |
141 | 2 | self.inner.yamux[self.inner.outgoing_pings] |
142 | 2 | .as_mut() |
143 | 2 | .unwrap() |
144 | 2 | .0 |
145 | 2 | .queue_ping(&payload, read_write.now.clone(), self.inner.ping_timeout); |
146 | 2 | self.inner |
147 | 2 | .yamux |
148 | 2 | .mark_substream_write_ready(self.inner.outgoing_pings); |
149 | 2 | } else { |
150 | 0 | return Ok((self, Some(Event::PingOutFailed))); |
151 | | } |
152 | 387 | } |
153 | 389 | read_write.wake_up_after(&self.inner.next_ping); |
154 | 389 | |
155 | 389 | // If we have both sent and received a GoAway frame, that means that no new substream |
156 | 389 | // can be opened. If in addition to this there is no substream in the connection, |
157 | 389 | // then we can safely close it as a normal termination. |
158 | 389 | // Note that, because we have no guarantee that the remote has received our GoAway |
159 | 389 | // frame yet, it is possible to receive requests for new substreams even after having |
160 | 389 | // sent the GoAway. Because we close the writing side, it is not possible to indicate |
161 | 389 | // to the remote that these new substreams are denied. However, this is not a problem |
162 | 389 | // as the remote interprets our GoAway frame as an automatic refusal of all its pending |
163 | 389 | // substream requests. |
164 | 389 | // TODO: review w.r.t. https://github.com/smol-dot/smoldot/issues/1121 |
165 | 389 | if (self.inner.yamux.len() |
166 | 389 | == if self.inner.yamux.has_substream(self.inner.outgoing_pings) { |
167 | 389 | 1 |
168 | | } else { |
169 | 0 | 0 |
170 | | }) |
171 | 49 | && self.inner.yamux.goaway_sent() |
172 | 0 | && self.inner.yamux.received_goaway().is_some() |
173 | 0 | { |
174 | 0 | read_write.close_write(); |
175 | 389 | } |
176 | | |
177 | | // Note that we treat the reading side being closed the same way as no data being |
178 | | // received. The fact that the remote has closed their writing side is no different |
179 | | // than them leaving their writing side open but no longer sending any data at all. |
180 | | // The remote is free to close their writing side at any point if it judges that it |
181 | | // will no longer need to send anymore data. |
182 | | // Note, however, that in principle the remote should have sent a GoAway frame prior |
183 | | // to closing their writing side. But this is not something we check or really care |
184 | | // about. |
185 | | |
186 | | // Pass the `read_write` through the Noise state machine. |
187 | 389 | let mut decrypted_read_write = self |
188 | 389 | .encryption |
189 | 389 | .read_write(read_write) |
190 | 389 | .map_err(Error::Noise)?; |
191 | | |
192 | | // Pass the Noise decrypted stream through the Yamux state machine. |
193 | 389 | let yamux_rw_outcome = self |
194 | 389 | .inner |
195 | 389 | .yamux |
196 | 389 | .read_write(&mut decrypted_read_write) |
197 | 389 | .map_err(Error::Yamux)?; |
198 | | |
199 | 389 | match yamux_rw_outcome { |
200 | 40 | yamux::ReadWriteOutcome::Idle { yamux } => { |
201 | 40 | self.inner.yamux = yamux; |
202 | 40 | |
203 | 40 | // Nothing happened, and thus there is nothing more to do. |
204 | 40 | drop(decrypted_read_write); |
205 | 40 | return Ok((self, None)); |
206 | | } |
207 | 20 | yamux::ReadWriteOutcome::IncomingSubstream { mut yamux } => { |
208 | 20 | debug_assert!(!yamux.goaway_queued_or_sent()); |
209 | | |
210 | | // Receive a request from the remote for a new incoming substream. |
211 | | // These requests are automatically accepted unless the total limit to the |
212 | | // number of substreams has been reached. |
213 | | // Note that `num_inbound()` counts substreams that have been closed but not |
214 | | // yet removed from the state machine. This can affect the actual limit in a |
215 | | // subtle way. At the time of writing of this comment the limit should be |
216 | | // properly enforced, however it is not considered problematic if it weren't. |
217 | 20 | if yamux.num_inbound() >= self.inner.max_inbound_substreams { |
218 | 0 | // Can only error if there's no incoming substream, which we know for sure |
219 | 0 | // is the case here. |
220 | 0 | yamux |
221 | 0 | .reject_pending_substream() |
222 | 0 | .unwrap_or_else(|_| panic!()); |
223 | 20 | } else { |
224 | 20 | // Can only error if there's no incoming substream, which we know for sure |
225 | 20 | // is the case here. |
226 | 20 | yamux |
227 | 20 | .accept_pending_substream(Some(( |
228 | 20 | substream::Substream::ingoing(self.inner.max_protocol_name_len), |
229 | 20 | None, |
230 | 20 | ))) |
231 | 20 | .unwrap_or_else(|_| panic!()); |
232 | 20 | } |
233 | | |
234 | 20 | self.inner.yamux = yamux; |
235 | 20 | |
236 | 20 | drop(decrypted_read_write); |
237 | 20 | return Ok((self, None)); |
238 | | } |
239 | | yamux::ReadWriteOutcome::ProcessSubstream { |
240 | 329 | mut substream_read_write, |
241 | 329 | } => { |
242 | 329 | // The Yamux state machine needs to process a substream. |
243 | 329 | |
244 | 329 | // Temporarily extract the substream's fields to put them back later. |
245 | 329 | let (state_machine, mut substream_user_data) = |
246 | 329 | substream_read_write.user_data_mut().take().unwrap(); |
247 | 329 | let (state_machine_update, event) = |
248 | 329 | state_machine.read_write(substream_read_write.read_write()); |
249 | 329 | |
250 | 329 | let event_to_yield = event.map(|ev| { |
251 | 34 | Self::pass_through_substream_event( |
252 | 34 | substream_read_write.substream_id(), |
253 | 34 | &mut substream_user_data, |
254 | 34 | ev, |
255 | 34 | ) |
256 | 329 | }); |
257 | 329 | |
258 | 329 | match state_machine_update { |
259 | 324 | Some(s) => { |
260 | 324 | *substream_read_write.user_data_mut() = Some((s, substream_user_data)); |
261 | 324 | self.inner.yamux = substream_read_write.finish(); |
262 | 324 | } |
263 | 5 | None => { |
264 | 5 | self.inner.yamux = substream_read_write.reset(); |
265 | 5 | } |
266 | | } |
267 | | |
268 | 329 | if let Some(event_to_yield) = event_to_yield { |
269 | 34 | drop(decrypted_read_write); |
270 | 34 | return Ok((self, Some(event_to_yield))); |
271 | 295 | } |
272 | | } |
273 | 0 | yamux::ReadWriteOutcome::StreamReset { yamux, .. } => { |
274 | 0 | self.inner.yamux = yamux; |
275 | 0 | decrypted_read_write.wake_up_asap(); |
276 | 0 | } |
277 | 0 | yamux::ReadWriteOutcome::GoAway { yamux, .. } => { |
278 | 0 | self.inner.yamux = yamux; |
279 | 0 | drop(decrypted_read_write); |
280 | 0 | return Ok((self, Some(Event::NewOutboundSubstreamsForbidden))); |
281 | | } |
282 | | yamux::ReadWriteOutcome::PingResponse { .. } => { |
283 | | // Can only happen if we send out Yamux pings, which we never do. |
284 | 0 | unreachable!() |
285 | | } |
286 | | } |
287 | | |
288 | 295 | drop(decrypted_read_write); |
289 | 295 | |
290 | 295 | // Substreams that have been closed or reset aren't immediately removed from the yamux state |
291 | 295 | // machine. They must be removed manually, which is what is done here. |
292 | 295 | // TODO: could be optimized by doing it only through a Yamux event? this is the case for StreamReset but not for graceful streams closures |
293 | 295 | let dead_substream_ids = self |
294 | 295 | .inner |
295 | 295 | .yamux |
296 | 295 | .dead_substreams() |
297 | 295 | .map(|(id, death_ty, _)| (id, death_ty)) |
298 | 295 | .collect::<Vec<_>>(); |
299 | 297 | for (dead_substream_id, death_ty) in dead_substream_ids { |
300 | 2 | match death_ty { |
301 | | yamux::DeadSubstreamTy::Reset => { |
302 | | // If the substream was reset by the remote, then the substream state |
303 | | // machine will still be `Some`. |
304 | 0 | if let Some((state_machine, mut user_data)) = |
305 | 2 | self.inner.yamux.remove_dead_substream(dead_substream_id) |
306 | | { |
307 | | // TODO: consider changing this `state_machine.reset()` function to be a state transition of the substream state machine (that doesn't take ownership), to simplify the implementation of both the substream state machine and this code |
308 | 0 | if let Some(event) = state_machine.reset() { |
309 | 0 | return Ok(( |
310 | 0 | self, |
311 | 0 | Some(Self::pass_through_substream_event( |
312 | 0 | dead_substream_id, |
313 | 0 | &mut user_data, |
314 | 0 | event, |
315 | 0 | )), |
316 | 0 | )); |
317 | 0 | } |
318 | 2 | }; |
319 | | |
320 | | // Removing a dead substream might lead to Yamux being able to process more |
321 | | // incoming data. As such, we loop again. |
322 | 2 | read_write.wake_up_asap(); |
323 | | } |
324 | 0 | yamux::DeadSubstreamTy::ClosedGracefully => { |
325 | 0 | self.inner.yamux.remove_dead_substream(dead_substream_id); |
326 | 0 | } |
327 | | } |
328 | | } |
329 | | |
330 | 295 | Ok((self, None)) |
331 | 389 | } |
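For orientation, here is a minimal sketch of the calling pattern described in the documentation of `read_write`, using the `std::time::Instant` / `()` instantiation that appears in this report and assuming the module's items are in scope. `handle_event` is a hypothetical caller-side helper (a possible shape is sketched after `pass_through_substream_event` below), and a real driver would additionally stop once the `ReadWrite` reports that no bytes were read or written.

```rust
// Hedged sketch: drive the connection until it stops producing events.
// `handle_event` is a hypothetical helper; error handling is kept minimal.
fn drive(
    mut connection: SingleStream<std::time::Instant, ()>,
    rw: &mut ReadWrite<std::time::Instant>,
) -> Result<SingleStream<std::time::Instant, ()>, Error> {
    loop {
        // `read_write` consumes the connection and returns it back,
        // together with at most one event per call.
        let (updated, event) = connection.read_write(rw)?;
        connection = updated;
        match event {
            Some(ev) => handle_event(ev), // dispatch to caller-side logic
            None => break,                // no more events for this wake-up
        }
    }
    Ok(connection)
}
```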
332 | | |
333 | | /// Turns an event from the [`substream`] module into an [`Event`]. |
334 | 34 | fn pass_through_substream_event( |
335 | 34 | substream_id: yamux::SubstreamId, |
336 | 34 | substream_user_data: &mut Option<TSubUd>, |
337 | 34 | event: substream::Event, |
338 | 34 | ) -> Event<TSubUd> { |
339 | 34 | match event { |
340 | | substream::Event::InboundError { |
341 | 0 | error, |
342 | 0 | was_accepted: false, |
343 | 0 | } => Event::InboundError(error), |
344 | | substream::Event::InboundError { |
345 | | was_accepted: true, .. |
346 | 0 | } => Event::InboundAcceptedCancel { |
347 | 0 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
348 | 0 | user_data: substream_user_data.take().unwrap(), |
349 | 0 | // TODO: notify of the error? |
350 | 0 | }, |
351 | 20 | substream::Event::InboundNegotiated(protocol_name) => Event::InboundNegotiated { |
352 | 20 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
353 | 20 | protocol_name, |
354 | 20 | }, |
355 | 0 | substream::Event::InboundNegotiatedCancel => Event::InboundNegotiatedCancel { |
356 | 0 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
357 | 0 | }, |
358 | 2 | substream::Event::RequestIn { request } => Event::RequestIn { |
359 | 2 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
360 | 2 | request, |
361 | 2 | }, |
362 | 3 | substream::Event::Response { response } => Event::Response { |
363 | 3 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
364 | 3 | response, |
365 | 3 | user_data: substream_user_data.take().unwrap(), |
366 | 3 | }, |
367 | 3 | substream::Event::NotificationsInOpen { handshake } => Event::NotificationsInOpen { |
368 | 3 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
369 | 3 | handshake, |
370 | 3 | }, |
371 | 0 | substream::Event::NotificationsInOpenCancel => Event::NotificationsInOpenCancel { |
372 | 0 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
373 | 0 | }, |
374 | 3 | substream::Event::NotificationIn { notification } => Event::NotificationIn { |
375 | 3 | notification, |
376 | 3 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
377 | 3 | }, |
378 | 0 | substream::Event::NotificationsInClose { outcome } => Event::NotificationsInClose { |
379 | 0 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
380 | 0 | outcome, |
381 | 0 | user_data: substream_user_data.take().unwrap(), |
382 | 0 | }, |
383 | 3 | substream::Event::NotificationsOutResult { result } => Event::NotificationsOutResult { |
384 | 3 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
385 | 3 | result: match result { |
386 | 1 | Ok(r) => Ok(r), |
387 | 2 | Err(err) => Err((err, substream_user_data.take().unwrap())), |
388 | | }, |
389 | | }, |
390 | | substream::Event::NotificationsOutCloseDemanded => { |
391 | 0 | Event::NotificationsOutCloseDemanded { |
392 | 0 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
393 | 0 | } |
394 | | } |
395 | 0 | substream::Event::NotificationsOutReset => Event::NotificationsOutReset { |
396 | 0 | id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)), |
397 | 0 | user_data: substream_user_data.take().unwrap(), |
398 | 0 | }, |
399 | 0 | substream::Event::PingOutSuccess { ping_time } => Event::PingOutSuccess { ping_time }, |
400 | | substream::Event::PingOutError { .. } => { |
401 | | // Because ping events are automatically generated by the external API without any |
402 | | // guarantee, it is safe to merge multiple failed pings into one. |
403 | 0 | Event::PingOutFailed |
404 | | } |
405 | | } |
406 | 34 | } |
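As a hedged illustration of how the events surfaced above might be consumed by the caller, the sketch below matches only on variants and fields that are visible in this module; every arm body is a placeholder.

```rust
// Hedged sketch: dispatching a few of the `Event` variants produced by
// `read_write`. All arms are placeholders for caller-side logic.
fn handle_event(event: Event<()>) {
    match event {
        Event::InboundNegotiated { id, protocol_name } => {
            // Decide whether `protocol_name` is supported, then call
            // `accept_inbound` or `reject_inbound` with this `id`.
            let _ = (id, protocol_name);
        }
        Event::RequestIn { id, request } => {
            // An inbound request arrived on substream `id`.
            let _ = (id, request);
        }
        Event::Response { id, response, user_data } => {
            // Response to a request previously queued with `add_request`;
            // `user_data` is the value passed when the request was added.
            let _ = (id, response, user_data);
        }
        Event::NotificationIn { id, notification } => {
            // A notification arrived on an accepted inbound substream.
            let _ = (id, notification);
        }
        Event::PingOutFailed => {
            // The remote failed to answer an outgoing ping in time.
        }
        _ => {}
    }
}
```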
407 | | |
408 | | /// Close the incoming substreams, automatically denying any new substream request from the |
409 | | /// remote. |
410 | | /// |
411 | | /// Note that this does not prevent incoming-substreams-related events |
412 | | /// (such as [`Event::RequestIn`]) from being generated, as it is possible that the remote has |
413 | | /// already opened a substream but has not sent all the necessary handshake messages yet. |
414 | | /// |
415 | | /// # Panic |
416 | | /// |
417 | | /// Panics if this function has been called before. It is illegal to call |
418 | | /// [`SingleStream::deny_new_incoming_substreams`] more than once on the same connection. |
419 | | /// |
420 | 0 | pub fn deny_new_incoming_substreams(&mut self) { |
421 | 0 | // TODO: arbitrary yamux error code |
422 | 0 | self.inner |
423 | 0 | .yamux |
424 | 0 | .send_goaway(yamux::GoAwayErrorCode::NormalTermination) |
425 | 0 | .unwrap() |
426 | 0 | } |
427 | | |
428 | | /// Modifies the value that was initially passed through [`Config::max_protocol_name_len`]. |
429 | | /// |
430 | | /// The new value only applies to substreams opened after this function has been called. |
431 | 0 | pub fn set_max_protocol_name_len(&mut self, new_value: usize) { |
432 | 0 | self.inner.max_protocol_name_len = new_value; |
433 | 0 | } |
434 | | |
435 | | /// Sends a request to the remote. |
436 | | /// |
437 | | /// This method only inserts the request into the connection object. Use |
438 | | /// [`SingleStream::read_write`] in order to actually send out the request. |
439 | | /// |
440 | | /// Assuming that the remote is using the same implementation, an [`Event::RequestIn`] will |
441 | | /// be generated on its side. |
442 | | /// |
443 | | /// If `request` is `None`, then no request is sent to the remote at all. If `request` is |
444 | | /// `Some`, then a (potentially-empty) request is sent. If `Some(&[])` is provided, a |
445 | | /// length-prefix containing a 0 is sent to the remote. |
446 | | /// |
447 | | /// After the remote has sent back a response, an [`Event::Response`] event will be generated |
448 | | /// locally. The `user_data` parameter will be passed back. |
449 | | /// |
450 | | /// The timeout is the time between the moment the substream is opened and the moment the |
451 | | /// response is sent back. If the emitter doesn't send the request or if the receiver doesn't |
452 | | /// answer during this time window, the request is considered failed. |
453 | | /// |
454 | | /// # Panic |
455 | | /// |
456 | | /// Panics if a [`Event::NewOutboundSubstreamsForbidden`] event has been generated in the past. |
457 | | /// |
458 | 3 | pub fn add_request( |
459 | 3 | &mut self, |
460 | 3 | protocol_name: String, |
461 | 3 | request: Option<Vec<u8>>, |
462 | 3 | timeout: TNow, |
463 | 3 | max_response_size: usize, |
464 | 3 | user_data: TSubUd, |
465 | 3 | ) -> SubstreamId { |
466 | 3 | let substream_id = self |
467 | 3 | .inner |
468 | 3 | .yamux |
469 | 3 | .open_substream(Some(( |
470 | 3 | substream::Substream::request_out( |
471 | 3 | protocol_name, |
472 | 3 | timeout, |
473 | 3 | request, |
474 | 3 | max_response_size, |
475 | 3 | ), |
476 | 3 | Some(user_data), |
477 | 3 | ))) |
478 | 3 | .unwrap(); // TODO: consider not panicking |
479 | 3 | |
480 | 3 | // TODO: we add some bytes due to the length prefix, this is a bit hacky as we should ask this information from the substream |
481 | 3 | self.inner.yamux.add_remote_window_saturating( |
482 | 3 | substream_id, |
483 | 3 | u64::try_from(max_response_size) |
484 | 3 | .unwrap_or(u64::MAX) |
485 | 3 | .saturating_add(64) |
486 | 3 | .saturating_sub(yamux::NEW_SUBSTREAMS_FRAME_SIZE), |
487 | 3 | ); |
488 | 3 | |
489 | 3 | SubstreamId(SubstreamIdInner::SingleStream(substream_id)) |
490 | 3 | } |
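A hedged usage sketch for `add_request`; the protocol name, payload, timeout, and response size limit are illustrative values, not taken from this module.

```rust
// Hedged sketch: queueing an outgoing request on the connection.
fn send_example_request(
    connection: &mut SingleStream<std::time::Instant, ()>,
) -> SubstreamId {
    let now = std::time::Instant::now();
    connection.add_request(
        String::from("/example/req/1"),            // protocol name (placeholder)
        Some(b"request payload".to_vec()),          // `None` would send no request bytes at all
        now + std::time::Duration::from_secs(10),   // absolute timeout
        16 * 1024,                                  // max_response_size
        (),                                         // user data returned with `Event::Response`
    )
}
```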
491 | | |
492 | | /// Opens an outgoing substream with the given protocol, destined for a stream of |
493 | | /// notifications. |
494 | | /// |
495 | | /// The remote must first accept (or reject) the substream before notifications can be sent |
496 | | /// on it. |
497 | | /// |
498 | | /// This method only inserts the opening handshake into the connection object. Use |
499 | | /// [`SingleStream::read_write`] in order to actually send out the request. |
500 | | /// |
501 | | /// Assuming that the remote is using the same implementation, an |
502 | | /// [`Event::NotificationsInOpen`] will be generated on its side. |
503 | | /// |
504 | | /// # Panic |
505 | | /// |
506 | | /// Panics if a [`Event::NewOutboundSubstreamsForbidden`] event has been generated in the past. |
507 | | /// |
508 | 3 | pub fn open_notifications_substream( |
509 | 3 | &mut self, |
510 | 3 | protocol_name: String, |
511 | 3 | handshake: Vec<u8>, |
512 | 3 | max_handshake_size: usize, |
513 | 3 | timeout: TNow, |
514 | 3 | user_data: TSubUd, |
515 | 3 | ) -> SubstreamId { |
516 | 3 | let substream = self |
517 | 3 | .inner |
518 | 3 | .yamux |
519 | 3 | .open_substream(Some(( |
520 | 3 | substream::Substream::notifications_out( |
521 | 3 | timeout, |
522 | 3 | protocol_name, |
523 | 3 | handshake, |
524 | 3 | max_handshake_size, |
525 | 3 | ), |
526 | 3 | Some(user_data), |
527 | 3 | ))) |
528 | 3 | .unwrap(); // TODO: consider not panicking |
529 | 3 | |
530 | 3 | SubstreamId(SubstreamIdInner::SingleStream(substream)) |
531 | 3 | } |
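Similarly, a hedged sketch of opening an outbound notifications substream; all literal values are placeholders.

```rust
// Hedged sketch: request the opening of an outbound notifications substream.
fn open_example_notifications(
    connection: &mut SingleStream<std::time::Instant, ()>,
) -> SubstreamId {
    let now = std::time::Instant::now();
    connection.open_notifications_substream(
        String::from("/example/notif/1"),           // protocol name (placeholder)
        b"local handshake".to_vec(),                // handshake sent to the remote
        1024,                                       // max_handshake_size accepted in return
        now + std::time::Duration::from_secs(20),   // timeout for the remote to accept
        (),                                         // user data
    )
    // The outcome is later reported through `Event::NotificationsOutResult`.
}
```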
532 | | |
533 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol |
534 | | /// name and indicate the type of the protocol. |
535 | | /// |
536 | | /// # Panic |
537 | | /// |
538 | | /// Panics if the substream is not in the correct state. |
539 | | /// |
540 | 19 | pub fn accept_inbound(&mut self, substream_id: SubstreamId, ty: InboundTy, user_data: TSubUd) { |
541 | 19 | let substream_id = match substream_id.0 { |
542 | 19 | SubstreamIdInner::SingleStream(id) => id, |
543 | 0 | _ => panic!(), |
544 | | }; |
545 | | |
546 | 19 | let (substream, ud) = self.inner.yamux[substream_id].as_mut().unwrap(); |
547 | 19 | substream.accept_inbound(ty); |
548 | 19 | debug_assert!(ud.is_none()); |
549 | 19 | *ud = Some(user_data); |
550 | 19 | self.inner.yamux.mark_substream_write_ready(substream_id); |
551 | 19 | } |
552 | | |
553 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the |
554 | | /// protocol name as not supported. |
555 | | /// |
556 | | /// # Panic |
557 | | /// |
558 | | /// Panics if the substream is not in the correct state. |
559 | | /// |
560 | 1 | pub fn reject_inbound(&mut self, substream_id: SubstreamId) { |
561 | 1 | let substream_id = match substream_id.0 { |
562 | 1 | SubstreamIdInner::SingleStream(id) => id, |
563 | 0 | _ => panic!(), |
564 | | }; |
565 | | |
566 | 1 | let (substream, ud) = self.inner.yamux[substream_id].as_mut().unwrap(); |
567 | 1 | substream.reject_inbound(); |
568 | 1 | debug_assert!(ud.is_none()); |
569 | 1 | self.inner.yamux.mark_substream_write_ready(substream_id); |
570 | 1 | } |
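A hedged sketch of answering an `Event::InboundNegotiated` with either `accept_inbound` or `reject_inbound`; the protocol comparison is a placeholder, and the appropriate `InboundTy` value (defined in the `substream` module) depends on the protocol, so it is simply passed in here.

```rust
// Hedged sketch: accept recognised inbound protocols, reject the rest.
fn answer_inbound_negotiation(
    connection: &mut SingleStream<std::time::Instant, ()>,
    id: SubstreamId,
    protocol_name: &str,
    ty: InboundTy, // how to treat the substream; variants live in `substream`
) {
    if protocol_name == "/example/req/1" {
        connection.accept_inbound(id, ty, ());
    } else {
        connection.reject_inbound(id);
    }
}
```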
571 | | |
572 | | /// Accepts an inbound notifications protocol. Must be called in response to a |
573 | | /// [`Event::NotificationsInOpen`]. |
574 | | /// |
575 | | /// # Panic |
576 | | /// |
577 | | /// Panics if the substream id is not valid or the substream is of the wrong type. |
578 | | /// |
579 | 1 | pub fn accept_in_notifications_substream( |
580 | 1 | &mut self, |
581 | 1 | substream_id: SubstreamId, |
582 | 1 | handshake: Vec<u8>, |
583 | 1 | max_notification_size: usize, |
584 | 1 | ) { |
585 | 1 | let substream_id = match substream_id.0 { |
586 | 1 | SubstreamIdInner::SingleStream(id) => id, |
587 | 0 | _ => panic!(), |
588 | | }; |
589 | | |
590 | 1 | self.inner.yamux[substream_id] |
591 | 1 | .as_mut() |
592 | 1 | .unwrap() |
593 | 1 | .0 |
594 | 1 | .accept_in_notifications_substream(handshake, max_notification_size); |
595 | 1 | self.inner.yamux.mark_substream_write_ready(substream_id); |
596 | 1 | } |
597 | | |
598 | | /// Rejects an inbound notifications protocol. Must be called in response to a |
599 | | /// [`Event::NotificationsInOpen`]. |
600 | | /// |
601 | | /// # Panic |
602 | | /// |
603 | | /// Panics if the substream id is not valid or the substream is of the wrong type. |
604 | | /// |
605 | 1 | pub fn reject_in_notifications_substream(&mut self, substream_id: SubstreamId) { |
606 | 1 | let substream_id = match substream_id.0 { |
607 | 1 | SubstreamIdInner::SingleStream(id) => id, |
608 | 0 | _ => panic!(), |
609 | | }; |
610 | | |
611 | 1 | self.inner.yamux[substream_id] |
612 | 1 | .as_mut() |
613 | 1 | .unwrap() |
614 | 1 | .0 |
615 | 1 | .reject_in_notifications_substream(); |
616 | 1 | self.inner.yamux.mark_substream_write_ready(substream_id); |
617 | 1 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE33reject_in_notifications_substreamBa_ Line | Count | Source | 605 | 1 | pub fn reject_in_notifications_substream(&mut self, substream_id: SubstreamId) { | 606 | 1 | let substream_id = match substream_id.0 { | 607 | 1 | SubstreamIdInner::SingleStream(id) => id, | 608 | 0 | _ => panic!(), | 609 | | }; | 610 | | | 611 | 1 | self.inner.yamux[substream_id] | 612 | 1 | .as_mut() | 613 | 1 | .unwrap() | 614 | 1 | .0 | 615 | 1 | .reject_in_notifications_substream(); | 616 | 1 | self.inner.yamux.mark_substream_write_ready(substream_id); | 617 | 1 | } |
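The two methods above are the usual way to answer an inbound notifications substream. A minimal caller-side sketch, assuming the `established` module and its types are reachable at the path below, with `std::time::Instant` as `TNow` and `()` as per-substream user data; `protocol_supported`, `handshake`, and the 1 MiB limit are illustrative values chosen for the example:

    use smoldot::libp2p::connection::established::{single_stream::SingleStream, SubstreamId}; // path assumed

    // Hypothetical helper, not part of this module: called with the substream id
    // carried by an `Event::NotificationsInOpen`.
    fn on_notifications_in_open(
        connection: &mut SingleStream<std::time::Instant, ()>,
        substream_id: SubstreamId,
        protocol_supported: bool,
        handshake: Vec<u8>,
    ) {
        if protocol_supported {
            // Accept the substream; 1 MiB is an arbitrary example size limit.
            connection.accept_in_notifications_substream(substream_id, handshake, 1024 * 1024);
        } else {
            connection.reject_in_notifications_substream(substream_id);
        }
    }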
618 | | |
619 | | /// Queues a notification to be written out on the given substream. |
620 | | /// |
621 | | /// # About back-pressure |
622 | | /// |
623 | | /// This method unconditionally queues up data. Be aware, however, that the remote can |
624 | | /// indefinitely delay the sending of that data, which can potentially lead to an |
625 | | /// unbounded increase in memory usage. |
626 | | /// |
627 | | /// As such, you are encouraged to call this method only if the amount of queued data (as |
628 | | /// determined by calling [`SingleStream::notification_substream_queued_bytes`]) is below a |
629 | | /// certain threshold. If above, the notification should be silently discarded. |
630 | | /// |
631 | | /// # Panic |
632 | | /// |
633 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
634 | | /// notifications substream isn't in the appropriate state. |
635 | | /// |
636 | 3 | pub fn write_notification_unbounded( |
637 | 3 | &mut self, |
638 | 3 | substream_id: SubstreamId, |
639 | 3 | notification: Vec<u8>, |
640 | 3 | ) { |
641 | 3 | let substream_id = match substream_id.0 { |
642 | 3 | SubstreamIdInner::SingleStream(id) => id, |
643 | 0 | _ => panic!(), |
644 | | }; |
645 | | |
646 | 3 | self.inner.yamux[substream_id] |
647 | 3 | .as_mut() |
648 | 3 | .unwrap() |
649 | 3 | .0 |
650 | 3 | .write_notification_unbounded(notification); |
651 | 3 | self.inner.yamux.mark_substream_write_ready(substream_id); |
652 | 3 | }
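The back-pressure recommendation above can be followed with a small wrapper. A sketch under the same assumptions as the previous example; the 128 KiB threshold is an arbitrary illustrative value, not something prescribed by the API:

    use smoldot::libp2p::connection::established::{single_stream::SingleStream, SubstreamId}; // path assumed

    const QUEUED_BYTES_THRESHOLD: usize = 128 * 1024; // example value

    // Hypothetical helper combining `notification_substream_queued_bytes` and
    // `write_notification_unbounded`, as suggested by the documentation above.
    fn send_notification(
        connection: &mut SingleStream<std::time::Instant, ()>,
        substream_id: SubstreamId,
        notification: Vec<u8>,
    ) {
        if connection.notification_substream_queued_bytes(substream_id) < QUEUED_BYTES_THRESHOLD {
            connection.write_notification_unbounded(substream_id, notification);
        }
        // Otherwise the notification is silently discarded, per the recommendation above.
    }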
653 | | |
654 | | /// Returns the number of bytes waiting to be sent out on that substream. |
655 | | /// |
656 | | /// See the documentation of [`SingleStream::write_notification_unbounded`] for context. |
657 | | /// |
658 | | /// # Panic |
659 | | /// |
660 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
661 | | /// notifications substream isn't in the appropriate state. |
662 | | /// |
663 | 0 | pub fn notification_substream_queued_bytes(&self, substream_id: SubstreamId) -> usize { |
664 | 0 | let substream_id = match substream_id.0 { |
665 | 0 | SubstreamIdInner::SingleStream(id) => id, |
666 | 0 | _ => panic!(), |
667 | | }; |
668 | | |
669 | | // Note that this doesn't take into account data that the Yamux or Noise state machines |
670 | | // have extracted from the substream but hasn't sent out yet, because the objective of this |
671 | | // function is to provide a hint about when to stop sending more data, and the size of the |
672 | | // data that Noise and Yamux have extracted is always bounded anyway. It's not worth the |
673 | | // effort of reporting a 100% accurate information when a 100% accurate information isn't |
674 | | // needed. |
675 | 0 | self.inner.yamux[substream_id] |
676 | 0 | .as_ref() |
677 | 0 | .unwrap() |
678 | 0 | .0 |
679 | 0 | .notification_substream_queued_bytes() |
680 | 0 | }
681 | | |
682 | | /// Closes a notifications substream opened after a successful |
683 | | /// [`Event::NotificationsOutResult`]. |
684 | | /// |
685 | | /// This can be done even when in the negotiation phase, in other words before the remote has |
686 | | /// accepted/refused the substream. |
687 | | /// |
688 | | /// # Panic |
689 | | /// |
690 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
691 | | /// notifications substream isn't in the appropriate state. |
692 | | /// |
693 | 0 | pub fn close_out_notifications_substream(&mut self, substream_id: SubstreamId) { |
694 | 0 | let substream_id = match substream_id.0 { |
695 | 0 | SubstreamIdInner::SingleStream(id) => id, |
696 | 0 | _ => panic!(), |
697 | | }; |
698 | | |
699 | 0 | if !self.inner.yamux.has_substream(substream_id) { |
700 | 0 | panic!() |
701 | 0 | } |
702 | 0 |
703 | 0 | self.inner.yamux[substream_id] |
704 | 0 | .as_mut() |
705 | 0 | .unwrap() |
706 | 0 | .0 |
707 | 0 | .close_out_notifications_substream(); |
708 | 0 | self.inner.yamux.mark_substream_write_ready(substream_id); |
709 | 0 | }
710 | | |
711 | | /// Closes a notifications substream that was accepted using |
712 | | /// [`SingleStream::accept_in_notifications_substream`]. |
713 | | /// |
714 | | /// # Panic |
715 | | /// |
716 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
717 | | /// notifications substream isn't in the appropriate state. |
718 | | /// |
719 | 0 | pub fn close_in_notifications_substream(&mut self, substream_id: SubstreamId, timeout: TNow) { |
720 | 0 | let substream_id = match substream_id.0 { |
721 | 0 | SubstreamIdInner::SingleStream(id) => id, |
722 | 0 | _ => panic!(), |
723 | | }; |
724 | | |
725 | 0 | if !self.inner.yamux.has_substream(substream_id) { |
726 | 0 | panic!() |
727 | 0 | } |
728 | 0 |
729 | 0 | self.inner.yamux[substream_id] |
730 | 0 | .as_mut() |
731 | 0 | .unwrap() |
732 | 0 | .0 |
733 | 0 | .close_in_notifications_substream(timeout); |
734 | 0 | self.inner.yamux.mark_substream_write_ready(substream_id); |
735 | 0 | }
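A sketch of how the two closing methods might be driven by a caller, under the same assumptions as the earlier examples. Treating the `timeout` argument of `close_in_notifications_substream` as an absolute deadline, and the ten-second value, are assumptions made for illustration:

    use smoldot::libp2p::connection::established::{single_stream::SingleStream, SubstreamId}; // path assumed

    // Hypothetical helper: close an outbound notifications substream, which works even
    // before the remote has accepted or refused it.
    fn close_outbound(connection: &mut SingleStream<std::time::Instant, ()>, id: SubstreamId) {
        connection.close_out_notifications_substream(id);
    }

    // Hypothetical helper: close a previously accepted inbound notifications substream.
    fn close_inbound(
        connection: &mut SingleStream<std::time::Instant, ()>,
        id: SubstreamId,
        now: std::time::Instant,
    ) {
        // Assumed here: the timeout is a point in time after which the close is enforced.
        connection.close_in_notifications_substream(id, now + std::time::Duration::from_secs(10));
    }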
736 | | |
737 | | /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`]. |
738 | | /// |
739 | | /// Passing an `Err` corresponds, on the other side, to a |
740 | | /// [`substream::RequestError::SubstreamClosed`]. |
741 | | /// |
742 | | /// Returns an error if the [`SubstreamId`] is invalid. |
743 | 1 | pub fn respond_in_request( |
744 | 1 | &mut self, |
745 | 1 | substream_id: SubstreamId, |
746 | 1 | response: Result<Vec<u8>, ()>, |
747 | 1 | ) -> Result<(), RespondInRequestError> { |
748 | 1 | let substream_id = match substream_id.0 { |
749 | 1 | SubstreamIdInner::SingleStream(id) => id, |
750 | 0 | _ => return Err(RespondInRequestError::SubstreamClosed), |
751 | | }; |
752 | | |
753 | 1 | if !self.inner.yamux.has_substream(substream_id) { |
754 | 0 | return Err(RespondInRequestError::SubstreamClosed); |
755 | 1 | } |
756 | 1 | |
757 | 1 | self.inner.yamux[substream_id] |
758 | 1 | .as_mut() |
759 | 1 | .unwrap() |
760 | 1 | .0 |
761 | 1 | .respond_in_request(response)?0 ; |
762 | 1 | self.inner.yamux.mark_substream_write_ready(substream_id); |
763 | 1 | Ok(()) |
764 | 1 | }
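Answering an incoming request can be sketched as follows, under the same assumptions as the earlier examples; an error only means the substream disappeared in the meantime, so the response is simply dropped:

    use smoldot::libp2p::connection::established::{single_stream::SingleStream, SubstreamId}; // path assumed

    // Hypothetical helper: pass `Err(())` to make the remote observe
    // `substream::RequestError::SubstreamClosed`, as documented above.
    fn answer_request(
        connection: &mut SingleStream<std::time::Instant, ()>,
        substream_id: SubstreamId,
        response: Result<Vec<u8>, ()>,
    ) {
        // `Err(RespondInRequestError::SubstreamClosed)` means there is nothing left to answer.
        let _ = connection.respond_in_request(substream_id, response);
    }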
765 | | } |
766 | | |
767 | | impl<TNow, TSubUd> Index<SubstreamId> for SingleStream<TNow, TSubUd> { |
768 | | type Output = TSubUd; |
769 | | |
770 | 0 | fn index(&self, substream_id: SubstreamId) -> &Self::Output { |
771 | 0 | let substream_id = match substream_id.0 { |
772 | 0 | SubstreamIdInner::SingleStream(id) => id, |
773 | 0 | _ => panic!(), |
774 | | }; |
775 | | |
776 | 0 | self.inner.yamux[substream_id] |
777 | 0 | .as_ref() |
778 | 0 | .unwrap() |
779 | 0 | .1 |
780 | 0 | .as_ref() |
781 | 0 | .unwrap() |
782 | 0 | }
783 | | } |
784 | | |
785 | | impl<TNow, TSubUd> IndexMut<SubstreamId> for SingleStream<TNow, TSubUd> { |
786 | 0 | fn index_mut(&mut self, substream_id: SubstreamId) -> &mut Self::Output { |
787 | 0 | let substream_id = match substream_id.0 { |
788 | 0 | SubstreamIdInner::SingleStream(id) => id, |
789 | 0 | _ => panic!(), |
790 | | }; |
791 | | |
792 | 0 | self.inner.yamux[substream_id] |
793 | 0 | .as_mut() |
794 | 0 | .unwrap() |
795 | 0 | .1 |
796 | 0 | .as_mut() |
797 | 0 | .unwrap() |
798 | 0 | }
799 | | } |
800 | | |
801 | | impl<TNow, TSubUd> fmt::Debug for SingleStream<TNow, TSubUd> |
802 | | where |
803 | | TSubUd: fmt::Debug, |
804 | | { |
805 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
806 | 0 | f.debug_map() |
807 | 0 | .entries(self.inner.yamux.user_datas()) |
808 | 0 | .finish() |
809 | 0 | }
810 | | } |
811 | | |
812 | | /// Error during a connection. The connection should be shut down. |
813 | 0 | #[derive(Debug, derive_more::Display)]
814 | | pub enum Error { |
815 | | /// Error in the noise cipher. Data has most likely been corrupted. |
816 | | #[display(fmt = "Noise error: {_0}")] |
817 | | Noise(noise::CipherError), |
818 | | /// Error while encoding noise data. |
819 | | #[display(fmt = "{_0}")] |
820 | | NoiseEncrypt(noise::EncryptError), |
821 | | /// Error in the Yamux multiplexing protocol. |
822 | | #[display(fmt = "Yamux error: {_0}")] |
823 | | Yamux(yamux::Error), |
824 | | } |
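For illustration, a caller that only needs a coarse description of why the connection must be shut down could match on the three variants above (they also implement `Display`); this helper is hypothetical and assumes `Error` is in scope from this module:

    use smoldot::libp2p::connection::established::single_stream::Error; // path assumed

    fn describe_shutdown_cause(error: &Error) -> &'static str {
        match error {
            // Data has most likely been corrupted.
            Error::Noise(_) => "noise cipher error",
            Error::NoiseEncrypt(_) => "noise encryption error",
            Error::Yamux(_) => "yamux multiplexing error",
        }
    }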
825 | | |
826 | | /// Successfully negotiated connection. Ready to be turned into a [`SingleStream`]. |
827 | | pub struct ConnectionPrototype { |
828 | | encryption: noise::Noise, |
829 | | } |
830 | | |
831 | | impl ConnectionPrototype { |
832 | | /// Builds a new [`ConnectionPrototype`] of a connection using the Noise and Yamux protocols. |
833 | 16 | pub(crate) fn from_noise_yamux(encryption: noise::Noise) -> Self { |
834 | 16 | ConnectionPrototype { encryption } |
835 | 16 | }
836 | | |
837 | | /// Extracts the Noise state machine from this prototype. |
838 | 0 | pub fn into_noise_state_machine(self) -> noise::Noise { |
839 | 0 | self.encryption |
840 | 0 | }
841 | | |
842 | | /// Turns this prototype into an actual connection. |
843 | 14 | pub fn into_connection<TNow, TSubUd>(self, config: Config<TNow>) -> SingleStream<TNow, TSubUd> |
844 | 14 | where |
845 | 14 | TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord, |
846 | 14 | { |
847 | 14 | let mut randomness = rand_chacha::ChaCha20Rng::from_seed(config.randomness_seed); |
848 | 14 | |
849 | 14 | let mut yamux = yamux::Yamux::new(yamux::Config { |
850 | 14 | is_initiator: self.encryption.is_initiator(), |
851 | 14 | capacity: config.substreams_capacity, |
852 | 14 | randomness_seed: { |
853 | 14 | let mut seed = [0; 32]; |
854 | 14 | randomness.fill_bytes(&mut seed); |
855 | 14 | seed |
856 | 14 | }, |
857 | 14 | max_out_data_frame_size: NonZeroU32::new(8192).unwrap(), // TODO: make configurable? |
858 | 14 | max_simultaneous_queued_pongs: NonZeroUsize::new(4).unwrap(), |
859 | 14 | max_simultaneous_rst_substreams: NonZeroUsize::new(1024).unwrap(), |
860 | 14 | }); |
861 | 14 | |
862 | 14 | let outgoing_pings = yamux |
863 | 14 | .open_substream(Some(( |
864 | 14 | substream::Substream::ping_out(config.ping_protocol.clone()), |
865 | 14 | None, |
866 | 14 | ))) |
867 | 14 | // Can only panic if a `GoAway` has been received, or if there are too many substreams |
868 | 14 | // already open, which we know for sure can't happen here |
869 | 14 | .unwrap_or_else(|_| panic!()0 );
870 | 14 | |
871 | 14 | SingleStream { |
872 | 14 | encryption: self.encryption, |
873 | 14 | inner: Box::new(Inner { |
874 | 14 | yamux, |
875 | 14 | outgoing_pings, |
876 | 14 | next_ping: config.first_out_ping, |
877 | 14 | ping_payload_randomness: randomness, |
878 | 14 | max_inbound_substreams: config.max_inbound_substreams, |
879 | 14 | max_protocol_name_len: config.max_protocol_name_len, |
880 | 14 | ping_interval: config.ping_interval, |
881 | 14 | ping_timeout: config.ping_timeout, |
882 | 14 | }), |
883 | 14 | } |
884 | 14 | }
885 | | } |
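Once the handshake has produced a `ConnectionPrototype`, turning it into the established state machine is a single call. A sketch assuming a `Config<std::time::Instant>` built by the caller (its construction is elided) and `()` as substream user data:

    use smoldot::libp2p::connection::established::{
        single_stream::{ConnectionPrototype, SingleStream},
        Config,
    }; // path assumed

    // Hypothetical helper: consumes the prototype; `into_connection` sets up Yamux and
    // opens the outgoing ping substream internally.
    fn establish(
        prototype: ConnectionPrototype,
        config: Config<std::time::Instant>,
    ) -> SingleStream<std::time::Instant, ()> {
        prototype.into_connection(config)
    }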
886 | | |
887 | | impl fmt::Debug for ConnectionPrototype { |
888 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
889 | 0 | f.debug_struct("ConnectionPrototype").finish() |
890 | 0 | }
891 | | } |