/__w/smoldot/smoldot/repo/lib/src/libp2p/collection/multi_stream.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use super::{ |
19 | | super::{ |
20 | | connection::{established, noise, webrtc_framing}, |
21 | | read_write::ReadWrite, |
22 | | }, |
23 | | ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection, |
24 | | CoordinatorToConnectionInner, NotificationsOutErr, PeerId, ShutdownCause, SubstreamFate, |
25 | | SubstreamId, |
26 | | }; |
27 | | |
28 | | use alloc::{collections::VecDeque, string::ToString as _, sync::Arc}; |
29 | | use core::{ |
30 | | hash::Hash, |
31 | | ops::{Add, Sub}, |
32 | | time::Duration, |
33 | | }; |
34 | | |
/// State machine dedicated to a single multi-stream connection.
pub struct MultiStreamConnectionTask<TNow, TSubId> {
    /// Current phase of the connection: handshake, established, or shutting down.
    connection: MultiStreamConnectionTaskInner<TNow, TSubId>,
}
/// Inner state of a [`MultiStreamConnectionTask`]. Transitions go from `Handshake` to
/// `Established`, then to `ShutdownWaitingAck` and finally `ShutdownAcked`.
enum MultiStreamConnectionTaskInner<TNow, TSubId> {
    /// Connection is still in its handshake phase.
    Handshake {
        /// Substream that has been opened to perform the handshake, if any.
        opened_substream: Option<(TSubId, webrtc_framing::WebRtcFraming)>,

        /// Noise handshake in progress. Always `Some`, except to be temporarily extracted.
        handshake: Option<noise::HandshakeInProgress>,

        /// Other substreams, besides [`MultiStreamConnectionTaskInner::Handshake::opened_substream`],
        /// that have been opened. For each substream, contains a boolean indicating whether the
        /// substream is outbound (`true`) or inbound (`false`).
        ///
        /// Due to the asynchronous nature of the protocol, it is not a logic error to open
        /// additional substreams before the handshake has finished. The remote might think that
        /// the handshake has finished while the local node hasn't finished processing it yet.
        ///
        /// These substreams aren't processed as long as the handshake hasn't finished. It is,
        /// however, important to remember that substreams have been opened.
        extra_open_substreams: hashbrown::HashMap<TSubId, bool, fnv::FnvBuildHasher>,

        /// State machine used once the connection has been established. Unused during the
        /// handshake, but created ahead of time. Always `Some`, except to be temporarily
        /// extracted.
        established: Option<established::MultiStream<TNow, TSubId, Option<SubstreamId>>>,
    },

    /// Connection has been fully established.
    Established {
        /// State machine of the established connection. The user data attached to each
        /// substream is the coordinator-assigned "outer" [`SubstreamId`], if any.
        established: established::MultiStream<TNow, TSubId, Option<SubstreamId>>,

        /// If `Some`, contains the substream that was used for the handshake. This substream
        /// is meant to be closed as soon as possible.
        handshake_substream: Option<TSubId>,

        /// If `Some`, then no `HandshakeFinished` message has been sent back yet.
        handshake_finished_message_to_send: Option<PeerId>,

        /// Because outgoing substream ids are assigned by the coordinator, we maintain a mapping
        /// of the "outer ids" to "inner ids".
        outbound_substreams_map:
            hashbrown::HashMap<SubstreamId, established::SubstreamId, fnv::FnvBuildHasher>,

        /// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] or a
        /// [`ConnectionToCoordinatorInner::NotificationsInClose`] is emitted, an
        /// entry is added to this list. If the coordinator accepts or refuses a substream in this
        /// list, or closes a substream in this list, the acceptance/refusal/closing is dismissed.
        // TODO: this works only because SubstreamIds aren't reused
        notifications_in_close_acknowledgments:
            hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,

        /// Messages about inbound accept cancellations to send back.
        inbound_accept_cancel_events: VecDeque<established::SubstreamId>,
    },

    /// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
    /// message has been sent and is waiting to be acknowledged.
    ShutdownWaitingAck {
        /// What has initiated the shutdown.
        initiator: ShutdownInitiator,

        /// `None` if the [`ConnectionToCoordinatorInner::StartShutdown`] message has already
        /// been sent to the coordinator. `Some` if the message hasn't been sent yet.
        start_shutdown_message_to_send: Option<Option<ShutdownCause>>,

        /// `true` if the [`ConnectionToCoordinatorInner::ShutdownFinished`] message has already
        /// been sent to the coordinator.
        shutdown_finish_message_sent: bool,
    },

