lib/src/libp2p/collection/single_stream.rs
// Smoldot
// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use super::{
    super::{
        connection::{established, single_stream_handshake},
        read_write::ReadWrite,
    },
    ConnectionToCoordinator, ConnectionToCoordinatorInner, CoordinatorToConnection,
    CoordinatorToConnectionInner, NotificationsOutErr, ShutdownCause, SubstreamId,
};

use alloc::{collections::VecDeque, string::ToString as _, sync::Arc};
use core::{
    mem,
    ops::{Add, Sub},
    time::Duration,
};

pub(super) struct Config<TNow> {
    pub(super) randomness_seed: [u8; 32],
    pub(super) handshake: single_stream_handshake::HealthyHandshake,
    pub(super) handshake_timeout: TNow,
    pub(super) max_inbound_substreams: usize,
    pub(super) substreams_capacity: usize,
    pub(super) max_protocol_name_len: usize,
    pub(super) ping_protocol: Arc<str>,
}

/// State machine dedicated to a single single-stream connection.
pub struct SingleStreamConnectionTask<TNow> {
    /// State machine of the underlying connection.
    connection: SingleStreamConnectionTaskInner<TNow>,

    /// Buffer of messages destined to the coordinator.
    ///
    /// Never goes above a few elements.
    pending_messages: VecDeque<ConnectionToCoordinatorInner>,
}

enum SingleStreamConnectionTaskInner<TNow> {
    /// Connection is still in its handshake phase.
    Handshake {
        handshake: single_stream_handshake::HealthyHandshake,

        /// Seed that will be used to initialize randomness when building the
        /// [`established::SingleStream`].
        /// This seed is computed during the handshake in order to avoid having to access a
        /// shared state when the handshake is over. While it might seem dangerous to leave a
        /// randomness seed in plain memory, this randomness isn't used for anything critical or
        /// related to cryptography; it is only used, for example, to mitigate hash collision
        /// attacks.
        randomness_seed: [u8; 32],

        /// When the handshake phase times out.
        timeout: TNow,

        /// See [`super::Config::max_inbound_substreams`].
        max_inbound_substreams: usize,

        /// See [`Config::substreams_capacity`].
        substreams_capacity: usize,

        /// See [`Config::max_protocol_name_len`].
        max_protocol_name_len: usize,

        /// See [`super::Config::ping_protocol`].
        ping_protocol: Arc<str>,
    },

    /// Connection has been fully established.
    Established {
        established: established::SingleStream<TNow, Option<SubstreamId>>,

        /// Because outgoing substream ids are assigned by the coordinator, we maintain a mapping
        /// of the "outer ids" to "inner ids".
        outbound_substreams_map:
            hashbrown::HashMap<SubstreamId, established::SubstreamId, fnv::FnvBuildHasher>,

        /// After a [`ConnectionToCoordinatorInner::NotificationsInOpenCancel`] or a
        /// [`ConnectionToCoordinatorInner::NotificationsInClose`] is emitted, an
        /// entry is added to this list. If the coordinator accepts or refuses a substream in this
        /// list, or closes a substream in this list, the acceptance/refusal/closing is dismissed.
        notifications_in_close_acknowledgments: VecDeque<established::SubstreamId>,

        /// After an `InboundNegotiatedCancel` event is emitted by the connection, an entry is
        /// added to this set. If the coordinator later accepts or refuses that inbound
        /// substream, the acceptance/refusal is dismissed.
        // TODO: this works only because SubstreamIds aren't reused
        inbound_negotiated_cancel_acknowledgments:
            hashbrown::HashSet<established::SubstreamId, fnv::FnvBuildHasher>,
    },

    /// Connection has finished its shutdown. A [`ConnectionToCoordinatorInner::ShutdownFinished`]
    /// message has been sent and is waiting to be acknowledged.
    ShutdownWaitingAck {
        /// What has initiated the shutdown.
        initiator: ShutdownInitiator,
    },

    /// Connection has finished its shutdown and its shutdown has been acknowledged. There is
    /// nothing more to do except stop the connection task.
    ShutdownAcked {
        /// What has initiated the shutdown.
        initiator: ShutdownInitiator,
    },

    /// Temporary state used to satisfy the borrow checker during state transitions.
    Poisoned,
}
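
// Note on `Poisoned`: `read_write` below moves the current state out of `&mut self` with
// `mem::replace`, leaving `Poisoned` behind, and writes the next state back before returning.
// A minimal sketch of that pattern (illustrative only; `transition` is a hypothetical helper):
//
//     let state = mem::replace(&mut self.connection, SingleStreamConnectionTaskInner::Poisoned);
//     self.connection = transition(state);
//
// Observing `Poisoned` outside of such a transition is a bug, hence the `unreachable!()` in
// `inject_coordinator_message` and `read_write`.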

#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ShutdownInitiator {
    /// The coordinator sent a [`CoordinatorToConnectionInner::StartShutdown`] message.
    Coordinator,
    /// [`SingleStreamConnectionTask::reset`] has been called.
    Api,
    /// The shutdown has been initiated due to a protocol error or because the connection has
    /// been shut down cleanly by the remote.
    Remote,
}

// `TNow` is the type used to represent clock instants; in practice it is instantiated with
// types such as `std::time::Instant`.
impl<TNow> SingleStreamConnectionTask<TNow>
where
    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
{
    // Note that the parameters of this function are a bit rough and undocumented, as this
    // function is only called from the parent module.
    pub(super) fn new(config: Config<TNow>) -> Self {
        SingleStreamConnectionTask {
            connection: SingleStreamConnectionTaskInner::Handshake {
                handshake: config.handshake,
                randomness_seed: config.randomness_seed,
                timeout: config.handshake_timeout,
                max_inbound_substreams: config.max_inbound_substreams,
                substreams_capacity: config.substreams_capacity,
                max_protocol_name_len: config.max_protocol_name_len,
                ping_protocol: config.ping_protocol,
            },
            pending_messages: VecDeque::with_capacity({
                // We never buffer more than a few messages.
                4
            }),
        }
    }

    /// Pulls a message to send back to the coordinator.
    ///
    /// This function takes ownership of `self` and optionally yields it back. If the first
    /// element of the returned tuple is `None`, then no more messages will be generated and the
    /// [`SingleStreamConnectionTask`] has vanished. This will happen after the connection has
    /// been shut down or reset.
    /// It is possible for `self` to not be yielded back even if the [`ReadWrite`] that was last
    /// passed to [`SingleStreamConnectionTask::read_write`] is still fully open, in which case
    /// the API user should abruptly reset the connection, for example by sending a TCP RST flag.
    /// This can happen, for example, if the connection seems unresponsive and an attempt at
    /// closing it cleanly would be futile.
    ///
    /// If any message is returned, it is the responsibility of the API user to send it to the
    /// coordinator.
    /// Do not attempt to buffer the message being returned, as doing so would work against the
    /// back-pressure strategy used internally. As soon as a message is returned, it should be
    /// delivered. If the coordinator is busy at the moment a message should be delivered, then
    /// the entire thread of execution dedicated to this [`SingleStreamConnectionTask`] should be
    /// paused until the coordinator is ready and the message delivered.
    ///
    /// Messages aren't generated spontaneously. In other words, you don't need to periodically
    /// call this function just in case there's a new message. Messages are always generated
    /// after either [`SingleStreamConnectionTask::read_write`] or
    /// [`SingleStreamConnectionTask::reset`] has been called. Multiple messages can be generated
    /// in a row.
    ///
    /// Because this function frees space in a buffer, calling
    /// [`SingleStreamConnectionTask::read_write`] again after it has returned might read/write
    /// more data and generate an event again. In other words, the API user should call
    /// [`SingleStreamConnectionTask::read_write`] and
    /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] repeatedly in a loop until no
    /// more message is generated.
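    ///
    /// A minimal sketch of that loop, assuming hypothetical `connection_task`, `read_write` and
    /// `deliver_to_coordinator` values provided by the API user (none of these names are part
    /// of this API):
    ///
    /// ```ignore
    /// connection_task.read_write(&mut read_write);
    /// let mut task = Some(connection_task);
    /// while let Some(t) = task.take() {
    ///     let (yielded_back, message) = t.pull_message_to_coordinator();
    ///     task = yielded_back;
    ///     match message {
    ///         Some(msg) => deliver_to_coordinator(msg), // must be delivered immediately
    ///         None => break,                            // no more messages for now
    ///     }
    /// }
    /// // If `task` is `None` here, the task has vanished and the API user should abruptly
    /// // reset the connection if it is still open.
    /// ```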
    pub fn pull_message_to_coordinator(
        mut self,
    ) -> (Option<Self>, Option<ConnectionToCoordinator>) {
        // To be sure that there is no bug in the implementation, we make sure that the number of
        // buffered messages doesn't go above a certain small limit.
        debug_assert!(self.pending_messages.len() < 8);

        let message = self
            .pending_messages
            .pop_front()
            .map(|inner| ConnectionToCoordinator { inner });

        // The `ShutdownAcked` state causes the task to exit.
        let self_ret = if !matches!(
            self.connection,
            SingleStreamConnectionTaskInner::ShutdownAcked { .. }
        ) {
            Some(self)
        } else {
            None
        };

        (self_ret, message)
    }

    /// Injects a message that has been pulled from the coordinator.
    ///
    /// Calling this function might generate data to send to the connection. You should call
    /// [`SingleStreamConnectionTask::read_write`] after this function has returned (unless you
    /// have called [`SingleStreamConnectionTask::reset`] in the past).
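    ///
    /// A minimal sketch of the expected call order (`now`, `message` and `read_write` are
    /// assumed to come from the API user's event loop; the names are illustrative):
    ///
    /// ```ignore
    /// task.inject_coordinator_message(&now, message);
    /// // The message might have queued data destined to the remote; drive the connection again.
    /// task.read_write(&mut read_write);
    /// ```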
    pub fn inject_coordinator_message(&mut self, now: &TNow, message: CoordinatorToConnection) {
        match (message.inner, &mut self.connection) {
            (
                CoordinatorToConnectionInner::AcceptInbound {
                    substream_id,
                    inbound_ty,
                },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    inbound_negotiated_cancel_acknowledgments,
                    ..
                },
            ) => {
                if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
                    established.accept_inbound(substream_id, inbound_ty, None);
                } else {
                    self.pending_messages.push_back(
                        ConnectionToCoordinatorInner::InboundAcceptedCancel { id: substream_id },
                    )
                }
            }
            (
                CoordinatorToConnectionInner::RejectInbound { substream_id },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    inbound_negotiated_cancel_acknowledgments,
                    ..
                },
            ) => {
                if !inbound_negotiated_cancel_acknowledgments.remove(&substream_id) {
                    established.reject_inbound(substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
                SingleStreamConnectionTaskInner::Handshake {
                    max_protocol_name_len,
                    ..
                },
            ) => {
                *max_protocol_name_len = new_max_length;
            }
            (
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
                SingleStreamConnectionTaskInner::Established { established, .. },
            ) => {
                established.set_max_protocol_name_len(new_max_length);
            }
            (
                CoordinatorToConnectionInner::StartRequest {
                    protocol_name,
                    request_data,
                    timeout,
                    max_response_size,
                    substream_id,
                },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                let inner_substream_id = established.add_request(
                    protocol_name,
                    request_data,
                    now.clone() + timeout,
                    max_response_size,
                    Some(substream_id),
                );

                let _prev_value = outbound_substreams_map.insert(substream_id, inner_substream_id);
                debug_assert!(_prev_value.is_none());
            }
            (
                CoordinatorToConnectionInner::OpenOutNotifications {
                    protocol_name,
                    max_handshake_size,
                    handshake,
                    handshake_timeout,
                    substream_id: outer_substream_id,
                },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                let inner_substream_id = established.open_notifications_substream(
                    protocol_name,
                    handshake,
                    max_handshake_size,
                    now.clone() + handshake_timeout,
                    Some(outer_substream_id),
                );

                let _prev_value =
                    outbound_substreams_map.insert(outer_substream_id, inner_substream_id);
                debug_assert!(_prev_value.is_none());
            }
            (
                CoordinatorToConnectionInner::CloseOutNotifications { substream_id },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                // It is possible that the remote has closed the outbound notification substream
                // while the `CloseOutNotifications` message was being delivered, or that the API
                // user closed the substream before the message about the substream being closed
                // was delivered to the coordinator.
                if let Some(inner_substream_id) = outbound_substreams_map.remove(&substream_id) {
                    established.close_out_notifications_substream(inner_substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::QueueNotification {
                    substream_id,
                    notification,
                },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    outbound_substreams_map,
                    ..
                },
            ) => {
                // It is possible that the remote has closed the outbound notification substream
                // while a `QueueNotification` message was being delivered, or that the API user
                // queued a notification before the message about the substream being closed was
                // delivered to the coordinator.
                // If that happens, we intentionally silently discard the message, causing the
                // notification to not be sent. This is consistent with the notification-delivery
                // guarantees documented in the public API.
                if let Some(inner_substream_id) = outbound_substreams_map.get(&substream_id) {
                    established.write_notification_unbounded(*inner_substream_id, notification);
                }
            }
            (
                CoordinatorToConnectionInner::AnswerRequest {
                    substream_id,
                    response,
                },
                SingleStreamConnectionTaskInner::Established { established, .. },
            ) => match established.respond_in_request(substream_id, response) {
                Ok(()) => {}
                Err(established::RespondInRequestError::SubstreamClosed) => {
                    // As documented, answering an obsolete request is simply ignored.
                }
            },
            (
                CoordinatorToConnectionInner::AcceptInNotifications {
                    substream_id,
                    handshake,
                    max_notification_size,
                },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                if let Some(idx) = notifications_in_close_acknowledgments
                    .iter()
                    .position(|s| *s == substream_id)
                {
                    notifications_in_close_acknowledgments.remove(idx);
                } else {
                    established.accept_in_notifications_substream(
                        substream_id,
                        handshake,
                        max_notification_size,
                    );
                }
            }
            (
                CoordinatorToConnectionInner::RejectInNotifications { substream_id },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                if let Some(idx) = notifications_in_close_acknowledgments
                    .iter()
                    .position(|s| *s == substream_id)
                {
                    notifications_in_close_acknowledgments.remove(idx);
                } else {
                    established.reject_in_notifications_substream(substream_id);
                }
            }
            (
                CoordinatorToConnectionInner::CloseInNotifications {
                    substream_id,
                    timeout,
                },
                SingleStreamConnectionTaskInner::Established {
                    established,
                    notifications_in_close_acknowledgments,
                    ..
                },
            ) => {
                if let Some(idx) = notifications_in_close_acknowledgments
                    .iter()
                    .position(|s| *s == substream_id)
                {
                    notifications_in_close_acknowledgments.remove(idx);
                } else {
                    established
                        .close_in_notifications_substream(substream_id, now.clone() + timeout);
                }
            }
            (
                CoordinatorToConnectionInner::StartShutdown { .. },
                SingleStreamConnectionTaskInner::Established { .. }
                | SingleStreamConnectionTaskInner::Handshake { .. },
            ) => {
                // TODO: implement proper shutdown
                self.pending_messages
                    .push_back(ConnectionToCoordinatorInner::StartShutdown(None));
                self.pending_messages
                    .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
                self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Coordinator,
                };
            }
            (
                CoordinatorToConnectionInner::AcceptInbound { .. }
                | CoordinatorToConnectionInner::RejectInbound { .. }
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
                | CoordinatorToConnectionInner::StartRequest { .. }
                | CoordinatorToConnectionInner::AnswerRequest { .. }
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
                | CoordinatorToConnectionInner::QueueNotification { .. },
                SingleStreamConnectionTaskInner::Handshake { .. }
                | SingleStreamConnectionTaskInner::ShutdownAcked { .. },
            ) => unreachable!(),
            (
                CoordinatorToConnectionInner::AcceptInbound { .. }
                | CoordinatorToConnectionInner::RejectInbound { .. }
                | CoordinatorToConnectionInner::SetMaxProtocolNameLen { .. }
                | CoordinatorToConnectionInner::AcceptInNotifications { .. }
                | CoordinatorToConnectionInner::RejectInNotifications { .. }
                | CoordinatorToConnectionInner::CloseInNotifications { .. }
                | CoordinatorToConnectionInner::StartRequest { .. }
                | CoordinatorToConnectionInner::AnswerRequest { .. }
                | CoordinatorToConnectionInner::OpenOutNotifications { .. }
                | CoordinatorToConnectionInner::CloseOutNotifications { .. }
                | CoordinatorToConnectionInner::QueueNotification { .. },
                SingleStreamConnectionTaskInner::ShutdownWaitingAck { .. },
            )
            | (
                CoordinatorToConnectionInner::StartShutdown,
                SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Api | ShutdownInitiator::Remote,
                },
            ) => {
                // There might still be some messages coming from the coordinator after the
                // connection task has sent a message indicating that it has shut down. This is
                // due to the concurrent nature of the API and doesn't indicate a bug. These
                // messages are simply ignored by the connection task.
            }
            (
                CoordinatorToConnectionInner::ShutdownFinishedAck,
                SingleStreamConnectionTaskInner::ShutdownWaitingAck { initiator },
            ) => {
                self.connection = SingleStreamConnectionTaskInner::ShutdownAcked {
                    initiator: *initiator,
                };
            }
            (
                CoordinatorToConnectionInner::StartShutdown,
                SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Coordinator,
                    ..
                }
                | SingleStreamConnectionTaskInner::ShutdownAcked { .. },
            ) => unreachable!(),
            (CoordinatorToConnectionInner::ShutdownFinishedAck, _) => unreachable!(),
            (_, SingleStreamConnectionTaskInner::Poisoned) => unreachable!(),
        }
    }

    /// Sets the state of the connection to "reset".
    ///
    /// This should be called if the remote abruptly closes the connection, such as with a
    /// TCP/IP RST flag.
    ///
    /// After this function has been called, it is illegal to call
    /// [`SingleStreamConnectionTask::read_write`] or [`SingleStreamConnectionTask::reset`] again.
    ///
    /// Calling this function might have generated messages for the coordinator.
    /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards
    /// in order to process these messages.
    ///
    /// # Panic
    ///
    /// Panics if [`SingleStreamConnectionTask::reset`] has been called in the past.
    ///
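    /// A minimal sketch of the intended use after a transport-level error (the names are
    /// illustrative, not part of this API):
    ///
    /// ```ignore
    /// task.reset();
    /// // `reset` queues messages destined to the coordinator; they must now be drained.
    /// let (maybe_task, message) = task.pull_message_to_coordinator();
    /// ```
    ///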
    pub fn reset(&mut self) {
        match self.connection {
            SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                initiator: ShutdownInitiator::Api,
            }
            | SingleStreamConnectionTaskInner::ShutdownAcked {
                initiator: ShutdownInitiator::Api,
            } => {
                // It is illegal to call `reset` a second time.
                panic!()
            }
            SingleStreamConnectionTaskInner::ShutdownWaitingAck { ref mut initiator }
            | SingleStreamConnectionTaskInner::ShutdownAcked { ref mut initiator } => {
                // Mark the initiator as being the API in order to track proper API usage.
                *initiator = ShutdownInitiator::Api;
            }
            _ => {
                self.pending_messages
                    .push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
                        ShutdownCause::RemoteReset,
                    )));
                self.pending_messages
                    .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
                self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                    initiator: ShutdownInitiator::Api,
                };
            }
        }
    }

    /// Returns `true` if [`SingleStreamConnectionTask::reset`] has been called in the past.
    pub fn is_reset_called(&self) -> bool {
        matches!(
            self.connection,
            SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                initiator: ShutdownInitiator::Api,
            } | SingleStreamConnectionTaskInner::ShutdownAcked {
                initiator: ShutdownInitiator::Api,
            }
        )
    }

    /// Reads data coming from the connection, updates the internal state machine, and writes
    /// data destined to the connection through the [`ReadWrite`].
    ///
    /// Calling this function might have generated messages for the coordinator.
    /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] should be called afterwards
    /// in order to process these messages.
    ///
    /// # Panic
    ///
    /// Panics if [`SingleStreamConnectionTask::reset`] has been called in the past.
    ///
    pub fn read_write(&mut self, read_write: &'_ mut ReadWrite<TNow>) {
        // If there is already at least one pending message, we back-pressure the connection by
        // not performing any reading or writing, as doing so might generate more messages and
        // open the door for a DoS attack by the remote. As documented, the API user is supposed
        // to pull messages after this function has returned, meaning that they will drain the
        // messages.
        if !self.pending_messages.is_empty() {
            return;
        }

        match mem::replace(
            &mut self.connection,
            SingleStreamConnectionTaskInner::Poisoned,
        ) {
            SingleStreamConnectionTaskInner::Established {
                established,
                mut outbound_substreams_map,
                mut notifications_in_close_acknowledgments,
                mut inbound_negotiated_cancel_acknowledgments,
            } => match established.read_write(read_write) {
                Ok((connection, event)) => {
                    if read_write.is_dead() && read_write.wake_up_after.is_none() {
                        self.pending_messages.push_back(
                            ConnectionToCoordinatorInner::StartShutdown(Some(
                                ShutdownCause::CleanShutdown,
                            )),
                        );
                        self.pending_messages
                            .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
                        self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                            initiator: ShutdownInitiator::Remote,
                        };
                        return;
                    }

                    match event {
                        Some(established::Event::NewOutboundSubstreamsForbidden) => {
                            // TODO: handle properly
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::StartShutdown(Some(
                                    ShutdownCause::CleanShutdown,
                                )),
                            );
                            self.pending_messages
                                .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
                            self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                                initiator: ShutdownInitiator::Remote,
                            };
                            return;
                        }
                        Some(established::Event::InboundError(err)) => {
                            self.pending_messages
                                .push_back(ConnectionToCoordinatorInner::InboundError(err));
                        }
                        Some(established::Event::InboundNegotiated { id, protocol_name }) => {
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::InboundNegotiated {
                                    id,
                                    protocol_name,
                                },
                            );
                        }
                        Some(established::Event::InboundNegotiatedCancel { id, .. }) => {
                            inbound_negotiated_cancel_acknowledgments.insert(id);
                        }
                        Some(established::Event::InboundAcceptedCancel { id, .. }) => {
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::InboundAcceptedCancel { id },
                            );
                        }
                        Some(established::Event::RequestIn { id, request, .. }) => {
                            self.pending_messages
                                .push_back(ConnectionToCoordinatorInner::RequestIn { id, request });
                        }
                        Some(established::Event::Response {
                            response,
                            user_data,
                            ..
                        }) => {
                            let Some(outer_substream_id) = user_data else {
                                panic!()
                            };
                            outbound_substreams_map.remove(&outer_substream_id).unwrap();
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::Response {
                                    response,
                                    id: outer_substream_id,
                                },
                            );
                        }
                        Some(established::Event::NotificationsInOpen { id, handshake, .. }) => {
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::NotificationsInOpen { id, handshake },
                            );
                        }
                        Some(established::Event::NotificationsInOpenCancel { id, .. }) => {
                            notifications_in_close_acknowledgments.push_back(id);
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::NotificationsInOpenCancel { id },
                            );
                        }
                        Some(established::Event::NotificationIn { id, notification }) => {
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::NotificationIn { id, notification },
                            );
                        }
                        Some(established::Event::NotificationsInClose { id, outcome, .. }) => {
                            // TODO: only do this if not sent by API user
                            notifications_in_close_acknowledgments.push_back(id);
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::NotificationsInClose { id, outcome },
                            );
                        }
                        Some(established::Event::NotificationsOutResult { id, result }) => {
                            let (outer_substream_id, result) = match result {
                                Ok(r) => {
                                    let Some(outer_substream_id) = connection[id] else {
                                        panic!()
                                    };
                                    (outer_substream_id, Ok(r))
                                }
                                Err((err, ud)) => {
                                    let Some(outer_substream_id) = ud else {
                                        panic!()
                                    };
                                    outbound_substreams_map.remove(&outer_substream_id);
                                    (outer_substream_id, Err(NotificationsOutErr::Substream(err)))
                                }
                            };

                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::NotificationsOutResult {
                                    id: outer_substream_id,
                                    result,
                                },
                            );
                        }
                        Some(established::Event::NotificationsOutCloseDemanded { id }) => {
                            let Some(outer_substream_id) = connection[id] else {
                                panic!()
                            };
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
                                    id: outer_substream_id,
                                },
                            );
                        }
                        Some(established::Event::NotificationsOutReset { user_data, .. }) => {
                            let Some(outer_substream_id) = user_data else {
                                panic!()
                            };
                            outbound_substreams_map.remove(&outer_substream_id);
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::NotificationsOutReset {
                                    id: outer_substream_id,
                                },
                            );
                        }
                        Some(established::Event::PingOutSuccess { ping_time }) => {
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::PingOutSuccess { ping_time },
                            );
                        }
                        Some(established::Event::PingOutFailed) => {
                            self.pending_messages
                                .push_back(ConnectionToCoordinatorInner::PingOutFailed);
                        }
                        None => {}
                    }

                    self.connection = SingleStreamConnectionTaskInner::Established {
                        established: connection,
                        outbound_substreams_map,
                        notifications_in_close_acknowledgments,
                        inbound_negotiated_cancel_acknowledgments,
                    };
                }
                Err(err) => {
                    self.pending_messages
                        .push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
                            ShutdownCause::ProtocolError(err),
                        )));
                    self.pending_messages
                        .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
                    self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                        initiator: ShutdownInitiator::Remote,
                    };
                }
            },

            SingleStreamConnectionTaskInner::Handshake {
                mut handshake,
                randomness_seed,
                timeout,
                max_inbound_substreams,
                substreams_capacity,
                max_protocol_name_len,
                ping_protocol,
            } => {
                // Check that the handshake isn't taking too long.
                //
                // Note that we check this condition before looking into the incoming data,
                // and it is possible for the buffers to contain the data that leads to the
                // handshake being finished. If that is the case, however, it is impossible to
                // know whether this data arrived before or after the timeout.
                // Whether to put this check before or after reading the buffer is a choice
                // between having false negatives or having false positives for the timeout.
                // We are more strict than necessary by having the check before, but this
                // guarantees that no horrendously slow connections can accidentally make their
                // way through.
                if timeout < read_write.now {
                    self.pending_messages
                        .push_back(ConnectionToCoordinatorInner::StartShutdown(Some(
                            ShutdownCause::HandshakeTimeout,
                        )));
                    self.pending_messages
                        .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
                    self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                        initiator: ShutdownInitiator::Remote,
                    };
                    return;
                }

                loop {
                    let (read_before, written_before) =
                        (read_write.read_bytes, read_write.write_bytes_queued);

                    let result = match handshake.read_write(read_write) {
                        Ok(rw) => rw,
                        Err(err) => {
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::StartShutdown(Some(
                                    ShutdownCause::HandshakeError(err),
                                )),
                            );
                            self.pending_messages
                                .push_back(ConnectionToCoordinatorInner::ShutdownFinished);
                            self.connection = SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                                initiator: ShutdownInitiator::Remote,
                            };
                            return;
                        }
                    };

                    match result {
                        single_stream_handshake::Handshake::Healthy(updated_handshake)
                            if (read_before, written_before)
                                == (read_write.read_bytes, read_write.write_bytes_queued) =>
                        {
                            // No progress was made; `read_write()` should be called again as
                            // soon as possible after `timeout`.
                            read_write.wake_up_after(&timeout);
                            self.connection = SingleStreamConnectionTaskInner::Handshake {
                                handshake: updated_handshake,
                                randomness_seed,
                                timeout,
                                max_inbound_substreams,
                                substreams_capacity,
                                max_protocol_name_len,
                                ping_protocol,
                            };
                            break;
                        }
                        single_stream_handshake::Handshake::Healthy(updated_handshake) => {
                            handshake = updated_handshake;
                        }
                        single_stream_handshake::Handshake::Success {
                            remote_peer_id,
                            connection,
                        } => {
                            self.pending_messages.push_back(
                                ConnectionToCoordinatorInner::HandshakeFinished(remote_peer_id),
                            );

                            self.connection = SingleStreamConnectionTaskInner::Established {
                                established: connection.into_connection(established::Config {
                                    max_inbound_substreams,
                                    substreams_capacity,
                                    max_protocol_name_len,
                                    randomness_seed,
                                    ping_protocol: ping_protocol.to_string(), // TODO: cloning :-/
                                    ping_interval: Duration::from_secs(20),   // TODO: hardcoded
                                    ping_timeout: Duration::from_secs(10),    // TODO: hardcoded
                                    first_out_ping: read_write.now.clone() + Duration::from_secs(2), // TODO: hardcoded
                                }),
                                outbound_substreams_map:
                                    hashbrown::HashMap::with_capacity_and_hasher(
                                        0,
                                        Default::default(),
                                    ), // TODO: capacity?
                                notifications_in_close_acknowledgments: VecDeque::with_capacity(4),
                                inbound_negotiated_cancel_acknowledgments:
                                    hashbrown::HashSet::with_capacity_and_hasher(
                                        2,
                                        Default::default(),
                                    ),
                            };
                            break;
                        }
                    }
                }
            }

            c @ (SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
            }
            | SingleStreamConnectionTaskInner::ShutdownAcked {
                initiator: ShutdownInitiator::Coordinator | ShutdownInitiator::Remote,
            }) => {
                // The user might legitimately call this function after the connection has
                // already shut down. In that case, just do nothing.
                self.connection = c;

                // This might have been done already during the shutdown process, but we do it
                // again just in case.
                read_write.close_write();
            }

            SingleStreamConnectionTaskInner::ShutdownWaitingAck {
                initiator: ShutdownInitiator::Api,
            }
            | SingleStreamConnectionTaskInner::ShutdownAcked {
                initiator: ShutdownInitiator::Api,
            } => {
                // As documented, it is illegal to call this function after calling `reset()`.
                panic!()
            }

            SingleStreamConnectionTaskInner::Poisoned => unreachable!(),
        }
    }
}