/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/substream.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | //! Individual substream within an established connection. |
19 | | //! |
20 | | //! This module contains the [`Substream`] struct, a state machine containing the state of a |
21 | | //! single substream. When the remote sends data on that substream, or when the remote is ready to |
22 | | //! accept more data on that substream, the state machine can be updated by calling |
23 | | //! [`Substream::read_write`]. This optionally produces an event that indicates what happened on |
24 | | //! the substream as a result of the call. |
25 | | |
26 | | use crate::libp2p::{connection::multistream_select, read_write}; |
27 | | use crate::util::leb128; |
28 | | |
29 | | use alloc::{borrow::ToOwned as _, collections::VecDeque, string::String, vec::Vec}; |
30 | | use core::{ |
31 | | fmt, mem, |
32 | | num::NonZeroUsize, |
33 | | ops::{Add, Sub}, |
34 | | time::Duration, |
35 | | }; |
36 | | |
37 | | /// State machine containing the state of a single substream of an established connection. |
38 | | pub struct Substream<TNow> { |
39 | | inner: SubstreamInner<TNow>, |
40 | | } |
41 | | |
42 | | enum SubstreamInner<TNow> { |
43 | | /// Protocol negotiation in progress in an incoming substream. |
44 | | InboundNegotiating(multistream_select::InProgress<String>, bool), |
45 | | /// Protocol negotiation in an incoming substream is in progress, and an |
46 | | /// [`Event::InboundNegotiated`] has been emitted. Now waiting for the API user to indicate |
47 | | /// whether the protocol is supported and if so the type of substream. |
48 | | InboundNegotiatingApiWait(multistream_select::ListenerAcceptOrDeny<String>), |
49 | | /// Protocol negotiation in an incoming substream is in progress, and the API user has |
50 | | /// indicated that the given protocol is supported. Finishing the handshake before switching |
51 | | /// to a different state. |
52 | | InboundNegotiatingAccept(multistream_select::InProgress<String>, InboundTy), |
53 | | /// Incoming substream has failed to negotiate a protocol. Waiting for a close from the remote. |
54 | | /// In order to save a round-trip time, the remote might assume that the protocol negotiation |
55 | | /// has succeeded. As such, it might send additional data on this substream that should be |
56 | | /// ignored. |
57 | | InboundFailed, |
58 | | |
59 | | /// Failure to negotiate an outbound notifications substream. |
60 | | NotificationsOutNegotiationFailed, |
61 | | /// A notifications protocol is being negotiated or has been negotiated on a substream. Either |
62 | | /// a successful handshake or an abrupt closing is now expected. |
63 | | NotificationsOutHandshakeRecv { |
64 | | /// When the opening will time out in the absence of response. |
65 | | timeout: TNow, |
66 | | /// State of the protocol negotiation. `None` if the handshake has already finished. |
67 | | negotiation: Option<multistream_select::InProgress<String>>, |
68 | | /// Size of the remote handshake, if known. If `Some`, we have already extracted the length |
69 | | /// from the incoming buffer. |
70 | | handshake_in_size: Option<usize>, |
71 | | /// Maximum allowed size of the remote handshake. |
72 | | handshake_in_max_size: usize, |
73 | | /// Handshake payload to write out. |
74 | | handshake_out: VecDeque<u8>, |
75 | | }, |
76 | | /// A notifications protocol has been negotiated, and the remote accepted it. Can now send |
77 | | /// notifications. |
78 | | NotificationsOut { |
79 | | /// Notifications to write out. |
80 | | notifications: VecDeque<u8>, |
81 | | /// If `true`, we have reported a [`Event::NotificationsOutCloseDemanded`] event in the |
82 | | /// past and shouldn't report one again. |
83 | | close_demanded_by_remote: bool, |
84 | | }, |
85 | | /// A notifications protocol has been closed. Waiting for the remote to close it as well. |
86 | | NotificationsOutClosed, |
87 | | |
88 | | /// A notifications protocol has been negotiated on an incoming substream. A handshake from |
89 | | /// the remote is expected. |
90 | | NotificationsInHandshake { |
91 | | /// Size of the handshake, if known. If `Some`, we have already extracted the length |
92 | | /// from the incoming buffer. |
93 | | handshake_size: Option<usize>, |
94 | | /// Maximum allowed size of the handshake. |
95 | | handshake_max_size: usize, |
96 | | }, |
97 | | /// A handshake on a notifications protocol has been received. Now waiting for an action from |
98 | | /// the API user. |
99 | | NotificationsInWait, |
100 | | /// API user has refused an incoming substream. Waiting for a close from the remote. |
101 | | /// In order to save a round-trip time, the remote might assume that the protocol negotiation |
102 | | /// has succeeded. As such, it might send additional data on this substream that should be |
103 | | /// ignored. |
104 | | NotificationsInRefused, |
105 | | /// A notifications protocol has been negotiated on a substream. Remote can now send |
106 | | /// notifications. |
107 | | NotificationsIn { |
108 | | /// If `Some`, the local node wants to shut down the substream. If the given timeout is |
109 | | /// reached, the closing is forced. |
110 | | close_desired_timeout: Option<TNow>, |
111 | | /// Size of the next notification, if known. If `Some`, we have already extracted the |
112 | | /// length from the incoming buffer. |
113 | | next_notification_size: Option<usize>, |
114 | | /// Handshake payload to write out. |
115 | | handshake: VecDeque<u8>, |
116 | | /// Maximum size, in bytes, allowed for each notification. |
117 | | max_notification_size: usize, |
118 | | }, |
119 | | /// An inbound notifications protocol was open, but then the remote closed its writing side. |
120 | | NotificationsInClosed, |
121 | | |
122 | | /// Outgoing request. |
123 | | RequestOut { |
124 | | /// When the request will time out in the absence of response. |
125 | | timeout: TNow, |
126 | | /// State of the protocol negotiation. `None` if the negotiation has finished. |
127 | | negotiation: Option<multistream_select::InProgress<String>>, |
128 | | /// Request payload to write out. |
129 | | request: VecDeque<u8>, |
130 | | /// Size of the response, if known. If `Some`, we have already extracted the length |
131 | | /// from the incoming buffer. |
132 | | response_size: Option<usize>, |
133 | | /// Maximum allowed size of the response. |
134 | | response_max_size: usize, |
135 | | }, |
136 | | |
137 | | /// A request-response protocol has been negotiated on an inbound substream. A request is now |
138 | | /// expected. |
139 | | RequestInRecv { |
140 | | /// Size of the request, if known. If `Some`, we have already extracted the length |
141 | | /// from the incoming buffer. |
142 | | request_size: Option<usize>, |
143 | | /// Maximum allowed size of the request. |
144 | | request_max_size: usize, |
145 | | }, |
146 | | /// Similar to [`SubstreamInner::RequestInRecv`], but doesn't expect any request body. |
147 | | /// Immediately reports an event and switches to [`SubstreamInner::RequestInApiWait`]. |
148 | | RequestInRecvEmpty, |
149 | | /// A request has been sent by the remote. API user must now send back the response. |
150 | | RequestInApiWait, |
151 | | /// A request has been sent by the remote. Sending back the response. |
152 | | RequestInRespond { |
153 | | /// Response being sent back. |
154 | | response: VecDeque<u8>, |
155 | | }, |
156 | | |
157 | | /// Inbound ping substream. Waiting for the ping payload to be received. |
158 | | PingIn { payload_out: VecDeque<u8> }, |
159 | | |
160 | | /// Failed to negotiate a protocol for an outgoing ping substream. |
161 | | PingOutFailed { |
162 | | /// FIFO queue of pings that will immediately fail. |
163 | | queued_pings: smallvec::SmallVec<[Option<(TNow, Duration)>; 1]>, |
164 | | }, |
165 | | /// Outbound ping substream. |
166 | | PingOut { |
167 | | /// State of the protocol negotiation. `None` if the handshake is already finished. |
168 | | negotiation: Option<multistream_select::InProgress<String>>, |
169 | | /// Payload of the queued pings that remains to write out. |
170 | | outgoing_payload: VecDeque<u8>, |
171 | | /// Data waiting to be received from the remote. Any mismatch will cause an error. |
172 | | /// Contains even the data that is still queued in `outgoing_payload`. |
173 | | expected_payload: VecDeque<Vec<u8>>, |
174 | | /// FIFO queue of pings waiting to be answered. For each ping, when the ping was queued |
175 | | /// and after how long it will time out, or `None` if the timeout has already occurred. |
176 | | queued_pings: smallvec::SmallVec<[Option<(TNow, Duration)>; 1]>, |
177 | | }, |
178 | | } |
179 | | |
180 | | impl<TNow> Substream<TNow> |
181 | | where |
182 | | TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord, |
183 | | { |
184 | | /// Initializes an new `ingoing` substream. |
185 | | /// |
186 | | /// After the remote has requested a protocol, an [`Event::InboundNegotiated`] event will be |
187 | | /// generated, after which [`Substream::accept_inbound`] or [`Substream::reject_inbound`] must |
188 | | /// be called in order to indicate whether the protocol is accepted, and if so the nature of |
189 | | /// the negotiated protocol. |
190 | | /// A [`Event::InboundError`] can also be generated, either before or after the |
191 | | /// [`Event::InboundNegotiated`], but always before any [`Event::NotificationsInOpen`]. |
192 | | /// |
193 | | /// If [`InboundTy::Notifications`] is passed, then a [`Event::NotificationsInOpen`] will be |
194 | | /// generated (unless an error happens, in which case [`Event::InboundError`]). |
195 | | /// In response, the API user must call either [`Substream::accept_in_notifications_substream`] |
196 | | /// or [`Substream::reject_in_notifications_substream`]. Before one of these two methods is |
197 | | /// called, it is possible for an [`Event::NotificationsInOpenCancel`] to be generated, in |
198 | | /// which case the inbound request is canceled and the substream closed. |
199 | | /// After [`Substream::accept_in_notifications_substream`] is called, zero or more |
200 | | /// [`Event::NotificationIn`] will be generated, until a [`Event::NotificationsInClose`] which |
201 | | /// indicates the end of the substream. |
202 | | /// |
203 | | /// If [`InboundTy::Request`] is passed, then a [`Event::RequestIn`] will be generated, after |
204 | | /// which the API user must call [`Substream::respond_in_request`]. An [`Event::InboundError`] |
205 | | /// can happen at any point. |
206 | | /// |
207 | | /// This flow is also true if you call [`Substream::reset`] at any point. |
208 | 20 | pub fn ingoing(max_protocol_name_len: usize) -> Self { |
209 | 20 | let negotiation = |
210 | 20 | multistream_select::InProgress::new(multistream_select::Config::Listener { |
211 | 20 | max_protocol_name_len, |
212 | 20 | }); |
213 | 20 | |
214 | 20 | Substream { |
215 | 20 | inner: SubstreamInner::InboundNegotiating(negotiation, false), |
216 | 20 | } |
217 | 20 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE7ingoingBa_ Line | Count | Source | 208 | 20 | pub fn ingoing(max_protocol_name_len: usize) -> Self { | 209 | 20 | let negotiation = | 210 | 20 | multistream_select::InProgress::new(multistream_select::Config::Listener { | 211 | 20 | max_protocol_name_len, | 212 | 20 | }); | 213 | 20 | | 214 | 20 | Substream { | 215 | 20 | inner: SubstreamInner::InboundNegotiating(negotiation, false), | 216 | 20 | } | 217 | 20 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE7ingoingCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE7ingoingBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE7ingoingCsiUjFBJteJ7x_17smoldot_full_node |
218 | | |
219 | | /// Initializes an outgoing notifications substream. |
220 | | /// |
221 | | /// After the remote has sent back a handshake or after an error occurred, an |
222 | | /// [`Event::NotificationsOutResult`] event will be generated locally. |
223 | | /// |
224 | | /// If this event contains an `Ok`, then [`Substream::write_notification_unbounded`], |
225 | | /// [`Substream::notification_substream_queued_bytes`] and |
226 | | /// [`Substream::close_out_notifications_substream`] can be used, and |
227 | | /// [`Event::NotificationsOutCloseDemanded`] and [`Event::NotificationsOutReset`] can be |
228 | | /// generated. |
229 | 3 | pub fn notifications_out( |
230 | 3 | timeout: TNow, |
231 | 3 | requested_protocol: String, |
232 | 3 | handshake: Vec<u8>, |
233 | 3 | max_handshake_size: usize, |
234 | 3 | ) -> Self { |
235 | 3 | // TODO: check `handshake < max_handshake_size`? |
236 | 3 | |
237 | 3 | let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer { |
238 | 3 | requested_protocol, |
239 | 3 | }); |
240 | 3 | |
241 | 3 | let handshake_out = { |
242 | 3 | let handshake_len = handshake.len(); |
243 | 3 | leb128::encode_usize(handshake_len) |
244 | 3 | .chain(handshake) |
245 | 3 | .collect::<VecDeque<_>>() |
246 | 3 | }; |
247 | 3 | |
248 | 3 | Substream { |
249 | 3 | inner: SubstreamInner::NotificationsOutHandshakeRecv { |
250 | 3 | timeout, |
251 | 3 | negotiation: Some(negotiation), |
252 | 3 | handshake_in_size: None, |
253 | 3 | handshake_in_max_size: max_handshake_size, |
254 | 3 | handshake_out, |
255 | 3 | }, |
256 | 3 | } |
257 | 3 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE17notifications_outBa_ Line | Count | Source | 229 | 3 | pub fn notifications_out( | 230 | 3 | timeout: TNow, | 231 | 3 | requested_protocol: String, | 232 | 3 | handshake: Vec<u8>, | 233 | 3 | max_handshake_size: usize, | 234 | 3 | ) -> Self { | 235 | 3 | // TODO: check `handshake < max_handshake_size`? | 236 | 3 | | 237 | 3 | let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer { | 238 | 3 | requested_protocol, | 239 | 3 | }); | 240 | 3 | | 241 | 3 | let handshake_out = { | 242 | 3 | let handshake_len = handshake.len(); | 243 | 3 | leb128::encode_usize(handshake_len) | 244 | 3 | .chain(handshake) | 245 | 3 | .collect::<VecDeque<_>>() | 246 | 3 | }; | 247 | 3 | | 248 | 3 | Substream { | 249 | 3 | inner: SubstreamInner::NotificationsOutHandshakeRecv { | 250 | 3 | timeout, | 251 | 3 | negotiation: Some(negotiation), | 252 | 3 | handshake_in_size: None, | 253 | 3 | handshake_in_max_size: max_handshake_size, | 254 | 3 | handshake_out, | 255 | 3 | }, | 256 | 3 | } | 257 | 3 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE17notifications_outCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE17notifications_outBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE17notifications_outCsiUjFBJteJ7x_17smoldot_full_node |
258 | | |
259 | | /// Initializes an outgoing request substream. |
260 | | /// |
261 | | /// After the remote has sent back a response or after an error occurred, an [`Event::Response`] |
262 | | /// event will be generated locally. The `user_data` parameter will be passed back. |
263 | | /// |
264 | | /// If the `request` is `None`, then nothing at all will be written out, not even a length |
265 | | /// prefix. If the `request` is `Some`, then a length prefix will be written out. Consequently, |
266 | | /// `Some(&[])` writes a single `0` for the request. |
267 | 3 | pub fn request_out( |
268 | 3 | requested_protocol: String, |
269 | 3 | timeout: TNow, |
270 | 3 | request: Option<Vec<u8>>, |
271 | 3 | max_response_size: usize, |
272 | 3 | ) -> Self { |
273 | 3 | let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer { |
274 | 3 | requested_protocol, |
275 | 3 | }); |
276 | | |
277 | 3 | let request_payload = if let Some(request) = request { |
278 | 3 | let request_len = request.len(); |
279 | 3 | leb128::encode_usize(request_len) |
280 | 3 | .chain(request) |
281 | 3 | .collect::<VecDeque<_>>() |
282 | | } else { |
283 | 0 | VecDeque::new() |
284 | | }; |
285 | | |
286 | 3 | Substream { |
287 | 3 | inner: SubstreamInner::RequestOut { |
288 | 3 | timeout, |
289 | 3 | negotiation: Some(negotiation), |
290 | 3 | request: request_payload, |
291 | 3 | response_size: None, |
292 | 3 | response_max_size: max_response_size, |
293 | 3 | }, |
294 | 3 | } |
295 | 3 | |
296 | 3 | // TODO: somehow do substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size |
297 | 3 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11request_outBa_ Line | Count | Source | 267 | 3 | pub fn request_out( | 268 | 3 | requested_protocol: String, | 269 | 3 | timeout: TNow, | 270 | 3 | request: Option<Vec<u8>>, | 271 | 3 | max_response_size: usize, | 272 | 3 | ) -> Self { | 273 | 3 | let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer { | 274 | 3 | requested_protocol, | 275 | 3 | }); | 276 | | | 277 | 3 | let request_payload = if let Some(request) = request { | 278 | 3 | let request_len = request.len(); | 279 | 3 | leb128::encode_usize(request_len) | 280 | 3 | .chain(request) | 281 | 3 | .collect::<VecDeque<_>>() | 282 | | } else { | 283 | 0 | VecDeque::new() | 284 | | }; | 285 | | | 286 | 3 | Substream { | 287 | 3 | inner: SubstreamInner::RequestOut { | 288 | 3 | timeout, | 289 | 3 | negotiation: Some(negotiation), | 290 | 3 | request: request_payload, | 291 | 3 | response_size: None, | 292 | 3 | response_max_size: max_response_size, | 293 | 3 | }, | 294 | 3 | } | 295 | 3 | | 296 | 3 | // TODO: somehow do substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size | 297 | 3 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11request_outCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE11request_outBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11request_outCsiUjFBJteJ7x_17smoldot_full_node |
298 | | |
299 | | /// Initializes an outgoing ping substream. |
300 | | /// |
301 | | /// Call [`Substream::queue_ping`] in order to queue an outgoing ping on this substream. This |
302 | | /// can be done at any time, even immediately after this function has returned. |
303 | | /// |
304 | | /// The substream will attempt to negotiate the ping protocol. No event is reported if the |
305 | | /// protocol fails to negotiate. Instead, outgoing pings will be transparently failing. |
306 | | /// |
307 | | /// > Note: At the time of the writing of this comment, no API method exists to close an |
308 | | /// > outgoing ping substream. |
309 | 14 | pub fn ping_out(ping_protocol_name: String) -> Self { |
310 | 14 | let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer { |
311 | 14 | requested_protocol: ping_protocol_name, |
312 | 14 | }); |
313 | 14 | |
314 | 14 | Substream { |
315 | 14 | inner: SubstreamInner::PingOut { |
316 | 14 | negotiation: Some(negotiation), |
317 | 14 | outgoing_payload: VecDeque::with_capacity(32), |
318 | 14 | expected_payload: VecDeque::with_capacity(32), |
319 | 14 | queued_pings: smallvec::SmallVec::new(), |
320 | 14 | }, |
321 | 14 | } |
322 | 14 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE8ping_outBa_ Line | Count | Source | 309 | 14 | pub fn ping_out(ping_protocol_name: String) -> Self { | 310 | 14 | let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer { | 311 | 14 | requested_protocol: ping_protocol_name, | 312 | 14 | }); | 313 | 14 | | 314 | 14 | Substream { | 315 | 14 | inner: SubstreamInner::PingOut { | 316 | 14 | negotiation: Some(negotiation), | 317 | 14 | outgoing_payload: VecDeque::with_capacity(32), | 318 | 14 | expected_payload: VecDeque::with_capacity(32), | 319 | 14 | queued_pings: smallvec::SmallVec::new(), | 320 | 14 | }, | 321 | 14 | } | 322 | 14 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE8ping_outCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE8ping_outBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE8ping_outCsiUjFBJteJ7x_17smoldot_full_node |
323 | | |
324 | | /// Reads data coming from the socket, updates the internal state machine, and writes data |
325 | | /// destined to the socket through the [`read_write::ReadWrite`]. |
326 | | /// |
327 | | /// If both the reading side and the writing side are closed and no other event can happen, or |
328 | | /// if at any point a protocol error happens, then `None` is returned. In that case, the |
329 | | /// substream must be reset if it is not closed. |
330 | 329 | pub fn read_write( |
331 | 329 | self, |
332 | 329 | read_write: &'_ mut read_write::ReadWrite<TNow>, |
333 | 329 | ) -> (Option<Self>, Option<Event>) { |
334 | 329 | let (me, event) = self.read_write2(read_write); |
335 | 329 | (me.map(|inner| Substream { inner }324 ), event) _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_write0Bc_ Line | Count | Source | 335 | 324 | (me.map(|inner| Substream { inner }), event) |
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_write0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE10read_write0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE10read_write0CsiUjFBJteJ7x_17smoldot_full_node |
336 | 329 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_writeBa_ Line | Count | Source | 330 | 329 | pub fn read_write( | 331 | 329 | self, | 332 | 329 | read_write: &'_ mut read_write::ReadWrite<TNow>, | 333 | 329 | ) -> (Option<Self>, Option<Event>) { | 334 | 329 | let (me, event) = self.read_write2(read_write); | 335 | 329 | (me.map(|inner| Substream { inner }), event) | 336 | 329 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_writeCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE10read_writeBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE10read_writeCsiUjFBJteJ7x_17smoldot_full_node |
337 | | |
338 | 329 | fn read_write2( |
339 | 329 | self, |
340 | 329 | read_write: &'_ mut read_write::ReadWrite<TNow>, |
341 | 329 | ) -> (Option<SubstreamInner<TNow>>, Option<Event>) { |
342 | 329 | match self.inner { |
343 | 102 | SubstreamInner::InboundNegotiating(nego, was_rejected_already) => { |
344 | 102 | match nego.read_write(read_write) { |
345 | 81 | Ok(multistream_select::Negotiation::InProgress(nego)) => ( |
346 | 81 | Some(SubstreamInner::InboundNegotiating( |
347 | 81 | nego, |
348 | 81 | was_rejected_already, |
349 | 81 | )), |
350 | 81 | None, |
351 | 81 | ), |
352 | 20 | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(accept_deny)) => { |
353 | 20 | // TODO: maybe avoid cloning the protocol name? |
354 | 20 | let protocol = accept_deny.requested_protocol().to_owned(); |
355 | 20 | ( |
356 | 20 | Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)), |
357 | 20 | Some(Event::InboundNegotiated(protocol)), |
358 | 20 | ) |
359 | | } |
360 | | Ok(multistream_select::Negotiation::Success) => { |
361 | | // Unreachable, as we expect a `ListenerAcceptOrDeny`. |
362 | 0 | unreachable!() |
363 | | } |
364 | | Ok(multistream_select::Negotiation::NotAvailable) => { |
365 | | // Unreachable in listener mode. |
366 | 0 | unreachable!() |
367 | | } |
368 | 1 | Err(_) if was_rejected_already => { |
369 | 1 | // If the negotiation was already rejected once, it is likely that the |
370 | 1 | // multistream-select protocol error is due to the fact that the remote |
371 | 1 | // assumes that the multistream-select negotiation always succeeds. As such, |
372 | 1 | // we treat this situation as "negotiation has failed gracefully". |
373 | 1 | (Some(SubstreamInner::InboundFailed), None) |
374 | | } |
375 | 0 | Err(err) => ( |
376 | 0 | None, |
377 | 0 | Some(Event::InboundError { |
378 | 0 | error: InboundError::NegotiationError(err), |
379 | 0 | was_accepted: false, |
380 | 0 | }), |
381 | 0 | ), |
382 | | } |
383 | | } |
384 | 0 | SubstreamInner::InboundNegotiatingApiWait(accept_deny) => ( |
385 | 0 | Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)), |
386 | 0 | None, |
387 | 0 | ), |
388 | 18 | SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty) => { |
389 | 18 | match nego.read_write(read_write) { |
390 | 0 | Ok(multistream_select::Negotiation::InProgress(nego)) => ( |
391 | 0 | Some(SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty)), |
392 | 0 | None, |
393 | 0 | ), |
394 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { |
395 | | // Can't be reached again, as we have already accepted the protocol. |
396 | 0 | unreachable!() |
397 | | } |
398 | 18 | Ok(multistream_select::Negotiation::Success) => match inbound_ty { |
399 | 13 | InboundTy::Ping => ( |
400 | 13 | Some(SubstreamInner::PingIn { |
401 | 13 | payload_out: VecDeque::with_capacity(32), |
402 | 13 | }), |
403 | 13 | None, |
404 | 13 | ), |
405 | 3 | InboundTy::Notifications { max_handshake_size } => ( |
406 | 3 | Some(SubstreamInner::NotificationsInHandshake { |
407 | 3 | handshake_size: None, |
408 | 3 | handshake_max_size: max_handshake_size, |
409 | 3 | }), |
410 | 3 | None, |
411 | 3 | ), |
412 | 2 | InboundTy::Request { request_max_size } => { |
413 | 2 | if let Some(request_max_size) = request_max_size { |
414 | 2 | ( |
415 | 2 | Some(SubstreamInner::RequestInRecv { |
416 | 2 | request_max_size, |
417 | 2 | request_size: None, |
418 | 2 | }), |
419 | 2 | None, |
420 | 2 | ) |
421 | | } else { |
422 | 0 | (Some(SubstreamInner::RequestInRecvEmpty), None) |
423 | | } |
424 | | } |
425 | | }, |
426 | | Ok(multistream_select::Negotiation::NotAvailable) => { |
427 | | // Unreachable in listener mode. |
428 | 0 | unreachable!() |
429 | | } |
430 | 0 | Err(err) => ( |
431 | 0 | None, |
432 | 0 | Some(Event::InboundError { |
433 | 0 | error: InboundError::NegotiationError(err), |
434 | 0 | was_accepted: true, |
435 | 0 | }), |
436 | 0 | ), |
437 | | } |
438 | | } |
439 | | SubstreamInner::InboundFailed => { |
440 | | // Substream is an inbound substream that has failed to negotiate a |
441 | | // protocol. The substream is expected to close soon, but the remote might |
442 | | // have been eagerly sending data (assuming that the negotiation would |
443 | | // succeed), which should be silently discarded. |
444 | 1 | read_write.discard_all_incoming(); |
445 | 1 | read_write.close_write(); |
446 | 1 | if read_write.is_dead() { |
447 | 1 | (None, None) |
448 | | } else { |
449 | 0 | (Some(SubstreamInner::InboundFailed), None) |
450 | | } |
451 | | } |
452 | | |
453 | | SubstreamInner::NotificationsOutNegotiationFailed => { |
454 | | // Substream has failed to negotiate a protocol. The substream is expected to |
455 | | // close soon. |
456 | 0 | read_write.discard_all_incoming(); |
457 | 0 | read_write.close_write(); |
458 | 0 | ( |
459 | 0 | if read_write.is_dead() { |
460 | 0 | None |
461 | | } else { |
462 | 0 | Some(SubstreamInner::NotificationsOutNegotiationFailed) |
463 | | }, |
464 | 0 | None, |
465 | | ) |
466 | | } |
467 | | SubstreamInner::NotificationsOutHandshakeRecv { |
468 | 30 | timeout, |
469 | 30 | mut negotiation, |
470 | 30 | mut handshake_in_size, |
471 | 30 | handshake_in_max_size, |
472 | 30 | mut handshake_out, |
473 | 30 | } => { |
474 | 30 | if timeout < read_write.now { |
475 | 1 | read_write.wake_up_asap(); |
476 | 1 | return ( |
477 | 1 | Some(SubstreamInner::NotificationsOutNegotiationFailed), |
478 | 1 | Some(Event::NotificationsOutResult { |
479 | 1 | result: Err(NotificationsOutErr::Timeout), |
480 | 1 | }), |
481 | 1 | ); |
482 | 29 | } |
483 | | |
484 | 29 | if let Some(extracted_negotiation25 ) = negotiation.take() { |
485 | 25 | match extracted_negotiation.read_write(read_write) { |
486 | 23 | Ok(multistream_select::Negotiation::InProgress(nego)) => { |
487 | 23 | negotiation = Some(nego) |
488 | | } |
489 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { |
490 | | // Never happens when dialing. |
491 | 0 | unreachable!() |
492 | | } |
493 | 2 | Ok(multistream_select::Negotiation::Success) => {} |
494 | | Ok(multistream_select::Negotiation::NotAvailable) => { |
495 | 0 | read_write.wake_up_asap(); |
496 | 0 | return ( |
497 | 0 | Some(SubstreamInner::NotificationsOutNegotiationFailed), |
498 | 0 | Some(Event::NotificationsOutResult { |
499 | 0 | result: Err(NotificationsOutErr::ProtocolNotAvailable), |
500 | 0 | }), |
501 | 0 | ); |
502 | | } |
503 | 0 | Err(err) => { |
504 | 0 | return ( |
505 | 0 | None, |
506 | 0 | Some(Event::NotificationsOutResult { |
507 | 0 | result: Err(NotificationsOutErr::NegotiationError(err)), |
508 | 0 | }), |
509 | 0 | ) |
510 | | } |
511 | | } |
512 | 4 | } |
513 | | |
514 | 29 | if negotiation |
515 | 29 | .as_ref() |
516 | 29 | .map_or(true, |n| n.can_write_protocol_data()23 ) _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write20Bc_ Line | Count | Source | 516 | 23 | .map_or(true, |n| n.can_write_protocol_data()) |
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write20CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write20Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write20CsiUjFBJteJ7x_17smoldot_full_node |
517 | 17 | { |
518 | 17 | read_write.write_from_vec_deque(&mut handshake_out); |
519 | 17 | }12 |
520 | | |
521 | 29 | if negotiation.is_none() { |
522 | 6 | if read_write.expected_incoming_bytes.is_none() { |
523 | 1 | read_write.wake_up_asap(); |
524 | 1 | return ( |
525 | 1 | Some(SubstreamInner::NotificationsOutNegotiationFailed), |
526 | 1 | Some(Event::NotificationsOutResult { |
527 | 1 | result: Err(NotificationsOutErr::RefusedHandshake), |
528 | 1 | }), |
529 | 1 | ); |
530 | 5 | } |
531 | 5 | |
532 | 5 | // Don't actually process incoming data before handshake is sent out, in order |
533 | 5 | // to not accidentally perform a state transition. |
534 | 5 | if !handshake_out.is_empty() { |
535 | 0 | return ( |
536 | 0 | Some(SubstreamInner::NotificationsOutHandshakeRecv { |
537 | 0 | timeout, |
538 | 0 | negotiation, |
539 | 0 | handshake_in_size, |
540 | 0 | handshake_in_max_size, |
541 | 0 | handshake_out, |
542 | 0 | }), |
543 | 0 | None, |
544 | 0 | ); |
545 | 5 | } |
546 | | |
547 | 5 | if let Some(handshake_in_size1 ) = handshake_in_size { |
548 | 1 | match read_write.incoming_bytes_take(handshake_in_size) { |
549 | 1 | Ok(Some(remote_handshake)) => { |
550 | 1 | read_write.wake_up_asap(); |
551 | 1 | return ( |
552 | 1 | Some(SubstreamInner::NotificationsOut { |
553 | 1 | notifications: VecDeque::new(), |
554 | 1 | close_demanded_by_remote: false, |
555 | 1 | }), |
556 | 1 | Some(Event::NotificationsOutResult { |
557 | 1 | result: Ok(remote_handshake), |
558 | 1 | }), |
559 | 1 | ); |
560 | | } |
561 | 0 | Ok(None) => {} |
562 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { |
563 | 0 | read_write.wake_up_asap(); |
564 | 0 | return ( |
565 | 0 | Some(SubstreamInner::NotificationsOutNegotiationFailed), |
566 | 0 | Some(Event::NotificationsOutResult { |
567 | 0 | result: Err(NotificationsOutErr::RefusedHandshake), |
568 | 0 | }), |
569 | 0 | ); |
570 | | } |
571 | | } |
572 | | } else { |
573 | 4 | match read_write.incoming_bytes_take_leb128(handshake_in_max_size) { |
574 | 1 | Ok(Some(s)) => handshake_in_size = Some(s), |
575 | 3 | Ok(None) => {} |
576 | 0 | Err(error) => return ( |
577 | 0 | None, |
578 | 0 | Some(Event::Response { |
579 | 0 | response: Err(match error { |
580 | | read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => { |
581 | 0 | RequestError::ResponseInvalidLeb128 |
582 | | } |
583 | | read_write::IncomingBytesTakeLeb128Error::ReadClosed => { |
584 | 0 | RequestError::SubstreamClosed |
585 | | } |
586 | | read_write::IncomingBytesTakeLeb128Error::TooLarge => { |
587 | 0 | RequestError::ResponseTooLarge |
588 | | } |
589 | | }), |
590 | | }), |
591 | | ), |
592 | | } |
593 | | } |
594 | 23 | } |
595 | | |
596 | 27 | read_write.wake_up_after(&timeout); |
597 | 27 | |
598 | 27 | ( |
599 | 27 | Some(SubstreamInner::NotificationsOutHandshakeRecv { |
600 | 27 | timeout, |
601 | 27 | negotiation, |
602 | 27 | handshake_in_size, |
603 | 27 | handshake_in_max_size, |
604 | 27 | handshake_out, |
605 | 27 | }), |
606 | 27 | None, |
607 | 27 | ) |
608 | | } |
609 | | SubstreamInner::NotificationsOut { |
610 | 3 | mut notifications, |
611 | 3 | close_demanded_by_remote, |
612 | 3 | } => { |
613 | 3 | // Receiving data on an outgoing substream is forbidden by the protocol. |
614 | 3 | read_write.discard_all_incoming(); |
615 | 3 | read_write.write_from_vec_deque(&mut notifications); |
616 | 3 | |
617 | 3 | // If this debug assertion fails, it means that `expected_incoming_bytes` was `None` in |
618 | 3 | // the past then became `Some` again. |
619 | 3 | debug_assert!( |
620 | 3 | !close_demanded_by_remote || read_write.expected_incoming_bytes.is_none()0 |
621 | | ); |
622 | | |
623 | 3 | if !close_demanded_by_remote && read_write.expected_incoming_bytes.is_none() { |
624 | 0 | read_write.wake_up_asap(); |
625 | 0 | return ( |
626 | 0 | Some(SubstreamInner::NotificationsOut { |
627 | 0 | notifications, |
628 | 0 | close_demanded_by_remote: true, |
629 | 0 | }), |
630 | 0 | Some(Event::NotificationsOutCloseDemanded), |
631 | 0 | ); |
632 | 3 | } |
633 | 3 | |
634 | 3 | ( |
635 | 3 | Some(SubstreamInner::NotificationsOut { |
636 | 3 | notifications, |
637 | 3 | close_demanded_by_remote, |
638 | 3 | }), |
639 | 3 | None, |
640 | 3 | ) |
641 | | } |
642 | | SubstreamInner::NotificationsOutClosed => { |
643 | 0 | read_write.discard_all_incoming(); |
644 | 0 | read_write.close_write(); |
645 | 0 | ( |
646 | 0 | if read_write.is_dead() { |
647 | 0 | None |
648 | | } else { |
649 | 0 | Some(SubstreamInner::NotificationsOutClosed) |
650 | | }, |
651 | 0 | None, |
652 | | ) |
653 | | } |
654 | | |
655 | | SubstreamInner::RequestOut { |
656 | 27 | timeout, |
657 | 27 | mut negotiation, |
658 | 27 | mut request, |
659 | 27 | mut response_size, |
660 | 27 | response_max_size, |
661 | 27 | } => { |
662 | 27 | // Note that this might trigger timeouts for requests whose response is available |
663 | 27 | // in `incoming_buffer`. This is intentional, as from the perspective of |
664 | 27 | // `read_write` the response arrived after the timeout. It is the responsibility |
665 | 27 | // of the user to call `read_write` in an appropriate way for this to not happen. |
666 | 27 | if timeout < read_write.now { |
667 | 1 | read_write.close_write(); |
668 | 1 | return ( |
669 | 1 | None, |
670 | 1 | Some(Event::Response { |
671 | 1 | response: Err(RequestError::Timeout), |
672 | 1 | }), |
673 | 1 | ); |
674 | 26 | } |
675 | | |
676 | 26 | if let Some(extracted_nego25 ) = negotiation.take() { |
677 | 25 | match extracted_nego.read_write(read_write) { |
678 | 23 | Ok(multistream_select::Negotiation::InProgress(nego)) => { |
679 | 23 | negotiation = Some(nego) |
680 | | } |
681 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { |
682 | | // Never happens when dialing. |
683 | 0 | unreachable!() |
684 | | } |
685 | 1 | Ok(multistream_select::Negotiation::Success) => {} |
686 | | Ok(multistream_select::Negotiation::NotAvailable) => { |
687 | 1 | return ( |
688 | 1 | None, |
689 | 1 | Some(Event::Response { |
690 | 1 | response: Err(RequestError::ProtocolNotAvailable), |
691 | 1 | }), |
692 | 1 | ) |
693 | | } |
694 | 0 | Err(err) => { |
695 | 0 | return ( |
696 | 0 | None, |
697 | 0 | Some(Event::Response { |
698 | 0 | response: Err(RequestError::NegotiationError(err)), |
699 | 0 | }), |
700 | 0 | ) |
701 | | } |
702 | | } |
703 | 1 | } |
704 | | |
705 | 25 | if negotiation |
706 | 25 | .as_ref() |
707 | 25 | .map_or(true, |n| n.can_write_protocol_data()23 ) _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s_0Bc_ Line | Count | Source | 707 | 23 | .map_or(true, |n| n.can_write_protocol_data()) |
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s_0CsiUjFBJteJ7x_17smoldot_full_node |
708 | | { |
709 | 13 | if request.is_empty() { |
710 | 10 | read_write.close_write(); |
711 | 10 | } else { |
712 | 3 | read_write.write_from_vec_deque(&mut request); |
713 | 3 | } |
714 | 12 | } |
715 | | |
716 | 25 | if negotiation.is_none() { |
717 | 2 | if let Some(response_size0 ) = response_size { |
718 | 0 | match read_write.incoming_bytes_take(response_size) { |
719 | 0 | Ok(Some(response)) => { |
720 | 0 | return ( |
721 | 0 | None, |
722 | 0 | Some(Event::Response { |
723 | 0 | response: Ok(response), |
724 | 0 | }), |
725 | 0 | ); |
726 | | } |
727 | 0 | Ok(None) => {} |
728 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { |
729 | 0 | return ( |
730 | 0 | None, |
731 | 0 | Some(Event::Response { |
732 | 0 | response: Err(RequestError::SubstreamClosed), |
733 | 0 | }), |
734 | 0 | ) |
735 | | } |
736 | | } |
737 | | } else { |
738 | 2 | match read_write.incoming_bytes_take_leb128(response_max_size) { |
739 | 0 | Ok(Some(s)) => response_size = Some(s), |
740 | 1 | Ok(None) => {} |
741 | 1 | Err(error) => return ( |
742 | 1 | None, |
743 | 1 | Some(Event::Response { |
744 | 1 | response: Err(match error { |
745 | | read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => { |
746 | 0 | RequestError::ResponseInvalidLeb128 |
747 | | } |
748 | | read_write::IncomingBytesTakeLeb128Error::ReadClosed => { |
749 | 1 | RequestError::SubstreamClosed |
750 | | } |
751 | | read_write::IncomingBytesTakeLeb128Error::TooLarge => { |
752 | 0 | RequestError::ResponseTooLarge |
753 | | } |
754 | | }), |
755 | | }), |
756 | | ), |
757 | | } |
758 | | } |
759 | 23 | } |
760 | | |
761 | 24 | read_write.wake_up_after(&timeout); |
762 | 24 | |
763 | 24 | ( |
764 | 24 | Some(SubstreamInner::RequestOut { |
765 | 24 | timeout, |
766 | 24 | negotiation, |
767 | 24 | request, |
768 | 24 | response_size, |
769 | 24 | response_max_size, |
770 | 24 | }), |
771 | 24 | None, |
772 | 24 | ) |
773 | | } |
774 | | |
775 | | SubstreamInner::RequestInRecv { |
776 | 4 | mut request_size, |
777 | 4 | request_max_size, |
778 | | } => { |
779 | 4 | if let Some(request_size2 ) = request_size { |
780 | 2 | match read_write.incoming_bytes_take(request_size) { |
781 | 2 | Ok(Some(request)) => { |
782 | 2 | return ( |
783 | 2 | Some(SubstreamInner::RequestInApiWait), |
784 | 2 | Some(Event::RequestIn { request }), |
785 | 2 | ); |
786 | | } |
787 | 0 | Ok(None) => {} |
788 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { |
789 | 0 | return ( |
790 | 0 | None, |
791 | 0 | Some(Event::InboundError { |
792 | 0 | error: InboundError::SubstreamClosed, |
793 | 0 | was_accepted: true, |
794 | 0 | }), |
795 | 0 | ) |
796 | | } |
797 | | } |
798 | | } else { |
799 | 2 | match read_write.incoming_bytes_take_leb128(request_max_size) { |
800 | 2 | Ok(Some(s)) => request_size = Some(s), |
801 | 0 | Ok(None) => {} |
802 | 0 | Err(error) => { |
803 | 0 | return ( |
804 | 0 | None, |
805 | 0 | Some(Event::InboundError { |
806 | 0 | error: InboundError::RequestInLebError(error), |
807 | 0 | was_accepted: true, |
808 | 0 | }), |
809 | 0 | ) |
810 | | } |
811 | | } |
812 | | } |
813 | | |
814 | 2 | ( |
815 | 2 | Some(SubstreamInner::RequestInRecv { |
816 | 2 | request_max_size, |
817 | 2 | request_size, |
818 | 2 | }), |
819 | 2 | None, |
820 | 2 | ) |
821 | | } |
822 | 0 | SubstreamInner::RequestInRecvEmpty => ( |
823 | 0 | Some(SubstreamInner::RequestInApiWait), |
824 | 0 | Some(Event::RequestIn { |
825 | 0 | request: Vec::new(), |
826 | 0 | }), |
827 | 0 | ), |
828 | 0 | SubstreamInner::RequestInApiWait => (Some(SubstreamInner::RequestInApiWait), None), |
829 | 1 | SubstreamInner::RequestInRespond { mut response } => { |
830 | 1 | if response.is_empty() { |
831 | 1 | read_write.close_write(); |
832 | 1 | (None, None) |
833 | | } else { |
834 | 0 | read_write.write_from_vec_deque(&mut response); |
835 | 0 | (Some(SubstreamInner::RequestInRespond { response }), None) |
836 | | } |
837 | | } |
838 | | |
839 | | SubstreamInner::NotificationsInHandshake { |
840 | 6 | handshake_max_size, |
841 | 6 | mut handshake_size, |
842 | | } => { |
843 | 6 | if let Some(handshake_size3 ) = handshake_size { |
844 | 3 | match read_write.incoming_bytes_take(handshake_size) { |
845 | 3 | Ok(Some(handshake)) => { |
846 | 3 | return ( |
847 | 3 | Some(SubstreamInner::NotificationsInWait), |
848 | 3 | Some(Event::NotificationsInOpen { handshake }), |
849 | 3 | ); |
850 | | } |
851 | 0 | Ok(None) => {} |
852 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { |
853 | 0 | return ( |
854 | 0 | None, |
855 | 0 | Some(Event::InboundError { |
856 | 0 | error: InboundError::SubstreamClosed, |
857 | 0 | was_accepted: true, |
858 | 0 | }), |
859 | 0 | ) |
860 | | } |
861 | | } |
862 | | } else { |
863 | 3 | match read_write.incoming_bytes_take_leb128(handshake_max_size) { |
864 | 3 | Ok(Some(s)) => handshake_size = Some(s), |
865 | 0 | Ok(None) => {} |
866 | 0 | Err(error) => { |
867 | 0 | return ( |
868 | 0 | None, |
869 | 0 | Some(Event::InboundError { |
870 | 0 | error: InboundError::NotificationsInError { error }, |
871 | 0 | was_accepted: true, |
872 | 0 | }), |
873 | 0 | ) |
874 | | } |
875 | | } |
876 | | } |
877 | | |
878 | 3 | ( |
879 | 3 | Some(SubstreamInner::NotificationsInHandshake { |
880 | 3 | handshake_max_size, |
881 | 3 | handshake_size, |
882 | 3 | }), |
883 | 3 | None, |
884 | 3 | ) |
885 | | } |
886 | | SubstreamInner::NotificationsInWait => { |
887 | | // Incoming data isn't processed, potentially back-pressuring it. |
888 | 0 | if read_write.expected_incoming_bytes.is_some() { |
889 | 0 | (Some(SubstreamInner::NotificationsInWait), None) |
890 | | } else { |
891 | 0 | read_write.wake_up_asap(); |
892 | 0 | ( |
893 | 0 | Some(SubstreamInner::NotificationsInRefused), |
894 | 0 | Some(Event::NotificationsInOpenCancel), |
895 | 0 | ) |
896 | | } |
897 | | } |
898 | | SubstreamInner::NotificationsInRefused => { |
899 | 2 | read_write.discard_all_incoming(); |
900 | 2 | read_write.close_write(); |
901 | 2 | ( |
902 | 2 | if read_write.is_dead() { |
903 | 0 | None |
904 | | } else { |
905 | 2 | Some(SubstreamInner::NotificationsInRefused) |
906 | | }, |
907 | 2 | None, |
908 | | ) |
909 | | } |
910 | | SubstreamInner::NotificationsIn { |
911 | 9 | close_desired_timeout, |
912 | 9 | mut next_notification_size, |
913 | 9 | mut handshake, |
914 | 9 | max_notification_size, |
915 | 9 | } => { |
916 | 9 | read_write.write_from_vec_deque(&mut handshake); |
917 | 9 | |
918 | 9 | if close_desired_timeout |
919 | 9 | .as_ref() |
920 | 9 | .map_or(false, |timeout| *timeout >= read_write.now0 ) Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s0_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s0_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s0_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s0_0CsiUjFBJteJ7x_17smoldot_full_node |
921 | | { |
922 | 0 | read_write.wake_up_asap(); |
923 | 0 | return ( |
924 | 0 | Some(SubstreamInner::NotificationsInClosed), |
925 | 0 | Some(Event::NotificationsInClose { |
926 | 0 | outcome: Err(NotificationsInClosedErr::CloseDesiredTimeout), |
927 | 0 | }), |
928 | 0 | ); |
929 | 9 | } |
930 | 9 | |
931 | 9 | if close_desired_timeout.is_some() && handshake.is_empty()0 { |
932 | 0 | read_write.close_write(); |
933 | 9 | } |
934 | | |
935 | 9 | let mut notification = None; |
936 | | |
937 | 9 | if let Some(sz3 ) = next_notification_size { |
938 | 3 | match read_write.incoming_bytes_take(sz) { |
939 | 3 | Ok(Some(notif)) => { |
940 | 3 | read_write.wake_up_asap(); |
941 | 3 | notification = Some(notif); |
942 | 3 | next_notification_size = None; |
943 | 3 | } |
944 | 0 | Ok(None) => {} |
945 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { |
946 | 0 | read_write.wake_up_asap(); |
947 | 0 | return ( |
948 | 0 | Some(SubstreamInner::NotificationsInClosed), |
949 | 0 | Some(Event::NotificationsInClose { |
950 | 0 | outcome: Err(NotificationsInClosedErr::SubstreamClosed), |
951 | 0 | }), |
952 | 0 | ); |
953 | | } |
954 | | } |
955 | | } else { |
956 | 6 | match read_write.incoming_bytes_take_leb128(max_notification_size) { |
957 | 3 | Ok(Some(s)) => next_notification_size = Some(s), |
958 | 3 | Ok(None) => {} |
959 | 0 | Err(error) => { |
960 | 0 | read_write.wake_up_asap(); |
961 | 0 | return ( |
962 | 0 | Some(SubstreamInner::NotificationsInClosed), |
963 | 0 | Some(Event::NotificationsInClose { |
964 | 0 | outcome: Err(NotificationsInClosedErr::ProtocolError(error)), |
965 | 0 | }), |
966 | 0 | ); |
967 | | } |
968 | | } |
969 | | } |
970 | | |
971 | 9 | ( |
972 | 9 | Some(SubstreamInner::NotificationsIn { |
973 | 9 | close_desired_timeout, |
974 | 9 | next_notification_size, |
975 | 9 | handshake, |
976 | 9 | max_notification_size, |
977 | 9 | }), |
978 | 9 | notification.map(|n| Event::NotificationIn { notification: n }3 ), _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s1_0Bc_ Line | Count | Source | 978 | 3 | notification.map(|n| Event::NotificationIn { notification: n }), |
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s1_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s1_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s1_0CsiUjFBJteJ7x_17smoldot_full_node |
979 | 9 | ) |
980 | | } |
981 | | SubstreamInner::NotificationsInClosed => { |
982 | 0 | read_write.discard_all_incoming(); |
983 | 0 | read_write.close_write(); |
984 | 0 | ( |
985 | 0 | if read_write.is_dead() { |
986 | 0 | None |
987 | | } else { |
988 | 0 | Some(SubstreamInner::NotificationsInClosed) |
989 | | }, |
990 | 0 | None, |
991 | | ) |
992 | | } |
993 | | |
994 | 24 | SubstreamInner::PingIn { mut payload_out } => { |
995 | 24 | // Inbound ping substream. |
996 | 24 | // The ping protocol consists in sending 32 bytes of data, which the remote has |
997 | 24 | // to send back. |
998 | 24 | read_write.write_from_vec_deque(&mut payload_out); |
999 | 24 | if payload_out.is_empty() { |
1000 | 24 | if let Ok(Some(ping0 )) = read_write.incoming_bytes_take(32) { |
1001 | 0 | payload_out.extend(ping); |
1002 | 24 | } |
1003 | 0 | } |
1004 | | |
1005 | 24 | (Some(SubstreamInner::PingIn { payload_out }), None) |
1006 | | } |
1007 | | |
1008 | 0 | SubstreamInner::PingOutFailed { mut queued_pings } => { |
1009 | 0 | read_write.close_write(); |
1010 | 0 | if !queued_pings.is_empty() { |
1011 | 0 | queued_pings.remove(0); |
1012 | 0 | read_write.wake_up_asap(); |
1013 | 0 | ( |
1014 | 0 | Some(SubstreamInner::PingOutFailed { queued_pings }), |
1015 | 0 | Some(Event::PingOutError { |
1016 | 0 | num_pings: NonZeroUsize::new(1).unwrap(), |
1017 | 0 | }), |
1018 | 0 | ) |
1019 | | } else { |
1020 | 0 | (Some(SubstreamInner::PingOutFailed { queued_pings }), None) |
1021 | | } |
1022 | | } |
1023 | | SubstreamInner::PingOut { |
1024 | 102 | mut negotiation, |
1025 | 102 | mut queued_pings, |
1026 | 102 | mut outgoing_payload, |
1027 | 102 | mut expected_payload, |
1028 | | } => { |
1029 | 102 | if let Some(extracted_negotiation90 ) = negotiation.take() { |
1030 | 90 | match extracted_negotiation.read_write(read_write) { |
1031 | 78 | Ok(multistream_select::Negotiation::InProgress(nego)) => { |
1032 | 78 | negotiation = Some(nego) |
1033 | | } |
1034 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { |
1035 | | // Never happens when dialing. |
1036 | 0 | unreachable!() |
1037 | | } |
1038 | 12 | Ok(multistream_select::Negotiation::Success) => {} |
1039 | | Ok(multistream_select::Negotiation::NotAvailable) => { |
1040 | 0 | read_write.wake_up_asap(); |
1041 | 0 | return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); |
1042 | | } |
1043 | | Err(_) => { |
1044 | 0 | read_write.wake_up_asap(); |
1045 | 0 | return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); |
1046 | | } |
1047 | | } |
1048 | 12 | } |
1049 | | |
1050 | 102 | if negotiation |
1051 | 102 | .as_ref() |
1052 | 102 | .map_or(true, |n| n.can_write_protocol_data()78 ) _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s2_0Bc_ Line | Count | Source | 1052 | 78 | .map_or(true, |n| n.can_write_protocol_data()) |
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s2_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s2_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s2_0CsiUjFBJteJ7x_17smoldot_full_node |
1053 | 48 | { |
1054 | 48 | read_write.write_from_vec_deque(&mut outgoing_payload); |
1055 | 54 | } |
1056 | | |
1057 | | // We check the timeouts before checking the incoming data, as otherwise pings |
1058 | | // might succeed after their timeout. |
1059 | 102 | for timeout6 in queued_pings.iter_mut() { |
1060 | 6 | if timeout.as_ref().map_or(false, |(when_started, timeout)| { |
1061 | 6 | (read_write.now.clone() - when_started.clone()) >= *timeout |
1062 | 6 | }) { _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s3_0Bc_ Line | Count | Source | 1060 | 6 | if timeout.as_ref().map_or(false, |(when_started, timeout)| { | 1061 | 6 | (read_write.now.clone() - when_started.clone()) >= *timeout | 1062 | 6 | }) { |
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s3_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s3_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s3_0CsiUjFBJteJ7x_17smoldot_full_node |
1063 | 0 | *timeout = None; |
1064 | 0 | read_write.wake_up_asap(); |
1065 | 0 | return ( |
1066 | 0 | Some(SubstreamInner::PingOut { |
1067 | 0 | negotiation, |
1068 | 0 | expected_payload, |
1069 | 0 | outgoing_payload, |
1070 | 0 | queued_pings, |
1071 | 0 | }), |
1072 | 0 | Some(Event::PingOutError { |
1073 | 0 | num_pings: NonZeroUsize::new(1).unwrap(), |
1074 | 0 | }), |
1075 | 0 | ); |
1076 | 6 | } |
1077 | | |
1078 | 6 | if let Some((when_started, timeout)) = timeout { |
1079 | 6 | read_write.wake_up_after(&(when_started.clone() + *timeout)); |
1080 | 6 | }0 |
1081 | | } |
1082 | | |
1083 | 102 | if negotiation.is_none() { |
1084 | 24 | if let Ok(Some(pong0 )) = read_write.incoming_bytes_take(32) { |
1085 | 0 | if expected_payload |
1086 | 0 | .pop_front() |
1087 | 0 | .map_or(true, |expected| pong != *expected) Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s4_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s4_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s4_0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s4_0CsiUjFBJteJ7x_17smoldot_full_node |
1088 | | { |
1089 | 0 | read_write.wake_up_asap(); |
1090 | 0 | return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); |
1091 | 0 | } |
1092 | 0 | if let Some((when_started, _)) = queued_pings.remove(0) { |
1093 | 0 | return ( |
1094 | 0 | Some(SubstreamInner::PingOut { |
1095 | 0 | negotiation, |
1096 | 0 | expected_payload, |
1097 | 0 | outgoing_payload, |
1098 | 0 | queued_pings, |
1099 | 0 | }), |
1100 | 0 | Some(Event::PingOutSuccess { |
1101 | 0 | ping_time: read_write.now.clone() - when_started, |
1102 | 0 | }), |
1103 | 0 | ); |
1104 | 0 | } |
1105 | 24 | } |
1106 | 78 | } |
1107 | | |
1108 | 102 | ( |
1109 | 102 | Some(SubstreamInner::PingOut { |
1110 | 102 | negotiation, |
1111 | 102 | expected_payload, |
1112 | 102 | outgoing_payload, |
1113 | 102 | queued_pings, |
1114 | 102 | }), |
1115 | 102 | None, |
1116 | 102 | ) |
1117 | | } |
1118 | | } |
1119 | 329 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2Ba_ Line | Count | Source | 338 | 329 | fn read_write2( | 339 | 329 | self, | 340 | 329 | read_write: &'_ mut read_write::ReadWrite<TNow>, | 341 | 329 | ) -> (Option<SubstreamInner<TNow>>, Option<Event>) { | 342 | 329 | match self.inner { | 343 | 102 | SubstreamInner::InboundNegotiating(nego, was_rejected_already) => { | 344 | 102 | match nego.read_write(read_write) { | 345 | 81 | Ok(multistream_select::Negotiation::InProgress(nego)) => ( | 346 | 81 | Some(SubstreamInner::InboundNegotiating( | 347 | 81 | nego, | 348 | 81 | was_rejected_already, | 349 | 81 | )), | 350 | 81 | None, | 351 | 81 | ), | 352 | 20 | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(accept_deny)) => { | 353 | 20 | // TODO: maybe avoid cloning the protocol name? | 354 | 20 | let protocol = accept_deny.requested_protocol().to_owned(); | 355 | 20 | ( | 356 | 20 | Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)), | 357 | 20 | Some(Event::InboundNegotiated(protocol)), | 358 | 20 | ) | 359 | | } | 360 | | Ok(multistream_select::Negotiation::Success) => { | 361 | | // Unreachable, as we expect a `ListenerAcceptOrDeny`. | 362 | 0 | unreachable!() | 363 | | } | 364 | | Ok(multistream_select::Negotiation::NotAvailable) => { | 365 | | // Unreachable in listener mode. | 366 | 0 | unreachable!() | 367 | | } | 368 | 1 | Err(_) if was_rejected_already => { | 369 | 1 | // If the negotiation was already rejected once, it is likely that the | 370 | 1 | // multistream-select protocol error is due to the fact that the remote | 371 | 1 | // assumes that the multistream-select negotiation always succeeds. As such, | 372 | 1 | // we treat this situation as "negotiation has failed gracefully". | 373 | 1 | (Some(SubstreamInner::InboundFailed), None) | 374 | | } | 375 | 0 | Err(err) => ( | 376 | 0 | None, | 377 | 0 | Some(Event::InboundError { | 378 | 0 | error: InboundError::NegotiationError(err), | 379 | 0 | was_accepted: false, | 380 | 0 | }), | 381 | 0 | ), | 382 | | } | 383 | | } | 384 | 0 | SubstreamInner::InboundNegotiatingApiWait(accept_deny) => ( | 385 | 0 | Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)), | 386 | 0 | None, | 387 | 0 | ), | 388 | 18 | SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty) => { | 389 | 18 | match nego.read_write(read_write) { | 390 | 0 | Ok(multistream_select::Negotiation::InProgress(nego)) => ( | 391 | 0 | Some(SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty)), | 392 | 0 | None, | 393 | 0 | ), | 394 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { | 395 | | // Can't be reached again, as we have already accepted the protocol. | 396 | 0 | unreachable!() | 397 | | } | 398 | 18 | Ok(multistream_select::Negotiation::Success) => match inbound_ty { | 399 | 13 | InboundTy::Ping => ( | 400 | 13 | Some(SubstreamInner::PingIn { | 401 | 13 | payload_out: VecDeque::with_capacity(32), | 402 | 13 | }), | 403 | 13 | None, | 404 | 13 | ), | 405 | 3 | InboundTy::Notifications { max_handshake_size } => ( | 406 | 3 | Some(SubstreamInner::NotificationsInHandshake { | 407 | 3 | handshake_size: None, | 408 | 3 | handshake_max_size: max_handshake_size, | 409 | 3 | }), | 410 | 3 | None, | 411 | 3 | ), | 412 | 2 | InboundTy::Request { request_max_size } => { | 413 | 2 | if let Some(request_max_size) = request_max_size { | 414 | 2 | ( | 415 | 2 | Some(SubstreamInner::RequestInRecv { | 416 | 2 | request_max_size, | 417 | 2 | request_size: None, | 418 | 2 | }), | 419 | 2 | None, | 420 | 2 | ) | 421 | | } else { | 422 | 0 | (Some(SubstreamInner::RequestInRecvEmpty), None) | 423 | | } | 424 | | } | 425 | | }, | 426 | | Ok(multistream_select::Negotiation::NotAvailable) => { | 427 | | // Unreachable in listener mode. | 428 | 0 | unreachable!() | 429 | | } | 430 | 0 | Err(err) => ( | 431 | 0 | None, | 432 | 0 | Some(Event::InboundError { | 433 | 0 | error: InboundError::NegotiationError(err), | 434 | 0 | was_accepted: true, | 435 | 0 | }), | 436 | 0 | ), | 437 | | } | 438 | | } | 439 | | SubstreamInner::InboundFailed => { | 440 | | // Substream is an inbound substream that has failed to negotiate a | 441 | | // protocol. The substream is expected to close soon, but the remote might | 442 | | // have been eagerly sending data (assuming that the negotiation would | 443 | | // succeed), which should be silently discarded. | 444 | 1 | read_write.discard_all_incoming(); | 445 | 1 | read_write.close_write(); | 446 | 1 | if read_write.is_dead() { | 447 | 1 | (None, None) | 448 | | } else { | 449 | 0 | (Some(SubstreamInner::InboundFailed), None) | 450 | | } | 451 | | } | 452 | | | 453 | | SubstreamInner::NotificationsOutNegotiationFailed => { | 454 | | // Substream has failed to negotiate a protocol. The substream is expected to | 455 | | // close soon. | 456 | 0 | read_write.discard_all_incoming(); | 457 | 0 | read_write.close_write(); | 458 | 0 | ( | 459 | 0 | if read_write.is_dead() { | 460 | 0 | None | 461 | | } else { | 462 | 0 | Some(SubstreamInner::NotificationsOutNegotiationFailed) | 463 | | }, | 464 | 0 | None, | 465 | | ) | 466 | | } | 467 | | SubstreamInner::NotificationsOutHandshakeRecv { | 468 | 30 | timeout, | 469 | 30 | mut negotiation, | 470 | 30 | mut handshake_in_size, | 471 | 30 | handshake_in_max_size, | 472 | 30 | mut handshake_out, | 473 | 30 | } => { | 474 | 30 | if timeout < read_write.now { | 475 | 1 | read_write.wake_up_asap(); | 476 | 1 | return ( | 477 | 1 | Some(SubstreamInner::NotificationsOutNegotiationFailed), | 478 | 1 | Some(Event::NotificationsOutResult { | 479 | 1 | result: Err(NotificationsOutErr::Timeout), | 480 | 1 | }), | 481 | 1 | ); | 482 | 29 | } | 483 | | | 484 | 29 | if let Some(extracted_negotiation25 ) = negotiation.take() { | 485 | 25 | match extracted_negotiation.read_write(read_write) { | 486 | 23 | Ok(multistream_select::Negotiation::InProgress(nego)) => { | 487 | 23 | negotiation = Some(nego) | 488 | | } | 489 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { | 490 | | // Never happens when dialing. | 491 | 0 | unreachable!() | 492 | | } | 493 | 2 | Ok(multistream_select::Negotiation::Success) => {} | 494 | | Ok(multistream_select::Negotiation::NotAvailable) => { | 495 | 0 | read_write.wake_up_asap(); | 496 | 0 | return ( | 497 | 0 | Some(SubstreamInner::NotificationsOutNegotiationFailed), | 498 | 0 | Some(Event::NotificationsOutResult { | 499 | 0 | result: Err(NotificationsOutErr::ProtocolNotAvailable), | 500 | 0 | }), | 501 | 0 | ); | 502 | | } | 503 | 0 | Err(err) => { | 504 | 0 | return ( | 505 | 0 | None, | 506 | 0 | Some(Event::NotificationsOutResult { | 507 | 0 | result: Err(NotificationsOutErr::NegotiationError(err)), | 508 | 0 | }), | 509 | 0 | ) | 510 | | } | 511 | | } | 512 | 4 | } | 513 | | | 514 | 29 | if negotiation | 515 | 29 | .as_ref() | 516 | 29 | .map_or(true, |n| n.can_write_protocol_data()) | 517 | 17 | { | 518 | 17 | read_write.write_from_vec_deque(&mut handshake_out); | 519 | 17 | }12 | 520 | | | 521 | 29 | if negotiation.is_none() { | 522 | 6 | if read_write.expected_incoming_bytes.is_none() { | 523 | 1 | read_write.wake_up_asap(); | 524 | 1 | return ( | 525 | 1 | Some(SubstreamInner::NotificationsOutNegotiationFailed), | 526 | 1 | Some(Event::NotificationsOutResult { | 527 | 1 | result: Err(NotificationsOutErr::RefusedHandshake), | 528 | 1 | }), | 529 | 1 | ); | 530 | 5 | } | 531 | 5 | | 532 | 5 | // Don't actually process incoming data before handshake is sent out, in order | 533 | 5 | // to not accidentally perform a state transition. | 534 | 5 | if !handshake_out.is_empty() { | 535 | 0 | return ( | 536 | 0 | Some(SubstreamInner::NotificationsOutHandshakeRecv { | 537 | 0 | timeout, | 538 | 0 | negotiation, | 539 | 0 | handshake_in_size, | 540 | 0 | handshake_in_max_size, | 541 | 0 | handshake_out, | 542 | 0 | }), | 543 | 0 | None, | 544 | 0 | ); | 545 | 5 | } | 546 | | | 547 | 5 | if let Some(handshake_in_size1 ) = handshake_in_size { | 548 | 1 | match read_write.incoming_bytes_take(handshake_in_size) { | 549 | 1 | Ok(Some(remote_handshake)) => { | 550 | 1 | read_write.wake_up_asap(); | 551 | 1 | return ( | 552 | 1 | Some(SubstreamInner::NotificationsOut { | 553 | 1 | notifications: VecDeque::new(), | 554 | 1 | close_demanded_by_remote: false, | 555 | 1 | }), | 556 | 1 | Some(Event::NotificationsOutResult { | 557 | 1 | result: Ok(remote_handshake), | 558 | 1 | }), | 559 | 1 | ); | 560 | | } | 561 | 0 | Ok(None) => {} | 562 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { | 563 | 0 | read_write.wake_up_asap(); | 564 | 0 | return ( | 565 | 0 | Some(SubstreamInner::NotificationsOutNegotiationFailed), | 566 | 0 | Some(Event::NotificationsOutResult { | 567 | 0 | result: Err(NotificationsOutErr::RefusedHandshake), | 568 | 0 | }), | 569 | 0 | ); | 570 | | } | 571 | | } | 572 | | } else { | 573 | 4 | match read_write.incoming_bytes_take_leb128(handshake_in_max_size) { | 574 | 1 | Ok(Some(s)) => handshake_in_size = Some(s), | 575 | 3 | Ok(None) => {} | 576 | 0 | Err(error) => return ( | 577 | 0 | None, | 578 | 0 | Some(Event::Response { | 579 | 0 | response: Err(match error { | 580 | | read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => { | 581 | 0 | RequestError::ResponseInvalidLeb128 | 582 | | } | 583 | | read_write::IncomingBytesTakeLeb128Error::ReadClosed => { | 584 | 0 | RequestError::SubstreamClosed | 585 | | } | 586 | | read_write::IncomingBytesTakeLeb128Error::TooLarge => { | 587 | 0 | RequestError::ResponseTooLarge | 588 | | } | 589 | | }), | 590 | | }), | 591 | | ), | 592 | | } | 593 | | } | 594 | 23 | } | 595 | | | 596 | 27 | read_write.wake_up_after(&timeout); | 597 | 27 | | 598 | 27 | ( | 599 | 27 | Some(SubstreamInner::NotificationsOutHandshakeRecv { | 600 | 27 | timeout, | 601 | 27 | negotiation, | 602 | 27 | handshake_in_size, | 603 | 27 | handshake_in_max_size, | 604 | 27 | handshake_out, | 605 | 27 | }), | 606 | 27 | None, | 607 | 27 | ) | 608 | | } | 609 | | SubstreamInner::NotificationsOut { | 610 | 3 | mut notifications, | 611 | 3 | close_demanded_by_remote, | 612 | 3 | } => { | 613 | 3 | // Receiving data on an outgoing substream is forbidden by the protocol. | 614 | 3 | read_write.discard_all_incoming(); | 615 | 3 | read_write.write_from_vec_deque(&mut notifications); | 616 | 3 | | 617 | 3 | // If this debug assertion fails, it means that `expected_incoming_bytes` was `None` in | 618 | 3 | // the past then became `Some` again. | 619 | 3 | debug_assert!( | 620 | 3 | !close_demanded_by_remote || read_write.expected_incoming_bytes.is_none()0 | 621 | | ); | 622 | | | 623 | 3 | if !close_demanded_by_remote && read_write.expected_incoming_bytes.is_none() { | 624 | 0 | read_write.wake_up_asap(); | 625 | 0 | return ( | 626 | 0 | Some(SubstreamInner::NotificationsOut { | 627 | 0 | notifications, | 628 | 0 | close_demanded_by_remote: true, | 629 | 0 | }), | 630 | 0 | Some(Event::NotificationsOutCloseDemanded), | 631 | 0 | ); | 632 | 3 | } | 633 | 3 | | 634 | 3 | ( | 635 | 3 | Some(SubstreamInner::NotificationsOut { | 636 | 3 | notifications, | 637 | 3 | close_demanded_by_remote, | 638 | 3 | }), | 639 | 3 | None, | 640 | 3 | ) | 641 | | } | 642 | | SubstreamInner::NotificationsOutClosed => { | 643 | 0 | read_write.discard_all_incoming(); | 644 | 0 | read_write.close_write(); | 645 | 0 | ( | 646 | 0 | if read_write.is_dead() { | 647 | 0 | None | 648 | | } else { | 649 | 0 | Some(SubstreamInner::NotificationsOutClosed) | 650 | | }, | 651 | 0 | None, | 652 | | ) | 653 | | } | 654 | | | 655 | | SubstreamInner::RequestOut { | 656 | 27 | timeout, | 657 | 27 | mut negotiation, | 658 | 27 | mut request, | 659 | 27 | mut response_size, | 660 | 27 | response_max_size, | 661 | 27 | } => { | 662 | 27 | // Note that this might trigger timeouts for requests whose response is available | 663 | 27 | // in `incoming_buffer`. This is intentional, as from the perspective of | 664 | 27 | // `read_write` the response arrived after the timeout. It is the responsibility | 665 | 27 | // of the user to call `read_write` in an appropriate way for this to not happen. | 666 | 27 | if timeout < read_write.now { | 667 | 1 | read_write.close_write(); | 668 | 1 | return ( | 669 | 1 | None, | 670 | 1 | Some(Event::Response { | 671 | 1 | response: Err(RequestError::Timeout), | 672 | 1 | }), | 673 | 1 | ); | 674 | 26 | } | 675 | | | 676 | 26 | if let Some(extracted_nego25 ) = negotiation.take() { | 677 | 25 | match extracted_nego.read_write(read_write) { | 678 | 23 | Ok(multistream_select::Negotiation::InProgress(nego)) => { | 679 | 23 | negotiation = Some(nego) | 680 | | } | 681 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { | 682 | | // Never happens when dialing. | 683 | 0 | unreachable!() | 684 | | } | 685 | 1 | Ok(multistream_select::Negotiation::Success) => {} | 686 | | Ok(multistream_select::Negotiation::NotAvailable) => { | 687 | 1 | return ( | 688 | 1 | None, | 689 | 1 | Some(Event::Response { | 690 | 1 | response: Err(RequestError::ProtocolNotAvailable), | 691 | 1 | }), | 692 | 1 | ) | 693 | | } | 694 | 0 | Err(err) => { | 695 | 0 | return ( | 696 | 0 | None, | 697 | 0 | Some(Event::Response { | 698 | 0 | response: Err(RequestError::NegotiationError(err)), | 699 | 0 | }), | 700 | 0 | ) | 701 | | } | 702 | | } | 703 | 1 | } | 704 | | | 705 | 25 | if negotiation | 706 | 25 | .as_ref() | 707 | 25 | .map_or(true, |n| n.can_write_protocol_data()) | 708 | | { | 709 | 13 | if request.is_empty() { | 710 | 10 | read_write.close_write(); | 711 | 10 | } else { | 712 | 3 | read_write.write_from_vec_deque(&mut request); | 713 | 3 | } | 714 | 12 | } | 715 | | | 716 | 25 | if negotiation.is_none() { | 717 | 2 | if let Some(response_size0 ) = response_size { | 718 | 0 | match read_write.incoming_bytes_take(response_size) { | 719 | 0 | Ok(Some(response)) => { | 720 | 0 | return ( | 721 | 0 | None, | 722 | 0 | Some(Event::Response { | 723 | 0 | response: Ok(response), | 724 | 0 | }), | 725 | 0 | ); | 726 | | } | 727 | 0 | Ok(None) => {} | 728 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { | 729 | 0 | return ( | 730 | 0 | None, | 731 | 0 | Some(Event::Response { | 732 | 0 | response: Err(RequestError::SubstreamClosed), | 733 | 0 | }), | 734 | 0 | ) | 735 | | } | 736 | | } | 737 | | } else { | 738 | 2 | match read_write.incoming_bytes_take_leb128(response_max_size) { | 739 | 0 | Ok(Some(s)) => response_size = Some(s), | 740 | 1 | Ok(None) => {} | 741 | 1 | Err(error) => return ( | 742 | 1 | None, | 743 | 1 | Some(Event::Response { | 744 | 1 | response: Err(match error { | 745 | | read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => { | 746 | 0 | RequestError::ResponseInvalidLeb128 | 747 | | } | 748 | | read_write::IncomingBytesTakeLeb128Error::ReadClosed => { | 749 | 1 | RequestError::SubstreamClosed | 750 | | } | 751 | | read_write::IncomingBytesTakeLeb128Error::TooLarge => { | 752 | 0 | RequestError::ResponseTooLarge | 753 | | } | 754 | | }), | 755 | | }), | 756 | | ), | 757 | | } | 758 | | } | 759 | 23 | } | 760 | | | 761 | 24 | read_write.wake_up_after(&timeout); | 762 | 24 | | 763 | 24 | ( | 764 | 24 | Some(SubstreamInner::RequestOut { | 765 | 24 | timeout, | 766 | 24 | negotiation, | 767 | 24 | request, | 768 | 24 | response_size, | 769 | 24 | response_max_size, | 770 | 24 | }), | 771 | 24 | None, | 772 | 24 | ) | 773 | | } | 774 | | | 775 | | SubstreamInner::RequestInRecv { | 776 | 4 | mut request_size, | 777 | 4 | request_max_size, | 778 | | } => { | 779 | 4 | if let Some(request_size2 ) = request_size { | 780 | 2 | match read_write.incoming_bytes_take(request_size) { | 781 | 2 | Ok(Some(request)) => { | 782 | 2 | return ( | 783 | 2 | Some(SubstreamInner::RequestInApiWait), | 784 | 2 | Some(Event::RequestIn { request }), | 785 | 2 | ); | 786 | | } | 787 | 0 | Ok(None) => {} | 788 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { | 789 | 0 | return ( | 790 | 0 | None, | 791 | 0 | Some(Event::InboundError { | 792 | 0 | error: InboundError::SubstreamClosed, | 793 | 0 | was_accepted: true, | 794 | 0 | }), | 795 | 0 | ) | 796 | | } | 797 | | } | 798 | | } else { | 799 | 2 | match read_write.incoming_bytes_take_leb128(request_max_size) { | 800 | 2 | Ok(Some(s)) => request_size = Some(s), | 801 | 0 | Ok(None) => {} | 802 | 0 | Err(error) => { | 803 | 0 | return ( | 804 | 0 | None, | 805 | 0 | Some(Event::InboundError { | 806 | 0 | error: InboundError::RequestInLebError(error), | 807 | 0 | was_accepted: true, | 808 | 0 | }), | 809 | 0 | ) | 810 | | } | 811 | | } | 812 | | } | 813 | | | 814 | 2 | ( | 815 | 2 | Some(SubstreamInner::RequestInRecv { | 816 | 2 | request_max_size, | 817 | 2 | request_size, | 818 | 2 | }), | 819 | 2 | None, | 820 | 2 | ) | 821 | | } | 822 | 0 | SubstreamInner::RequestInRecvEmpty => ( | 823 | 0 | Some(SubstreamInner::RequestInApiWait), | 824 | 0 | Some(Event::RequestIn { | 825 | 0 | request: Vec::new(), | 826 | 0 | }), | 827 | 0 | ), | 828 | 0 | SubstreamInner::RequestInApiWait => (Some(SubstreamInner::RequestInApiWait), None), | 829 | 1 | SubstreamInner::RequestInRespond { mut response } => { | 830 | 1 | if response.is_empty() { | 831 | 1 | read_write.close_write(); | 832 | 1 | (None, None) | 833 | | } else { | 834 | 0 | read_write.write_from_vec_deque(&mut response); | 835 | 0 | (Some(SubstreamInner::RequestInRespond { response }), None) | 836 | | } | 837 | | } | 838 | | | 839 | | SubstreamInner::NotificationsInHandshake { | 840 | 6 | handshake_max_size, | 841 | 6 | mut handshake_size, | 842 | | } => { | 843 | 6 | if let Some(handshake_size3 ) = handshake_size { | 844 | 3 | match read_write.incoming_bytes_take(handshake_size) { | 845 | 3 | Ok(Some(handshake)) => { | 846 | 3 | return ( | 847 | 3 | Some(SubstreamInner::NotificationsInWait), | 848 | 3 | Some(Event::NotificationsInOpen { handshake }), | 849 | 3 | ); | 850 | | } | 851 | 0 | Ok(None) => {} | 852 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { | 853 | 0 | return ( | 854 | 0 | None, | 855 | 0 | Some(Event::InboundError { | 856 | 0 | error: InboundError::SubstreamClosed, | 857 | 0 | was_accepted: true, | 858 | 0 | }), | 859 | 0 | ) | 860 | | } | 861 | | } | 862 | | } else { | 863 | 3 | match read_write.incoming_bytes_take_leb128(handshake_max_size) { | 864 | 3 | Ok(Some(s)) => handshake_size = Some(s), | 865 | 0 | Ok(None) => {} | 866 | 0 | Err(error) => { | 867 | 0 | return ( | 868 | 0 | None, | 869 | 0 | Some(Event::InboundError { | 870 | 0 | error: InboundError::NotificationsInError { error }, | 871 | 0 | was_accepted: true, | 872 | 0 | }), | 873 | 0 | ) | 874 | | } | 875 | | } | 876 | | } | 877 | | | 878 | 3 | ( | 879 | 3 | Some(SubstreamInner::NotificationsInHandshake { | 880 | 3 | handshake_max_size, | 881 | 3 | handshake_size, | 882 | 3 | }), | 883 | 3 | None, | 884 | 3 | ) | 885 | | } | 886 | | SubstreamInner::NotificationsInWait => { | 887 | | // Incoming data isn't processed, potentially back-pressuring it. | 888 | 0 | if read_write.expected_incoming_bytes.is_some() { | 889 | 0 | (Some(SubstreamInner::NotificationsInWait), None) | 890 | | } else { | 891 | 0 | read_write.wake_up_asap(); | 892 | 0 | ( | 893 | 0 | Some(SubstreamInner::NotificationsInRefused), | 894 | 0 | Some(Event::NotificationsInOpenCancel), | 895 | 0 | ) | 896 | | } | 897 | | } | 898 | | SubstreamInner::NotificationsInRefused => { | 899 | 2 | read_write.discard_all_incoming(); | 900 | 2 | read_write.close_write(); | 901 | 2 | ( | 902 | 2 | if read_write.is_dead() { | 903 | 0 | None | 904 | | } else { | 905 | 2 | Some(SubstreamInner::NotificationsInRefused) | 906 | | }, | 907 | 2 | None, | 908 | | ) | 909 | | } | 910 | | SubstreamInner::NotificationsIn { | 911 | 9 | close_desired_timeout, | 912 | 9 | mut next_notification_size, | 913 | 9 | mut handshake, | 914 | 9 | max_notification_size, | 915 | 9 | } => { | 916 | 9 | read_write.write_from_vec_deque(&mut handshake); | 917 | 9 | | 918 | 9 | if close_desired_timeout | 919 | 9 | .as_ref() | 920 | 9 | .map_or(false, |timeout| *timeout >= read_write.now) | 921 | | { | 922 | 0 | read_write.wake_up_asap(); | 923 | 0 | return ( | 924 | 0 | Some(SubstreamInner::NotificationsInClosed), | 925 | 0 | Some(Event::NotificationsInClose { | 926 | 0 | outcome: Err(NotificationsInClosedErr::CloseDesiredTimeout), | 927 | 0 | }), | 928 | 0 | ); | 929 | 9 | } | 930 | 9 | | 931 | 9 | if close_desired_timeout.is_some() && handshake.is_empty()0 { | 932 | 0 | read_write.close_write(); | 933 | 9 | } | 934 | | | 935 | 9 | let mut notification = None; | 936 | | | 937 | 9 | if let Some(sz3 ) = next_notification_size { | 938 | 3 | match read_write.incoming_bytes_take(sz) { | 939 | 3 | Ok(Some(notif)) => { | 940 | 3 | read_write.wake_up_asap(); | 941 | 3 | notification = Some(notif); | 942 | 3 | next_notification_size = None; | 943 | 3 | } | 944 | 0 | Ok(None) => {} | 945 | | Err(read_write::IncomingBytesTakeError::ReadClosed) => { | 946 | 0 | read_write.wake_up_asap(); | 947 | 0 | return ( | 948 | 0 | Some(SubstreamInner::NotificationsInClosed), | 949 | 0 | Some(Event::NotificationsInClose { | 950 | 0 | outcome: Err(NotificationsInClosedErr::SubstreamClosed), | 951 | 0 | }), | 952 | 0 | ); | 953 | | } | 954 | | } | 955 | | } else { | 956 | 6 | match read_write.incoming_bytes_take_leb128(max_notification_size) { | 957 | 3 | Ok(Some(s)) => next_notification_size = Some(s), | 958 | 3 | Ok(None) => {} | 959 | 0 | Err(error) => { | 960 | 0 | read_write.wake_up_asap(); | 961 | 0 | return ( | 962 | 0 | Some(SubstreamInner::NotificationsInClosed), | 963 | 0 | Some(Event::NotificationsInClose { | 964 | 0 | outcome: Err(NotificationsInClosedErr::ProtocolError(error)), | 965 | 0 | }), | 966 | 0 | ); | 967 | | } | 968 | | } | 969 | | } | 970 | | | 971 | 9 | ( | 972 | 9 | Some(SubstreamInner::NotificationsIn { | 973 | 9 | close_desired_timeout, | 974 | 9 | next_notification_size, | 975 | 9 | handshake, | 976 | 9 | max_notification_size, | 977 | 9 | }), | 978 | 9 | notification.map(|n| Event::NotificationIn { notification: n }), | 979 | 9 | ) | 980 | | } | 981 | | SubstreamInner::NotificationsInClosed => { | 982 | 0 | read_write.discard_all_incoming(); | 983 | 0 | read_write.close_write(); | 984 | 0 | ( | 985 | 0 | if read_write.is_dead() { | 986 | 0 | None | 987 | | } else { | 988 | 0 | Some(SubstreamInner::NotificationsInClosed) | 989 | | }, | 990 | 0 | None, | 991 | | ) | 992 | | } | 993 | | | 994 | 24 | SubstreamInner::PingIn { mut payload_out } => { | 995 | 24 | // Inbound ping substream. | 996 | 24 | // The ping protocol consists in sending 32 bytes of data, which the remote has | 997 | 24 | // to send back. | 998 | 24 | read_write.write_from_vec_deque(&mut payload_out); | 999 | 24 | if payload_out.is_empty() { | 1000 | 24 | if let Ok(Some(ping0 )) = read_write.incoming_bytes_take(32) { | 1001 | 0 | payload_out.extend(ping); | 1002 | 24 | } | 1003 | 0 | } | 1004 | | | 1005 | 24 | (Some(SubstreamInner::PingIn { payload_out }), None) | 1006 | | } | 1007 | | | 1008 | 0 | SubstreamInner::PingOutFailed { mut queued_pings } => { | 1009 | 0 | read_write.close_write(); | 1010 | 0 | if !queued_pings.is_empty() { | 1011 | 0 | queued_pings.remove(0); | 1012 | 0 | read_write.wake_up_asap(); | 1013 | 0 | ( | 1014 | 0 | Some(SubstreamInner::PingOutFailed { queued_pings }), | 1015 | 0 | Some(Event::PingOutError { | 1016 | 0 | num_pings: NonZeroUsize::new(1).unwrap(), | 1017 | 0 | }), | 1018 | 0 | ) | 1019 | | } else { | 1020 | 0 | (Some(SubstreamInner::PingOutFailed { queued_pings }), None) | 1021 | | } | 1022 | | } | 1023 | | SubstreamInner::PingOut { | 1024 | 102 | mut negotiation, | 1025 | 102 | mut queued_pings, | 1026 | 102 | mut outgoing_payload, | 1027 | 102 | mut expected_payload, | 1028 | | } => { | 1029 | 102 | if let Some(extracted_negotiation90 ) = negotiation.take() { | 1030 | 90 | match extracted_negotiation.read_write(read_write) { | 1031 | 78 | Ok(multistream_select::Negotiation::InProgress(nego)) => { | 1032 | 78 | negotiation = Some(nego) | 1033 | | } | 1034 | | Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => { | 1035 | | // Never happens when dialing. | 1036 | 0 | unreachable!() | 1037 | | } | 1038 | 12 | Ok(multistream_select::Negotiation::Success) => {} | 1039 | | Ok(multistream_select::Negotiation::NotAvailable) => { | 1040 | 0 | read_write.wake_up_asap(); | 1041 | 0 | return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); | 1042 | | } | 1043 | | Err(_) => { | 1044 | 0 | read_write.wake_up_asap(); | 1045 | 0 | return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); | 1046 | | } | 1047 | | } | 1048 | 12 | } | 1049 | | | 1050 | 102 | if negotiation | 1051 | 102 | .as_ref() | 1052 | 102 | .map_or(true, |n| n.can_write_protocol_data()) | 1053 | 48 | { | 1054 | 48 | read_write.write_from_vec_deque(&mut outgoing_payload); | 1055 | 54 | } | 1056 | | | 1057 | | // We check the timeouts before checking the incoming data, as otherwise pings | 1058 | | // might succeed after their timeout. | 1059 | 102 | for timeout6 in queued_pings.iter_mut() { | 1060 | 6 | if timeout.as_ref().map_or(false, |(when_started, timeout)| { | 1061 | | (read_write.now.clone() - when_started.clone()) >= *timeout | 1062 | 6 | }) { | 1063 | 0 | *timeout = None; | 1064 | 0 | read_write.wake_up_asap(); | 1065 | 0 | return ( | 1066 | 0 | Some(SubstreamInner::PingOut { | 1067 | 0 | negotiation, | 1068 | 0 | expected_payload, | 1069 | 0 | outgoing_payload, | 1070 | 0 | queued_pings, | 1071 | 0 | }), | 1072 | 0 | Some(Event::PingOutError { | 1073 | 0 | num_pings: NonZeroUsize::new(1).unwrap(), | 1074 | 0 | }), | 1075 | 0 | ); | 1076 | 6 | } | 1077 | | | 1078 | 6 | if let Some((when_started, timeout)) = timeout { | 1079 | 6 | read_write.wake_up_after(&(when_started.clone() + *timeout)); | 1080 | 6 | }0 | 1081 | | } | 1082 | | | 1083 | 102 | if negotiation.is_none() { | 1084 | 24 | if let Ok(Some(pong0 )) = read_write.incoming_bytes_take(32) { | 1085 | 0 | if expected_payload | 1086 | 0 | .pop_front() | 1087 | 0 | .map_or(true, |expected| pong != *expected) | 1088 | | { | 1089 | 0 | read_write.wake_up_asap(); | 1090 | 0 | return (Some(SubstreamInner::PingOutFailed { queued_pings }), None); | 1091 | 0 | } | 1092 | 0 | if let Some((when_started, _)) = queued_pings.remove(0) { | 1093 | 0 | return ( | 1094 | 0 | Some(SubstreamInner::PingOut { | 1095 | 0 | negotiation, | 1096 | 0 | expected_payload, | 1097 | 0 | outgoing_payload, | 1098 | 0 | queued_pings, | 1099 | 0 | }), | 1100 | 0 | Some(Event::PingOutSuccess { | 1101 | 0 | ping_time: read_write.now.clone() - when_started, | 1102 | 0 | }), | 1103 | 0 | ); | 1104 | 0 | } | 1105 | 24 | } | 1106 | 78 | } | 1107 | | | 1108 | 102 | ( | 1109 | 102 | Some(SubstreamInner::PingOut { | 1110 | 102 | negotiation, | 1111 | 102 | expected_payload, | 1112 | 102 | outgoing_payload, | 1113 | 102 | queued_pings, | 1114 | 102 | }), | 1115 | 102 | None, | 1116 | 102 | ) | 1117 | | } | 1118 | | } | 1119 | 329 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE11read_write2Ba_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2CsiUjFBJteJ7x_17smoldot_full_node |
1120 | | |
1121 | 0 | pub fn reset(self) -> Option<Event> { |
1122 | 0 | match self.inner { |
1123 | 0 | SubstreamInner::InboundNegotiating(_, _) => None, |
1124 | 0 | SubstreamInner::InboundNegotiatingAccept(_, _) => None, |
1125 | 0 | SubstreamInner::InboundNegotiatingApiWait(_) => Some(Event::InboundNegotiatedCancel), |
1126 | 0 | SubstreamInner::InboundFailed => None, |
1127 | 0 | SubstreamInner::RequestOut { .. } => Some(Event::Response { |
1128 | 0 | response: Err(RequestError::SubstreamReset), |
1129 | 0 | }), |
1130 | 0 | SubstreamInner::NotificationsInHandshake { .. } => None, |
1131 | 0 | SubstreamInner::NotificationsInWait { .. } => Some(Event::NotificationsInOpenCancel), |
1132 | 0 | SubstreamInner::NotificationsIn { .. } => Some(Event::NotificationsInClose { |
1133 | 0 | outcome: Err(NotificationsInClosedErr::SubstreamReset), |
1134 | 0 | }), |
1135 | 0 | SubstreamInner::NotificationsInRefused => None, |
1136 | 0 | SubstreamInner::NotificationsInClosed => None, |
1137 | | SubstreamInner::NotificationsOutHandshakeRecv { .. } => { |
1138 | 0 | Some(Event::NotificationsOutResult { |
1139 | 0 | result: Err(NotificationsOutErr::SubstreamReset), |
1140 | 0 | }) |
1141 | | } |
1142 | 0 | SubstreamInner::NotificationsOutNegotiationFailed => None, |
1143 | 0 | SubstreamInner::NotificationsOut { .. } => Some(Event::NotificationsOutReset), |
1144 | 0 | SubstreamInner::NotificationsOutClosed { .. } => None, |
1145 | 0 | SubstreamInner::PingIn { .. } => None, |
1146 | 0 | SubstreamInner::RequestInRecv { .. } => None, |
1147 | 0 | SubstreamInner::RequestInRecvEmpty { .. } => None, |
1148 | 0 | SubstreamInner::RequestInApiWait => None, |
1149 | 0 | SubstreamInner::RequestInRespond { .. } => None, |
1150 | 0 | SubstreamInner::PingOut { queued_pings, .. } |
1151 | 0 | | SubstreamInner::PingOutFailed { queued_pings, .. } => { |
1152 | 0 | NonZeroUsize::new(queued_pings.len()) |
1153 | 0 | .map(|num_pings| Event::PingOutError { num_pings }) Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5reset0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5reset0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE5reset0Bc_ Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE5reset0CsiUjFBJteJ7x_17smoldot_full_node |
1154 | | } |
1155 | | } |
1156 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5resetBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5resetCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE5resetBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE5resetCsiUjFBJteJ7x_17smoldot_full_node |
1157 | | |
1158 | | /// Accepts an inbound notifications protocol. Must be called in response to a |
1159 | | /// [`Event::NotificationsInOpen`]. |
1160 | | /// |
1161 | | /// #Â Panic |
1162 | | /// |
1163 | | /// Panics if this substream is not of the correct type. |
1164 | | /// |
1165 | 1 | pub fn accept_in_notifications_substream( |
1166 | 1 | &mut self, |
1167 | 1 | handshake: Vec<u8>, |
1168 | 1 | max_notification_size: usize, |
1169 | 1 | ) { |
1170 | 1 | if let SubstreamInner::NotificationsInWait = &mut self.inner { |
1171 | 1 | self.inner = SubstreamInner::NotificationsIn { |
1172 | 1 | close_desired_timeout: None, |
1173 | 1 | next_notification_size: None, |
1174 | 1 | handshake: { |
1175 | 1 | let handshake_len = handshake.len(); |
1176 | 1 | leb128::encode_usize(handshake_len) |
1177 | 1 | .chain(handshake) |
1178 | 1 | .collect::<VecDeque<_>>() |
1179 | 1 | }, |
1180 | 1 | max_notification_size, |
1181 | 1 | } |
1182 | 0 | } |
1183 | | |
1184 | | // TODO: too defensive, should be } else { panic!() } |
1185 | 1 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33accept_in_notifications_substreamBa_ Line | Count | Source | 1165 | 1 | pub fn accept_in_notifications_substream( | 1166 | 1 | &mut self, | 1167 | 1 | handshake: Vec<u8>, | 1168 | 1 | max_notification_size: usize, | 1169 | 1 | ) { | 1170 | 1 | if let SubstreamInner::NotificationsInWait = &mut self.inner { | 1171 | 1 | self.inner = SubstreamInner::NotificationsIn { | 1172 | 1 | close_desired_timeout: None, | 1173 | 1 | next_notification_size: None, | 1174 | 1 | handshake: { | 1175 | 1 | let handshake_len = handshake.len(); | 1176 | 1 | leb128::encode_usize(handshake_len) | 1177 | 1 | .chain(handshake) | 1178 | 1 | .collect::<VecDeque<_>>() | 1179 | 1 | }, | 1180 | 1 | max_notification_size, | 1181 | 1 | } | 1182 | 0 | } | 1183 | | | 1184 | | // TODO: too defensive, should be } else { panic!() } | 1185 | 1 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33accept_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE33accept_in_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE33accept_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node |
1186 | | |
1187 | | /// Rejects an inbound notifications protocol. Must be called in response to a |
1188 | | /// [`Event::NotificationsInOpen`]. |
1189 | | /// |
1190 | | /// #Â Panic |
1191 | | /// |
1192 | | /// Panics if this substream is not of the correct type. |
1193 | | /// |
1194 | 1 | pub fn reject_in_notifications_substream(&mut self) { |
1195 | 1 | match &mut self.inner { |
1196 | 1 | SubstreamInner::NotificationsInWait { .. } => { |
1197 | 1 | self.inner = SubstreamInner::NotificationsInRefused; |
1198 | 1 | } |
1199 | 0 | _ => panic!(), |
1200 | | } |
1201 | 1 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33reject_in_notifications_substreamBa_ Line | Count | Source | 1194 | 1 | pub fn reject_in_notifications_substream(&mut self) { | 1195 | 1 | match &mut self.inner { | 1196 | 1 | SubstreamInner::NotificationsInWait { .. } => { | 1197 | 1 | self.inner = SubstreamInner::NotificationsInRefused; | 1198 | 1 | } | 1199 | 0 | _ => panic!(), | 1200 | | } | 1201 | 1 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33reject_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE33reject_in_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE33reject_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node |
1202 | | |
1203 | | /// Queues a notification to be written out on the given substream. |
1204 | | /// |
1205 | | /// # Panic |
1206 | | /// |
1207 | | /// Panics if the substream isn't a notifications substream, or if the notifications substream |
1208 | | /// isn't in the appropriate state. |
1209 | | /// |
1210 | 3 | pub fn write_notification_unbounded(&mut self, notification: Vec<u8>) { |
1211 | 3 | match &mut self.inner { |
1212 | 3 | SubstreamInner::NotificationsOut { notifications, .. } => { |
1213 | 3 | // TODO: expensive copying? |
1214 | 3 | notifications.extend(leb128::encode_usize(notification.len())); |
1215 | 3 | notifications.extend(notification); |
1216 | 3 | } |
1217 | 0 | _ => panic!(), |
1218 | | } |
1219 | 3 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE28write_notification_unboundedBa_ Line | Count | Source | 1210 | 3 | pub fn write_notification_unbounded(&mut self, notification: Vec<u8>) { | 1211 | 3 | match &mut self.inner { | 1212 | 3 | SubstreamInner::NotificationsOut { notifications, .. } => { | 1213 | 3 | // TODO: expensive copying? | 1214 | 3 | notifications.extend(leb128::encode_usize(notification.len())); | 1215 | 3 | notifications.extend(notification); | 1216 | 3 | } | 1217 | 0 | _ => panic!(), | 1218 | | } | 1219 | 3 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE28write_notification_unboundedCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE28write_notification_unboundedBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE28write_notification_unboundedCsiUjFBJteJ7x_17smoldot_full_node |
1220 | | |
1221 | | /// Returns the number of bytes waiting to be sent out on that substream. |
1222 | | /// |
1223 | | /// See the documentation of [`Substream::write_notification_unbounded`] for context. |
1224 | | /// |
1225 | | /// # Panic |
1226 | | /// |
1227 | | /// Panics if the substream isn't a notifications substream, or if the notifications substream |
1228 | | /// isn't in the appropriate state. |
1229 | | /// |
1230 | 0 | pub fn notification_substream_queued_bytes(&self) -> usize { |
1231 | 0 | match &self.inner { |
1232 | 0 | SubstreamInner::NotificationsOut { notifications, .. } => notifications.len(), |
1233 | 0 | _ => panic!(), |
1234 | | } |
1235 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE35notification_substream_queued_bytesBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE35notification_substream_queued_bytesBa_ |
1236 | | |
1237 | | /// Closes a outgoing notifications substream opened after a successful |
1238 | | /// [`Event::NotificationsOutResult`]. |
1239 | | /// |
1240 | | /// This can be done even when in the negotiation phase, in other words before the remote has |
1241 | | /// accepted/refused the substream. |
1242 | | /// |
1243 | | /// # Panic |
1244 | | /// |
1245 | | /// Panics if the substream isn't a notifications substream, or if the notifications substream |
1246 | | /// isn't in the appropriate state. |
1247 | | /// |
1248 | 0 | pub fn close_out_notifications_substream(&mut self) { |
1249 | 0 | match &mut self.inner { |
1250 | | SubstreamInner::NotificationsOutHandshakeRecv { .. } |
1251 | 0 | | SubstreamInner::NotificationsOut { .. } => { |
1252 | 0 | self.inner = SubstreamInner::NotificationsOutClosed; |
1253 | 0 | } |
1254 | 0 | _ => panic!(), |
1255 | | }; |
1256 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33close_out_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33close_out_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE33close_out_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE33close_out_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node |
1257 | | |
1258 | | /// Closes an ingoing notifications substream that was accepted using |
1259 | | /// [`Substream::accept_in_notifications_substream`]. |
1260 | | /// |
1261 | | /// Notifications can continue to be received. Calling this function only asynchronously |
1262 | | /// signals to the remote that the substream should be closed. The closing is enforced only |
1263 | | /// after the given timeout elapses. |
1264 | | /// |
1265 | | /// # Panic |
1266 | | /// |
1267 | | /// Panics if the substream isn't a notifications substream, or if the notifications substream |
1268 | | /// isn't in the appropriate state. |
1269 | | /// |
1270 | 0 | pub fn close_in_notifications_substream(&mut self, timeout: TNow) { |
1271 | 0 | match &mut self.inner { |
1272 | 0 | SubstreamInner::NotificationsIn { |
1273 | 0 | close_desired_timeout, |
1274 | 0 | .. |
1275 | 0 | } if close_desired_timeout.is_none() => { |
1276 | 0 | *close_desired_timeout = Some(timeout); |
1277 | 0 | } |
1278 | 0 | _ => panic!(), |
1279 | | }; |
1280 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE32close_in_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE32close_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE32close_in_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE32close_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node |
1281 | | |
1282 | | /// Queues a ping on the given substream. Must be passed a randomly-generated payload of 32 |
1283 | | /// bytes, the time after which this ping is considered as failed. |
1284 | | /// |
1285 | | /// # Panic |
1286 | | /// |
1287 | | /// Panics if the substream isn't an outgoing ping substream. |
1288 | | /// |
1289 | 2 | pub fn queue_ping(&mut self, payload: &[u8; 32], now: TNow, timeout: Duration) { |
1290 | 2 | match &mut self.inner { |
1291 | 2 | SubstreamInner::PingOut { queued_pings, .. } |
1292 | 2 | | SubstreamInner::PingOutFailed { queued_pings0 , .. } => { |
1293 | 2 | queued_pings.push(Some((now, timeout))); |
1294 | 2 | } |
1295 | 0 | _ => panic!(), |
1296 | | } |
1297 | | |
1298 | 2 | match &mut self.inner { |
1299 | | SubstreamInner::PingOut { |
1300 | 2 | outgoing_payload, |
1301 | 2 | expected_payload, |
1302 | 2 | .. |
1303 | 2 | } => { |
1304 | 2 | outgoing_payload.extend(payload.iter().copied()); |
1305 | 2 | expected_payload.push_back(payload.to_vec()); |
1306 | 2 | } |
1307 | 0 | SubstreamInner::PingOutFailed { .. } => {} |
1308 | 0 | _ => panic!(), |
1309 | | } |
1310 | 2 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10queue_pingBa_ Line | Count | Source | 1289 | 2 | pub fn queue_ping(&mut self, payload: &[u8; 32], now: TNow, timeout: Duration) { | 1290 | 2 | match &mut self.inner { | 1291 | 2 | SubstreamInner::PingOut { queued_pings, .. } | 1292 | 2 | | SubstreamInner::PingOutFailed { queued_pings0 , .. } => { | 1293 | 2 | queued_pings.push(Some((now, timeout))); | 1294 | 2 | } | 1295 | 0 | _ => panic!(), | 1296 | | } | 1297 | | | 1298 | 2 | match &mut self.inner { | 1299 | | SubstreamInner::PingOut { | 1300 | 2 | outgoing_payload, | 1301 | 2 | expected_payload, | 1302 | 2 | .. | 1303 | 2 | } => { | 1304 | 2 | outgoing_payload.extend(payload.iter().copied()); | 1305 | 2 | expected_payload.push_back(payload.to_vec()); | 1306 | 2 | } | 1307 | 0 | SubstreamInner::PingOutFailed { .. } => {} | 1308 | 0 | _ => panic!(), | 1309 | | } | 1310 | 2 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10queue_pingCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE10queue_pingBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE10queue_pingCsiUjFBJteJ7x_17smoldot_full_node |
1311 | | |
1312 | | /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`]. |
1313 | | /// |
1314 | | /// Passing an `Err` corresponds, on the other side, to a [`RequestError::SubstreamClosed`]. |
1315 | 1 | pub fn respond_in_request( |
1316 | 1 | &mut self, |
1317 | 1 | response: Result<Vec<u8>, ()>, |
1318 | 1 | ) -> Result<(), RespondInRequestError> { |
1319 | 1 | match &mut self.inner { |
1320 | | SubstreamInner::RequestInApiWait => { |
1321 | 1 | self.inner = SubstreamInner::RequestInRespond { |
1322 | 1 | response: if let Ok(response0 ) = response { |
1323 | 0 | let response_len = response.len(); |
1324 | 0 | leb128::encode_usize(response_len).chain(response).collect() |
1325 | | } else { |
1326 | | // An error is indicated by closing the substream without even sending |
1327 | | // back the length of the response. |
1328 | 1 | VecDeque::new() |
1329 | | }, |
1330 | | }; |
1331 | | |
1332 | 1 | Ok(()) |
1333 | | } |
1334 | | // TODO: handle substream closed |
1335 | 0 | _ => panic!(), |
1336 | | } |
1337 | 1 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE18respond_in_requestBa_ Line | Count | Source | 1315 | 1 | pub fn respond_in_request( | 1316 | 1 | &mut self, | 1317 | 1 | response: Result<Vec<u8>, ()>, | 1318 | 1 | ) -> Result<(), RespondInRequestError> { | 1319 | 1 | match &mut self.inner { | 1320 | | SubstreamInner::RequestInApiWait => { | 1321 | 1 | self.inner = SubstreamInner::RequestInRespond { | 1322 | 1 | response: if let Ok(response0 ) = response { | 1323 | 0 | let response_len = response.len(); | 1324 | 0 | leb128::encode_usize(response_len).chain(response).collect() | 1325 | | } else { | 1326 | | // An error is indicated by closing the substream without even sending | 1327 | | // back the length of the response. | 1328 | 1 | VecDeque::new() | 1329 | | }, | 1330 | | }; | 1331 | | | 1332 | 1 | Ok(()) | 1333 | | } | 1334 | | // TODO: handle substream closed | 1335 | 0 | _ => panic!(), | 1336 | | } | 1337 | 1 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE18respond_in_requestCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE18respond_in_requestBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE18respond_in_requestCsiUjFBJteJ7x_17smoldot_full_node |
1338 | | |
1339 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol |
1340 | | /// name and indicate the type of the protocol. |
1341 | | /// |
1342 | | /// # Panic |
1343 | | /// |
1344 | | /// Panics if the substream is not in the correct state. |
1345 | | /// |
1346 | 19 | pub fn accept_inbound(&mut self, ty: InboundTy) { |
1347 | 19 | match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) { |
1348 | 19 | SubstreamInner::InboundNegotiatingApiWait(accept_deny) => { |
1349 | 19 | self.inner = SubstreamInner::InboundNegotiatingAccept(accept_deny.accept(), ty) |
1350 | | } |
1351 | 0 | _ => panic!(), |
1352 | | } |
1353 | 19 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14accept_inboundBa_ Line | Count | Source | 1346 | 19 | pub fn accept_inbound(&mut self, ty: InboundTy) { | 1347 | 19 | match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) { | 1348 | 19 | SubstreamInner::InboundNegotiatingApiWait(accept_deny) => { | 1349 | 19 | self.inner = SubstreamInner::InboundNegotiatingAccept(accept_deny.accept(), ty) | 1350 | | } | 1351 | 0 | _ => panic!(), | 1352 | | } | 1353 | 19 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14accept_inboundCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE14accept_inboundBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE14accept_inboundCsiUjFBJteJ7x_17smoldot_full_node |
1354 | | |
1355 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the |
1356 | | /// protocol name as not supported. |
1357 | | /// |
1358 | | /// # Panic |
1359 | | /// |
1360 | | /// Panics if the substream is not in the correct state. |
1361 | | /// |
1362 | 1 | pub fn reject_inbound(&mut self) { |
1363 | 1 | match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) { |
1364 | 1 | SubstreamInner::InboundNegotiatingApiWait(accept_deny) => { |
1365 | 1 | self.inner = SubstreamInner::InboundNegotiating(accept_deny.reject(), true) |
1366 | | } |
1367 | 0 | _ => panic!(), |
1368 | | } |
1369 | 1 | } _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14reject_inboundBa_ Line | Count | Source | 1362 | 1 | pub fn reject_inbound(&mut self) { | 1363 | 1 | match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) { | 1364 | 1 | SubstreamInner::InboundNegotiatingApiWait(accept_deny) => { | 1365 | 1 | self.inner = SubstreamInner::InboundNegotiating(accept_deny.reject(), true) | 1366 | | } | 1367 | 0 | _ => panic!(), | 1368 | | } | 1369 | 1 | } |
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14reject_inboundCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE14reject_inboundBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE14reject_inboundCsiUjFBJteJ7x_17smoldot_full_node |
1370 | | } |
1371 | | |
1372 | | impl<TNow> fmt::Debug for Substream<TNow> { |
1373 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
1374 | 0 | match &self.inner { |
1375 | 0 | SubstreamInner::InboundFailed => f.debug_tuple("incoming-negotiation-failed").finish(), |
1376 | | SubstreamInner::InboundNegotiating(_, _) => { |
1377 | 0 | f.debug_tuple("incoming-negotiating").finish() |
1378 | | } |
1379 | | SubstreamInner::InboundNegotiatingAccept(_, _) => { |
1380 | 0 | f.debug_tuple("incoming-negotiating-after-accept").finish() |
1381 | | } |
1382 | | SubstreamInner::InboundNegotiatingApiWait(..) => { |
1383 | 0 | f.debug_tuple("incoming-negotiated-api-wait").finish() |
1384 | | } |
1385 | | SubstreamInner::NotificationsOutHandshakeRecv { .. } => { |
1386 | 0 | f.debug_tuple("notifications-out-handshake-recv").finish() |
1387 | | } |
1388 | 0 | SubstreamInner::NotificationsOutNegotiationFailed { .. } => f |
1389 | 0 | .debug_tuple("notifications-out-negotiation-failed") |
1390 | 0 | .finish(), |
1391 | 0 | SubstreamInner::NotificationsOut { .. } => f.debug_tuple("notifications-out").finish(), |
1392 | | SubstreamInner::NotificationsOutClosed { .. } => { |
1393 | 0 | f.debug_tuple("notifications-out-closed").finish() |
1394 | | } |
1395 | | SubstreamInner::NotificationsInHandshake { .. } => { |
1396 | 0 | f.debug_tuple("notifications-in-handshake").finish() |
1397 | | } |
1398 | | SubstreamInner::NotificationsInWait { .. } => { |
1399 | 0 | f.debug_tuple("notifications-in-wait").finish() |
1400 | | } |
1401 | 0 | SubstreamInner::NotificationsIn { .. } => f.debug_tuple("notifications-in").finish(), |
1402 | | SubstreamInner::NotificationsInRefused => { |
1403 | 0 | f.debug_tuple("notifications-in-refused").finish() |
1404 | | } |
1405 | | SubstreamInner::NotificationsInClosed => { |
1406 | 0 | f.debug_tuple("notifications-in-closed").finish() |
1407 | | } |
1408 | 0 | SubstreamInner::RequestOut { .. } => f.debug_tuple("request-out").finish(), |
1409 | | SubstreamInner::RequestInRecv { .. } | SubstreamInner::RequestInRecvEmpty { .. } => { |
1410 | 0 | f.debug_tuple("request-in").finish() |
1411 | | } |
1412 | 0 | SubstreamInner::RequestInRespond { .. } => f.debug_tuple("request-in-respond").finish(), |
1413 | 0 | SubstreamInner::RequestInApiWait => f.debug_tuple("request-in").finish(), |
1414 | 0 | SubstreamInner::PingIn { .. } => f.debug_tuple("ping-in").finish(), |
1415 | 0 | SubstreamInner::PingOutFailed { .. } => f.debug_tuple("ping-out-failed").finish(), |
1416 | 0 | SubstreamInner::PingOut { .. } => f.debug_tuple("ping-out").finish(), |
1417 | | } |
1418 | 0 | } Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreams_0pEINtB5_9SubstreampENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_ Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreams_0pEINtB5_9SubstreampENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_ |
1419 | | } |
1420 | | |
1421 | | /// Event that happened on the connection. See [`Substream::read_write`]. |
1422 | | #[must_use] |
1423 | | #[derive(Debug)] |
1424 | | pub enum Event { |
1425 | | /// Error while receiving an inbound substream. |
1426 | | InboundError { |
1427 | | error: InboundError, |
1428 | | was_accepted: bool, |
1429 | | }, |
1430 | | |
1431 | | /// An inbound substream has successfully negotiated a protocol. Call |
1432 | | /// [`Substream::accept_inbound`] or [`Substream::reject_inbound`] in order to resume. |
1433 | | InboundNegotiated(String), |
1434 | | |
1435 | | /// An inbound substream that had successfully negotiated a protocol got abruptly closed |
1436 | | /// while waiting for the call to [`Substream::accept_inbound`] or |
1437 | | /// [`Substream::reject_inbound`]. |
1438 | | InboundNegotiatedCancel, |
1439 | | |
1440 | | /// Received a request in the context of a request-response protocol. |
1441 | | RequestIn { |
1442 | | /// Bytes of the request. Its interpretation is out of scope of this module. |
1443 | | request: Vec<u8>, |
1444 | | }, |
1445 | | |
1446 | | /// Received a response to a previously emitted request on a request-response protocol. |
1447 | | Response { |
1448 | | /// Bytes of the response. Its interpretation is out of scope of this module. |
1449 | | response: Result<Vec<u8>, RequestError>, |
1450 | | }, |
1451 | | |
1452 | | /// Remote has opened an inbound notifications substream. |
1453 | | /// |
1454 | | /// Either [`Substream::accept_in_notifications_substream`] or |
1455 | | /// [`Substream::reject_in_notifications_substream`] must be called in the near future in |
1456 | | /// order to accept or reject this substream. |
1457 | | NotificationsInOpen { |
1458 | | /// Handshake sent by the remote. Its interpretation is out of scope of this module. |
1459 | | handshake: Vec<u8>, |
1460 | | }, |
1461 | | /// Remote has canceled an inbound notifications substream opening. |
1462 | | /// |
1463 | | /// This can only happen after [`Event::NotificationsInOpen`]. |
1464 | | /// [`Substream::accept_in_notifications_substream`] or |
1465 | | /// [`Substream::reject_in_notifications_substream`] should not be called on this substream. |
1466 | | NotificationsInOpenCancel, |
1467 | | /// Remote has sent a notification on an inbound notifications substream. Can only happen |
1468 | | /// after the substream has been accepted. |
1469 | | // TODO: give a way to back-pressure notifications |
1470 | | NotificationIn { |
1471 | | /// Notification sent by the remote. |
1472 | | notification: Vec<u8>, |
1473 | | }, |
1474 | | /// Remote has closed an inbound notifications substream opening. No more notifications will |
1475 | | /// be received. |
1476 | | /// |
1477 | | /// This can only happen after the substream has been accepted. |
1478 | | NotificationsInClose { |
1479 | | /// If `Ok`, the substream has been closed gracefully. If `Err`, a problem happened. |
1480 | | outcome: Result<(), NotificationsInClosedErr>, |
1481 | | }, |
1482 | | |
1483 | | /// Remote has accepted or refused a substream opened with [`Substream::notifications_out`]. |
1484 | | /// |
1485 | | /// If `Ok`, it is now possible to send notifications on this substream. |
1486 | | NotificationsOutResult { |
1487 | | /// If `Ok`, contains the handshake sent back by the remote. Its interpretation is out of |
1488 | | /// scope of this module. |
1489 | | result: Result<Vec<u8>, NotificationsOutErr>, |
1490 | | }, |
1491 | | /// Remote has closed an outgoing notifications substream, meaning that it demands the closing |
1492 | | /// of the substream. |
1493 | | NotificationsOutCloseDemanded, |
1494 | | /// Remote has reset an outgoing notifications substream. The substream is instantly closed. |
1495 | | NotificationsOutReset, |
1496 | | |
1497 | | /// A ping has been successfully answered by the remote. |
1498 | | PingOutSuccess { |
1499 | | /// Time between sending the ping and receiving the pong. |
1500 | | ping_time: Duration, |
1501 | | }, |
1502 | | /// Remote has failed to answer one or more pings. |
1503 | | PingOutError { |
1504 | | /// Number of pings that the remote has failed to answer. |
1505 | | num_pings: NonZeroUsize, |
1506 | | }, |
1507 | | } |
1508 | | |
1509 | | /// Type of inbound protocol. |
1510 | | pub enum InboundTy { |
1511 | | Ping, |
1512 | | Request { |
1513 | | /// Maximum allowed size of the request. |
1514 | | /// If `None`, then no data is expected on the substream, not even the length of the |
1515 | | /// request. |
1516 | | // TODO: use a proper enum |
1517 | | request_max_size: Option<usize>, |
1518 | | }, |
1519 | | Notifications { |
1520 | | max_handshake_size: usize, |
1521 | | }, |
1522 | | } |
1523 | | |
1524 | | /// Error that can happen while processing an inbound substream. |
1525 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXs4_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_12InboundErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs4_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_12InboundErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
1526 | | pub enum InboundError { |
1527 | | /// Error during protocol negotiation. |
1528 | | #[display(fmt = "Protocol negotiation error: {_0}")] |
1529 | | NegotiationError(multistream_select::Error), |
1530 | | /// Error while receiving an inbound request. |
1531 | | #[display(fmt = "Error receiving inbound request: {_0}")] |
1532 | | RequestInLebError(read_write::IncomingBytesTakeLeb128Error), |
1533 | | /// Substream has been unexpectedly closed. |
1534 | | #[display(fmt = "Substream unexpectedly closed")] |
1535 | | SubstreamClosed, |
1536 | | /// Unexpected end of file while receiving an inbound request. |
1537 | | RequestInExpectedEof, |
1538 | | /// Error while receiving an inbound notifications substream handshake. |
1539 | | #[display(fmt = "Error while receiving an inbound notifications substream handshake: {error}")] |
1540 | | NotificationsInError { |
1541 | | /// Error that happened. |
1542 | | error: read_write::IncomingBytesTakeLeb128Error, |
1543 | | }, |
1544 | | /// Unexpected end of file while receiving an inbound notifications substream handshake. |
1545 | | #[display( |
1546 | | fmt = "Unexpected end of file while receiving an inbound notifications substream handshake" |
1547 | | )] |
1548 | | NotificationsInUnexpectedEof, |
1549 | | } |
1550 | | |
1551 | | /// Error that can happen during a request in a request-response scheme. |
1552 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXs7_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs7_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
1553 | | pub enum RequestError { |
1554 | | /// Remote hasn't answered in time. |
1555 | | Timeout, |
1556 | | /// Remote doesn't support this protocol. |
1557 | | ProtocolNotAvailable, |
1558 | | /// Remote has decided to close the substream. This most likely indicates that the remote |
1559 | | /// is unwilling the respond to the request. |
1560 | | SubstreamClosed, |
1561 | | /// Remote has decided to `RST` the substream. This most likely indicates that the remote has |
1562 | | /// detected a protocol error. |
1563 | | SubstreamReset, |
1564 | | /// Error during protocol negotiation. |
1565 | | #[display(fmt = "Protocol negotiation error: {_0}")] |
1566 | | NegotiationError(multistream_select::Error), |
1567 | | /// Invalid LEB128 number when receiving the response. |
1568 | | ResponseInvalidLeb128, |
1569 | | /// Number of bytes decoded is larger than expected when receiving the response. |
1570 | | ResponseTooLarge, |
1571 | | } |
1572 | | |
1573 | | impl RequestError { |
1574 | | /// Returns `true` if the error is caused by a faulty behavior by the remote. Returns `false` |
1575 | | /// if the error can happen in normal situations. |
1576 | 0 | pub fn is_protocol_error(&self) -> bool { |
1577 | 0 | match self { |
1578 | 0 | RequestError::Timeout => false, // Remote is likely overloaded. |
1579 | 0 | RequestError::ProtocolNotAvailable => true, |
1580 | 0 | RequestError::SubstreamClosed => false, |
1581 | 0 | RequestError::SubstreamReset => true, |
1582 | 0 | RequestError::NegotiationError(_) => true, |
1583 | 0 | RequestError::ResponseInvalidLeb128 => true, |
1584 | 0 | RequestError::ResponseTooLarge => true, |
1585 | | } |
1586 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_12RequestError17is_protocol_error Unexecuted instantiation: _RNvMs0_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_12RequestError17is_protocol_error |
1587 | | } |
1588 | | |
1589 | | /// Error potentially returned by [`Substream::respond_in_request`]. |
1590 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXs9_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_21RespondInRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs9_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_21RespondInRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
1591 | | pub enum RespondInRequestError { |
1592 | | /// The substream has already been closed. |
1593 | | SubstreamClosed, |
1594 | | } |
1595 | | |
1596 | | /// Error that can happen when trying to open an outbound notifications substream. |
1597 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXsc_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsc_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
1598 | | pub enum NotificationsOutErr { |
1599 | | /// Remote took too long to perform the handshake. |
1600 | | Timeout, |
1601 | | /// Remote has refused the handshake by closing the substream. |
1602 | | RefusedHandshake, |
1603 | | /// Remote has indicated that it doesn't support the requested protocol. |
1604 | | ProtocolNotAvailable, |
1605 | | /// Error during the multistream-select handshake. |
1606 | | #[display(fmt = "Protocol negotiation error: {_0}")] |
1607 | | NegotiationError(multistream_select::Error), |
1608 | | /// Substream has been reset during the negotiation. |
1609 | | SubstreamReset, |
1610 | | /// Error while receiving the remote's handshake. |
1611 | | #[display(fmt = "Error while receiving remote handshake: {_0}")] |
1612 | | HandshakeRecvError(read_write::IncomingBytesTakeLeb128Error), |
1613 | | } |
1614 | | |
1615 | | /// Reason why an inbound notifications substream has been closed. |
1616 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXsf_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsf_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
1617 | | pub enum NotificationsInClosedErr { |
1618 | | /// Error in the protocol. |
1619 | | #[display(fmt = "Error while receiving notification: {_0}")] |
1620 | | ProtocolError(read_write::IncomingBytesTakeLeb128Error), |
1621 | | /// Substream has been closed. |
1622 | | SubstreamClosed, |
1623 | | /// Substream has been reset. |
1624 | | SubstreamReset, |
1625 | | /// Substream has been force-closed because the graceful timeout has been reached. |
1626 | | CloseDesiredTimeout, |
1627 | | } |