    /// Connection has finished its shutdown and its shutdown has been acknowledged. There is
    /// nothing more to do except stop the connection task.
    ShutdownAcked {
        /// What has initiated the shutdown.
        initiator: ShutdownInitiator,
    },
}
116 | | |
/// Entity that triggered the shutdown of a connection.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ShutdownInitiator {
    /// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
    Coordinator,
    /// [`MultiStreamConnectionTask::reset`] has been called.
    Api,
}
124 | | |
125 | | impl<TNow, TSubId> MultiStreamConnectionTask<TNow, TSubId> |
126 | | where |
127 | | TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord, |
128 | | TSubId: Clone + PartialEq + Eq + Hash, |
129 | | { |
130 | | // Note that the parameters of this function are a bit rough and undocumented, as this is |
131 | | // a function only called from the parent module. |
132 | 0 | pub(super) fn new( |
133 | 0 | randomness_seed: [u8; 32], |
134 | 0 | when_connection_start: TNow, |
135 | 0 | handshake: noise::HandshakeInProgress, |
136 | 0 | max_inbound_substreams: usize, |
137 | 0 | substreams_capacity: usize, |
138 | 0 | max_protocol_name_len: usize, |
139 | 0 | ping_protocol: Arc<str>, |
140 | 0 | ) -> Self { |
141 | 0 | MultiStreamConnectionTask { |
142 | 0 | connection: MultiStreamConnectionTaskInner::Handshake { |
143 | 0 | // TODO: the handshake doesn't have a timeout |
144 | 0 | handshake: Some(handshake), |
145 | 0 | opened_substream: None, |
146 | 0 | extra_open_substreams: hashbrown::HashMap::with_capacity_and_hasher( |
147 | 0 | 0, |
148 | 0 | Default::default(), |
149 | 0 | ), |
150 | 0 | established: Some(established::MultiStream::webrtc(established::Config { |
151 | 0 | max_inbound_substreams, |
152 | 0 | substreams_capacity, |
153 | 0 | max_protocol_name_len, |
154 | 0 | randomness_seed, |
155 | 0 | ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/ |
156 | 0 | ping_interval: Duration::from_secs(20), // TODO: hardcoded |
157 | 0 | ping_timeout: Duration::from_secs(10), // TODO: hardcoded |
158 | 0 | first_out_ping: when_connection_start, // TODO: only start the ping after the Noise handshake has ended |
159 | 0 | })), |
160 | 0 | }, |
161 | 0 | } |
162 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE3newB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE3newCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE3newB8_ |
163 | | |
164 | | /// Pulls a message to send back to the coordinator. |
165 | | /// |
166 | | /// This function takes ownership of `self` and optionally yields it back. If the first |
167 | | /// option contains `None`, then no more message will be generated and the |
168 | | /// [`MultiStreamConnectionTask`] has vanished. This will happen after the connection has been |
169 | | /// shut down or reset. |
170 | | /// It is possible for `self` to not be yielded back even if substreams are still open, in |
171 | | /// which case the API user should abruptly reset the connection, for example by sending a |
172 | | /// TCP RST flag. |
173 | | /// |
174 | | /// If any message is returned, it is the responsibility of the API user to send it to the |
175 | | /// coordinator. |
176 | | /// Do not attempt to buffer the message being returned, as it would work against the |
177 | | /// back-pressure strategy used internally. As soon as a message is returned, it should be |
178 | | /// delivered. If the coordinator is busy at the moment a message should be delivered, then |
179 | | /// the entire thread of execution dedicated to this [`MultiStreamConnectionTask`] should be |
180 | | /// paused until the coordinator is ready and the message delivered. |
181 | | /// |
182 | | /// Messages aren't generated spontaneously. In other words, you don't need to periodically |
183 | | /// call this function just in case there's a new message. Messages are always generated after |
184 | | /// [`MultiStreamConnectionTask::substream_read_write`], |
185 | | /// [`MultiStreamConnectionTask::add_substream`], or [`MultiStreamConnectionTask::reset`] |
186 | | /// has been called. Multiple messages can happen in a row. |
187 | | /// |
188 | | /// Because this function frees space in a buffer, processing substreams again after it |
189 | | /// has returned might read/write more data and generate an event again. In other words, |
190 | | /// the API user should call [`MultiStreamConnectionTask::substream_read_write`] and |
191 | | /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] repeatedly in a loop until no |
192 | | /// more message is generated. |
    pub fn pull_message_to_coordinator(
        mut self,
    ) -> (Option<Self>, Option<ConnectionToCoordinator>) {
        match &mut self.connection {
            // Nothing to report while the handshake is still in progress.
            MultiStreamConnectionTaskInner::Handshake { .. } => (Some(self), None),
            MultiStreamConnectionTaskInner::Established {
                established,
                outbound_substreams_map,
                handshake_finished_message_to_send,
                notifications_in_close_acknowledgments,
                inbound_accept_cancel_events,
                ..
            } => {
                // A pending `HandshakeFinished` message takes priority over any other event.
                if let Some(remote_peer_id) = handshake_finished_message_to_send.take() {
                    return (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::HandshakeFinished(remote_peer_id),
                        }),
                    );
                }

                // Flush queued inbound-accept cancellations before pulling new events.
                if let Some(substream_id) = inbound_accept_cancel_events.pop_front() {
                    return (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::InboundAcceptedCancel {
                                id: substream_id,
                            },
                        }),
                    );
                }

                // Translate one event from the inner state machine (if any) into a message
                // for the coordinator.
                let event = match established.pull_event() {
                    Some(established::Event::NewOutboundSubstreamsForbidden) => {
                        // TODO: handle properly
                        self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                            start_shutdown_message_to_send: Some(None),
                            shutdown_finish_message_sent: false,
                            initiator: ShutdownInitiator::Coordinator,
                        };
                        Some(ConnectionToCoordinatorInner::StartShutdown(None))
                    }
                    Some(established::Event::InboundError(err)) => {
                        Some(ConnectionToCoordinatorInner::InboundError(err))
                    }
                    Some(established::Event::InboundNegotiated { id, protocol_name }) => {
                        Some(ConnectionToCoordinatorInner::InboundNegotiated { id, protocol_name })
                    }
                    Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
                        // Remember the cancellation so that a later accept/reject from the
                        // coordinator for this substream is dismissed. No message is emitted.
                        notifications_in_close_acknowledgments.insert(id);
                        None
                    }
                    Some(established::Event::InboundAcceptedCancel { id, .. }) => {
                        Some(ConnectionToCoordinatorInner::InboundAcceptedCancel { id })
                    }
                    Some(established::Event::RequestIn { id, request, .. }) => {
                        Some(ConnectionToCoordinatorInner::RequestIn { id, request })
                    }
                    Some(established::Event::Response {
                        response,
                        user_data,
                        ..
                    }) => {
                        // The user data of an outbound request substream is always the outer
                        // `SubstreamId`; `None` here would indicate a bug.
                        let Some(outer_substream_id) = user_data else {
                            panic!()
                        };
                        // The request is finished; remove the outer→inner id mapping.
                        outbound_substreams_map.remove(&outer_substream_id).unwrap();
                        Some(ConnectionToCoordinatorInner::Response {
                            response,
                            id: outer_substream_id,
                        })
                    }
                    Some(established::Event::NotificationsInOpen { id, handshake, .. }) => {
                        Some(ConnectionToCoordinatorInner::NotificationsInOpen { id, handshake })
                    }
                    Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
                        // The coordinator's eventual accept/reject for this substream must be
                        // dismissed; record the cancellation.
                        notifications_in_close_acknowledgments.insert(id);
                        Some(ConnectionToCoordinatorInner::NotificationsInOpenCancel { id })
                    }
                    Some(established::Event::NotificationIn { id, notification }) => {
                        Some(ConnectionToCoordinatorInner::NotificationIn { id, notification })
                    }
                    Some(established::Event::NotificationsInClose { id, outcome, .. }) => {
                        // Same dismissal mechanism as `NotificationsInOpenCancel` above.
                        notifications_in_close_acknowledgments.insert(id);
                        Some(ConnectionToCoordinatorInner::NotificationsInClose { id, outcome })
                    }
                    Some(established::Event::NotificationsOutResult { id, result }) => {
                        // Map the inner substream id back to the coordinator-assigned outer id.
                        // On failure, the substream is dead and its mapping is removed.
                        let (outer_substream_id, result) = match result {
                            Ok(r) => {
                                let Some(outer_substream_id) = established[id] else {
                                    panic!()
                                };
                                (outer_substream_id, Ok(r))
                            }
                            Err((err, ud)) => {
                                let Some(outer_substream_id) = ud else {
                                    panic!()
                                };
                                outbound_substreams_map.remove(&outer_substream_id);
                                (outer_substream_id, Err(NotificationsOutErr::Substream(err)))
                            }
                        };

                        Some(ConnectionToCoordinatorInner::NotificationsOutResult {
                            id: outer_substream_id,
                            result,
                        })
                    }
                    Some(established::Event::NotificationsOutCloseDemanded { id }) => {
                        let Some(outer_substream_id) = established[id] else {
                            panic!()
                        };
                        Some(
                            ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
                                id: outer_substream_id,
                            },
                        )
                    }
                    Some(established::Event::NotificationsOutReset { user_data, .. }) => {
                        let Some(outer_substream_id) = user_data else {
                            panic!()
                        };
                        // Substream no longer exists; drop its outer→inner mapping.
                        outbound_substreams_map.remove(&outer_substream_id);
                        Some(ConnectionToCoordinatorInner::NotificationsOutReset {
                            id: outer_substream_id,
                        })
                    }
                    Some(established::Event::PingOutSuccess { ping_time }) => {
                        Some(ConnectionToCoordinatorInner::PingOutSuccess { ping_time })
                    }
                    Some(established::Event::PingOutFailed) => {
                        Some(ConnectionToCoordinatorInner::PingOutFailed)
                    }
                    None => None,
                };

                (
                    Some(self),
                    event.map(|ev| ConnectionToCoordinator { inner: ev }),
                )
            }
            MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                start_shutdown_message_to_send,
                shutdown_finish_message_sent,
                ..
            } => {
                // During shutdown, first emit `StartShutdown` (if not yet sent), then
                // `ShutdownFinished` (once), then nothing.
                if let Some(reason) = start_shutdown_message_to_send.take() {
                    debug_assert!(!*shutdown_finish_message_sent);
                    (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::StartShutdown(reason),
                        }),
                    )
                } else if !*shutdown_finish_message_sent {
                    debug_assert!(start_shutdown_message_to_send.is_none());
                    *shutdown_finish_message_sent = true;
                    (
                        Some(self),
                        Some(ConnectionToCoordinator {
                            inner: ConnectionToCoordinatorInner::ShutdownFinished,
                        }),
                    )
                } else {
                    (Some(self), None)
                }
            }
            // Shutdown fully acknowledged: the task vanishes (`self` is not yielded back).
            MultiStreamConnectionTaskInner::ShutdownAcked { .. } => (None, None),
        }
    }
364 | | |
365 | | /// Injects a message that has been pulled from the coordinator. |
366 | | /// |
367 | | /// Calling this function might generate data to send to the connection. You should call |
368 | | /// [`MultiStreamConnectionTask::desired_outbound_substreams`] and |
369 | | /// [`MultiStreamConnectionTask::substream_read_write`] after this function has returned. |
    pub fn inject_coordinator_message(&mut self, now: &TNow, message: CoordinatorToConnection) {
        // Dispatch on the (message, current state) pair. Messages that can no longer apply
        // due to the concurrent nature of the coordinator/connection exchange are either
        // dismissed via `notifications_in_close_acknowledgments` or silently ignored; pairs
        // that the protocol makes impossible hit `unreachable!()`.
        match (message.inner, &mut self.connection) {
            (
                CoordinatorToConnectionInner::AcceptInbound {
                    substream_id,
                    inbound_ty,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    inbound_accept_cancel_events,
                    ..
                },
            ) => {
                // If the substream was cancelled in the meanwhile, dismiss the acceptance and
                // queue an `InboundAcceptedCancel` to send back instead.
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.accept_inbound(substream_id, inbound_ty, None);
                } else {
                    inbound_accept_cancel_events.push_back(substream_id)
                }
            }
            (
                CoordinatorToConnectionInner::RejectInbound { substream_id },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                // A rejection of an already-cancelled substream is simply dismissed.
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.reject_inbound(substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
                MultiStreamConnectionTaskInner::Handshake {
                    established: Some(established),
                    ..
                }
                | MultiStreamConnectionTaskInner::Established { established, .. },
            ) => {
                established.set_max_protocol_name_len(new_max_length);
            }
            (
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. },
                MultiStreamConnectionTaskInner::Handshake {
                    established: None, ..
                },
            ) => {
                // During the handshake, `established` is only `None` while temporarily
                // extracted; this message can't be delivered at that moment.
                unreachable!()
            }
            (
                CoordinatorToConnectionInner::StartRequest {
                    protocol_name,
                    request_data,
                    timeout,
                    max_response_size,
                    substream_id,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                let inner_substream_id = established.add_request(
                    protocol_name,
                    request_data,
                    now.clone() + timeout,
                    max_response_size,
                    Some(substream_id),
                );
                // Outer ids are assigned by the coordinator and never reused, hence the entry
                // must be new.
                let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
                debug_assert!(_prev_value.is_none());
            }
            (
                CoordinatorToConnectionInner::OpenOutNotifications {
                    max_handshake_size,
                    protocol_name,
                    handshake,
                    handshake_timeout,
                    substream_id: outer_substream_id,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                let inner_substream_id = established.open_notifications_substream(
                    protocol_name,
                    max_handshake_size,
                    handshake,
                    now.clone() + handshake_timeout,
                    Some(outer_substream_id),
                );

                let _prev_value =
                    outbound_substreams_map.insert(outer_substream_id, inner_substream_id);
                debug_assert!(_prev_value.is_none());
            }
            (
                CoordinatorToConnectionInner::CloseOutNotifications { substream_id },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                // It is possible that the remote has closed the outbound notification substream
                // while the `CloseOutNotifications` message was being delivered, or that the API
                // user close the substream before the message about the substream being closed
                // was delivered to the coordinator.
                if let Some(inner_substream_id) = outbound_substreams_map.remove(&substream_id) {
                    established.close_out_notifications_substream(inner_substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::QueueNotification {
                    substream_id,
                    notification,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                // It is possible that the remote has closed the outbound notification substream
                // while a `QueueNotification` message was being delivered, or that the API user
                // queued a notification before the message about the substream being closed was
                // delivered to the coordinator.
                // If that happens, we intentionally silently discard the message, causing the
                // notification to not be sent. This is consistent with the guarantees about
                // notifications delivered that are documented in the public API.
                if let Some(inner_substream_id) = outbound_substreams_map.get(&substream_id) {
                    established.write_notification_unbounded(*inner_substream_id, notification);
                }
            }
            (
                CoordinatorToConnectionInner::AnswerRequest {
                    substream_id,
                    response,
                },
                MultiStreamConnectionTaskInner::Established { established, .. },
            ) => match established.respond_in_request(substream_id, response) {
                Ok(()) => {}
                Err(established::RespondInRequestError::SubstreamClosed) => {
                    // As documented, answering an obsolete request is simply ignored.
                }
            },
            (
                CoordinatorToConnectionInner::AcceptInNotifications {
                    substream_id,
                    handshake,
                    max_notification_size,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                // Dismiss the acceptance if the substream was closed/cancelled in the meanwhile.
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.accept_in_notifications_substream(
                        substream_id,
                        handshake,
                        max_notification_size,
                    );
                }
            }
            (
                CoordinatorToConnectionInner::RejectInNotifications { substream_id },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established.reject_in_notifications_substream(substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::CloseInNotifications {
                    substream_id,
                    timeout,
                },
                MultiStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                if !notifications_in_close_acknowledgments.remove(&substream_id) {
                    established
                        .close_in_notifications_substream(substream_id, now.clone() + timeout);
                }
            }
            (
                CoordinatorToConnectionInner::StartShutdown { .. },
                MultiStreamConnectionTaskInner::Handshake { .. }
                | MultiStreamConnectionTaskInner::Established { .. },
            ) => {
                // TODO: implement proper shutdown
                self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    start_shutdown_message_to_send: Some(None),
                    shutdown_finish_message_sent: false,
                    initiator: ShutdownInitiator::Coordinator,
                };
            }
            (
                CoordinatorToConnectionInner::AcceptInbound { .. }
                | CoordinatorToConnectionInner::RejectInbound { .. }
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
                | CoordinatorToConnectionInner::StartRequest { .. }
                | CoordinatorToConnectionInner::AnswerRequest { .. }
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
                | CoordinatorToConnectionInner::QueueNotification { .. },
                MultiStreamConnectionTaskInner::Handshake { .. }
                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
            ) => unreachable!(),
            (
                CoordinatorToConnectionInner::AcceptInbound { .. }
                | CoordinatorToConnectionInner::RejectInbound { .. }
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
                | CoordinatorToConnectionInner::StartRequest { .. }
                | CoordinatorToConnectionInner::AnswerRequest { .. }
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
                | CoordinatorToConnectionInner::QueueNotification { .. },
                MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. },
            )
            | (
                CoordinatorToConnectionInner::StartShutdown,
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Api,
                    ..
                },
            ) => {
                // There might still be some messages coming from the coordinator after the
                // connection task has sent a message indicating that it has shut down. This is
                // due to the concurrent nature of the API and doesn't indicate a bug. These
                // messages are simply ignored by the connection task.
            }
            (
                CoordinatorToConnectionInner::ShutdownFinishedAck,
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    start_shutdown_message_to_send: start_shutdown_message_sent,
                    shutdown_finish_message_sent,
                    initiator,
                },
            ) => {
                // The acknowledgment can only arrive after both shutdown messages were sent.
                debug_assert!(
                    start_shutdown_message_sent.is_none() && *shutdown_finish_message_sent
                );
                self.connection = MultiStreamConnectionTaskInner::ShutdownAcked {
                    initiator: *initiator,
                };
            }
            (
                CoordinatorToConnectionInner::StartShutdown,
                MultiStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Coordinator,
                    ..
                }
                | MultiStreamConnectionTaskInner::ShutdownAcked { .. },
            ) => unreachable!(),
            (CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
        }
    }
647 | | |
648 | | /// Returns the number of new outbound substreams that the state machine would like to see |
649 | | /// opened. |
650 | | /// |
651 | | /// This value doesn't change automatically over time but only after a call to |
652 | | /// [`MultiStreamConnectionTask::substream_read_write`], |
653 | | /// [`MultiStreamConnectionTask::inject_coordinator_message`], |
654 | | /// [`MultiStreamConnectionTask::add_substream`], or |
655 | | /// [`MultiStreamConnectionTask::reset_substream`]. |
656 | | /// |
657 | | /// Note that the user is expected to track the number of substreams that are currently being |
658 | | /// opened. For example, if this function returns 2 and there are already 2 substreams |
659 | | /// currently being opened, then there is no need to open any additional one. |
660 | 0 | pub fn desired_outbound_substreams(&self) -> u32 { |
661 | 0 | match &self.connection { |
662 | | MultiStreamConnectionTaskInner::Handshake { |
663 | 0 | opened_substream, .. |
664 | 0 | } => { |
665 | 0 | if opened_substream.is_none() { |
666 | 0 | 1 |
667 | | } else { |
668 | 0 | 0 |
669 | | } |
670 | | } |
671 | 0 | MultiStreamConnectionTaskInner::Established { established, .. } => { |
672 | 0 | established.desired_outbound_substreams() |
673 | | } |
674 | | MultiStreamConnectionTaskInner::ShutdownAcked { .. } |
675 | 0 | | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => 0, |
676 | | } |
677 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE27desired_outbound_substreamsB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE27desired_outbound_substreamsCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE27desired_outbound_substreamsB8_ |
678 | | |
679 | | /// Notifies the state machine that a new substream has been opened. |
680 | | /// |
681 | | /// `outbound` indicates whether the substream has been opened by the remote (`false`) or |
682 | | /// locally (`true`). |
683 | | /// |
684 | | /// If `outbound` is `true`, then the value returned by |
685 | | /// [`MultiStreamConnectionTask::desired_outbound_substreams`] will decrease by one. |
686 | | /// |
687 | | /// # Panic |
688 | | /// |
689 | | /// Panics if there already exists a substream with an identical identifier. |
690 | | /// |
691 | 0 | pub fn add_substream(&mut self, id: TSubId, outbound: bool) { |
692 | 0 | match &mut self.connection { |
693 | 0 | MultiStreamConnectionTaskInner::Handshake { |
694 | 0 | opened_substream: ref mut opened_substream @ None, |
695 | | .. |
696 | 0 | } if outbound => { |
697 | 0 | *opened_substream = Some((id, webrtc_framing::WebRtcFraming::new())); |
698 | 0 | } |
699 | | MultiStreamConnectionTaskInner::Handshake { |
700 | 0 | opened_substream, |
701 | 0 | extra_open_substreams, |
702 | 0 | .. |
703 | 0 | } => { |
704 | 0 | assert!(opened_substream |
705 | 0 | .as_ref() |
706 | 0 | .map_or(true, |(open, _)| *open != id)); Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE13add_substream0Ba_ Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE13add_substream0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE13add_substream0Ba_ |
707 | | // TODO: add a limit to the number allowed? |
708 | 0 | let _was_in = extra_open_substreams.insert(id, outbound); |
709 | 0 | assert!(_was_in.is_none()); |
710 | | } |
711 | 0 | MultiStreamConnectionTaskInner::Established { established, .. } => { |
712 | 0 | established.add_substream(id, outbound) |
713 | | } |
714 | | MultiStreamConnectionTaskInner::ShutdownAcked { .. } |
715 | 0 | | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => { |
716 | 0 | // TODO: reset the substream or something? |
717 | 0 | } |
718 | | } |
719 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE13add_substreamB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE13add_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE13add_substreamB8_ |
720 | | |
721 | | /// Sets the state of the connection to "reset". |
722 | | /// |
723 | | /// This should be called if the remote abruptly closes the connection, such as with a TCP/IP |
724 | | /// RST flag. |
725 | | /// |
726 | | /// After this function has been called, it is illegal to call |
727 | | /// [`MultiStreamConnectionTask::substream_read_write`] or |
728 | | /// [`MultiStreamConnectionTask::reset`] again. |
729 | | /// |
730 | | /// Calling this function might have generated messages for the coordinator. |
731 | | /// [`MultiStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards in |
732 | | /// order to process these messages. |
733 | | /// |
734 | | /// # Panic |
735 | | /// |
736 | | /// Panics if [`MultiStreamConnectionTask::reset`] has been called in the past. |
737 | | /// |
738 | 0 | pub fn reset(&mut self) { |
739 | 0 | match self.connection { |
740 | | MultiStreamConnectionTaskInner::ShutdownWaitingAck { |
741 | | initiator: ShutdownInitiator::Api, |
742 | | .. |
743 | | } |
744 | | | MultiStreamConnectionTaskInner::ShutdownAcked { |
745 | | initiator: ShutdownInitiator::Api, |
746 | | .. |
747 | | } => { |
748 | | // It is illegal to call `reset` a second time. |
749 | 0 | panic!() |
750 | | } |
751 | | MultiStreamConnectionTaskInner::ShutdownWaitingAck { |
752 | 0 | ref mut initiator, .. |
753 | | } |
754 | | | MultiStreamConnectionTaskInner::ShutdownAcked { |
755 | 0 | ref mut initiator, .. |
756 | 0 | } => { |
757 | 0 | // Mark the initiator as being the API in order to track proper API usage. |
758 | 0 | *initiator = ShutdownInitiator::Api; |
759 | 0 | } |
760 | 0 | _ => { |
761 | 0 | self.connection = MultiStreamConnectionTaskInner::ShutdownWaitingAck { |
762 | 0 | initiator: ShutdownInitiator::Api, |
763 | 0 | shutdown_finish_message_sent: false, |
764 | 0 | start_shutdown_message_to_send: Some(Some(ShutdownCause::RemoteReset)), |
765 | 0 | }; |
766 | 0 | } |
767 | | } |
768 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE5resetB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE5resetCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE5resetB8_ |
769 | | |
770 | | /// Returns `true` if [`MultiStreamConnectionTask::reset`] has been called in the past. |
771 | 0 | pub fn is_reset_called(&self) -> bool { |
772 | 0 | matches!( |
773 | 0 | self.connection, |
774 | | MultiStreamConnectionTaskInner::ShutdownWaitingAck { |
775 | | initiator: ShutdownInitiator::Api, |
776 | | .. |
777 | | } | MultiStreamConnectionTaskInner::ShutdownAcked { |
778 | | initiator: ShutdownInitiator::Api, |
779 | | .. |
780 | | } |
781 | | ) |
782 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15is_reset_calledB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE15is_reset_calledCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15is_reset_calledB8_ |
783 | | |
784 | | /// Immediately destroys the substream with the given identifier. |
785 | | /// |
786 | | /// The given identifier is now considered invalid by the state machine. |
787 | | /// |
788 | | /// # Panic |
789 | | /// |
790 | | /// Panics if there is no substream with that identifier. |
791 | | /// |
792 | 0 | pub fn reset_substream(&mut self, substream_id: &TSubId) { |
793 | 0 | match &mut self.connection { |
794 | | MultiStreamConnectionTaskInner::Established { |
795 | 0 | handshake_substream, |
796 | 0 | .. |
797 | 0 | } if handshake_substream |
798 | 0 | .as_ref() |
799 | 0 | .map_or(false, |s| s == substream_id) => Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE15reset_substream0Ba_ Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE15reset_substream0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB4_25MultiStreamConnectionTaskppE15reset_substream0Ba_ |
800 | 0 | { |
801 | 0 | *handshake_substream = None; |
802 | 0 | } |
803 | 0 | MultiStreamConnectionTaskInner::Established { established, .. } => { |
804 | 0 | established.reset_substream(substream_id) |
805 | | } |
806 | | MultiStreamConnectionTaskInner::Handshake { |
807 | 0 | opened_substream: Some((opened_substream, _)), |
808 | 0 | .. |
809 | 0 | } if opened_substream == substream_id => { |
810 | 0 | // TODO: the handshake has failed, kill the connection? |
811 | 0 | } |
812 | | MultiStreamConnectionTaskInner::Handshake { |
813 | 0 | extra_open_substreams, |
814 | 0 | .. |
815 | 0 | } => { |
816 | 0 | let _was_in = extra_open_substreams.remove(substream_id).is_some(); |
817 | 0 | assert!(_was_in); |
818 | | } |
819 | | MultiStreamConnectionTaskInner::ShutdownAcked { .. } |
820 | 0 | | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => { |
821 | 0 | // TODO: panic if substream id invalid? |
822 | 0 | } |
823 | | } |
824 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15reset_substreamB8_ Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskNtNtCsaYZPK01V26L_4core4time8DurationjE15reset_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10collection12multi_streamINtB2_25MultiStreamConnectionTaskppE15reset_substreamB8_ |
825 | | |
    /// Reads/writes data on the substream.
    ///
    /// If the method returns [`SubstreamFate::Reset`], then the substream is now considered dead
    /// according to the state machine and its identifier is now invalid. If the reading or
    /// writing side of the substream was still open, then the user should reset that substream.
    ///
    /// In the case of a WebRTC connection, the [`ReadWrite::incoming_buffer`] and
    /// [`ReadWrite::write_bytes_queueable`] must always be `Some`.
    ///
    /// # Panic
    ///
    /// Panics if there is no substream with that identifier.
    /// Panics if this is a WebRTC connection, and the reading or writing side is closed.
    ///
    #[must_use]
    pub fn substream_read_write(
        &mut self,
        substream_id: &TSubId,
        read_write: &'_ mut ReadWrite<TNow>,
    ) -> SubstreamFate {
        // In WebRTC, the reading and writing sides are never closed.
        // Note that the `established::MultiStream` state machine also performs this check, but
        // we do it here again because we're not necessarily in the `established` state.
        assert!(
            read_write.expected_incoming_bytes.is_some()
                && read_write.write_bytes_queueable.is_some()
        );

        match &mut self.connection {
            // Still handshaking, and `substream_id` is the substream that carries the
            // Noise handshake.
            MultiStreamConnectionTaskInner::Handshake {
                handshake,
                opened_substream: Some((opened_handshake_substream, handshake_webrtc_framing)),
                established,
                extra_open_substreams,
            } if opened_handshake_substream == substream_id => {
                // TODO: check the handshake timeout

                // Progress the Noise handshake.
                let handshake_outcome = {
                    // The Noise data is not directly the data of the substream. Instead,
                    // everything is wrapped within a Protobuf frame.
                    let mut with_framing = match handshake_webrtc_framing.read_write(read_write) {
                        Ok(f) => f,
                        Err(_err) => {
                            // TODO: not great for diagnostic to just ignore the error; also, the connection should just reset entirely
                            return SubstreamFate::Reset;
                        }
                    };
                    // `handshake` is always `Some` while in the `Handshake` state; it is
                    // re-filled below if the handshake is still in progress.
                    handshake.take().unwrap().read_write(&mut with_framing)
                };

                match handshake_outcome {
                    Ok(noise::NoiseHandshake::InProgress(handshake_update)) => {
                        // Handshake not finished yet; store the updated state back and keep
                        // the substream alive.
                        *handshake = Some(handshake_update);
                        SubstreamFate::Continue
                    }
                    Err(_err) => return SubstreamFate::Reset, // TODO: /!\
                    Ok(noise::NoiseHandshake::Success {
                        cipher: _,
                        remote_peer_id,
                    }) => {
                        // The handshake has succeeded and we will transition into "established"
                        // mode.
                        let mut established = established.take().unwrap();
                        // Hand over the substreams that were opened while the handshake was
                        // still in progress.
                        for (substream_id, outbound) in extra_open_substreams.drain() {
                            established.add_substream(substream_id, outbound);
                        }

                        self.connection = MultiStreamConnectionTaskInner::Established {
                            established,
                            // Queue a message informing the coordinator of the remote's
                            // identity; it is pulled via `pull_message_to_coordinator`.
                            handshake_finished_message_to_send: Some(remote_peer_id),
                            handshake_substream: None, // TODO: do properly
                            outbound_substreams_map: hashbrown::HashMap::with_capacity_and_hasher(
                                0,
                                Default::default(),
                            ),
                            notifications_in_close_acknowledgments:
                                hashbrown::HashSet::with_capacity_and_hasher(2, Default::default()),
                            inbound_accept_cancel_events: VecDeque::with_capacity(2),
                        };

                        // The handshake substream itself is reported as dead rather than
                        // being transferred to the established state.
                        // TODO: hacky
                        SubstreamFate::Reset
                    }
                }
            }
            // Established, and `substream_id` is the leftover handshake substream that is
            // being wound down.
            MultiStreamConnectionTaskInner::Established {
                handshake_substream,
                ..
            } if handshake_substream
                .as_ref()
                .map_or(false, |s| s == substream_id) =>
            {
                // Close the writing side. If the reading side is closed, we indicate that the
                // substream is dead. If the reading side is still open, we indicate that it's not
                // dead and simply wait for the remote to close it.
                // TODO: kill the connection if the remote sends more data?
                read_write.close_write();
                if read_write.expected_incoming_bytes.is_none() {
                    *handshake_substream = None;
                    SubstreamFate::Reset
                } else {
                    SubstreamFate::Continue
                }
            }
            // Established: delegate to the inner state machine.
            MultiStreamConnectionTaskInner::Established { established, .. } => {
                established.substream_read_write(substream_id, read_write)
            }
            // Still handshaking, and `substream_id` is one of the substreams opened while
            // waiting for the handshake to complete.
            MultiStreamConnectionTaskInner::Handshake {
                extra_open_substreams,
                ..
            } => {
                assert!(extra_open_substreams.contains_key(substream_id));
                // Don't do anything. Don't read or write. Instead we wait for the handshake to
                // be finished.
                SubstreamFate::Continue
            }
            MultiStreamConnectionTaskInner::ShutdownAcked { .. }
            | MultiStreamConnectionTaskInner::ShutdownWaitingAck { .. } => {
                // TODO: panic if substream id invalid?
                SubstreamFate::Reset
            }
        }
    }
950 | | } |