/__w/smoldot/smoldot/repo/lib/src/libp2p/collection.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | //! Collection of libp2p connections. |
19 | | //! |
20 | | //! The [`Network`] struct in this module is a collection of libp2p connections. In the |
21 | | //! documentation below, it is also called the *coordinator*. |
22 | | //! |
23 | | //! When a connection is inserted in the collection with [`Network::insert_single_stream`] or |
24 | | //! [`Network::insert_multi_stream`], two objects are returned: an identifier for this new |
25 | | //! connection assigned by the collection, and a [`SingleStreamConnectionTask`] or |
26 | | //! [`MultiStreamConnectionTask`]. |
27 | | //! |
28 | | //! All the [`SingleStreamConnectionTask`]s/[`MultiStreamConnectionTask`] created by the |
29 | | //! [`Network`] communicate with that [`Network`] by passing messages. Passing the messages has |
30 | | //! to be done explicitly by the API user. It is the responsibility of the API user to pull |
31 | | //! messages from the coordinator (i.e. the [`Network`]) and push them onto the |
32 | | //! [`SingleStreamConnectionTask`] or [`MultiStreamConnectionTask`] and vice-versa. |
33 | | //! |
34 | | //! # Usage |
35 | | //! |
36 | | //! - Whenever a new connection is established, use [`Network::insert_single_stream`] or |
37 | | //! [`Network::insert_multi_stream`] to allocate a connection in the collection. |
38 | | //! - When a connection has received data or is ready to send more data, use |
39 | | //! [`SingleStreamConnectionTask::read_write`], [`SingleStreamConnectionTask::reset`], |
40 | | //! [`MultiStreamConnectionTask::substream_read_write`], [`MultiStreamConnectionTask::reset`], |
41 | | //! [`MultiStreamConnectionTask::add_substream`], and/or |
42 | | //! [`MultiStreamConnectionTask::desired_outbound_substreams`] to synchronize the state of the |
43 | | //! task with the actual state of the connection. |
44 | | //! - Pull messages from the [`SingleStreamConnectionTask`]s and [`MultiStreamConnectionTask`]s |
45 | | //! and inject them into the [`Network`], and vice versa. |
46 | | //! - In parallel, continuously call [`Network::next_event`] to process the events generated by |
47 | | //! the calls to [`Network::inject_connection_message`]. |
48 | | //! |
49 | | |
50 | | use crate::libp2p::connection::noise; |
51 | | |
52 | | use super::connection::{established, single_stream_handshake}; |
53 | | use alloc::{ |
54 | | collections::{BTreeMap, BTreeSet, VecDeque}, |
55 | | string::String, |
56 | | sync::Arc, |
57 | | vec::Vec, |
58 | | }; |
59 | | use core::{ |
60 | | hash::Hash, |
61 | | marker::PhantomData, |
62 | | ops::{self, Add, Sub}, |
63 | | time::Duration, |
64 | | }; |
65 | | use rand_chacha::{ |
66 | | rand_core::{RngCore as _, SeedableRng as _}, |
67 | | ChaCha20Rng, |
68 | | }; |
69 | | |
70 | | pub use super::peer_id::PeerId; |
71 | | pub use super::read_write::ReadWrite; |
72 | | pub use established::{InboundError, InboundTy, SubstreamFate}; |
73 | | pub use single_stream_handshake::HandshakeError; |
74 | | |
75 | | pub use multi_stream::MultiStreamConnectionTask; |
76 | | pub use single_stream::SingleStreamConnectionTask; |
77 | | |
78 | | mod multi_stream; |
79 | | mod single_stream; |
80 | | |
/// What kind of handshake to perform on the newly-added connection.
///
/// Passed to [`Network::insert_single_stream`].
pub enum SingleStreamHandshakeKind<'a> {
    /// Use the multistream-select protocol to negotiate the Noise encryption, then use the
    /// multistream-select protocol to negotiate the Yamux multiplexing.
    MultistreamSelectNoiseYamux {
        /// Must be `true` if the connection has been initiated locally, or `false` if it has been
        /// initiated by the remote.
        is_initiator: bool,
        /// Local secret key to use for the handshake.
        noise_key: &'a noise::NoiseKey,
    },
}
93 | | |
/// What kind of handshake to perform on the newly-added connection.
///
/// Passed to [`Network::insert_multi_stream`].
pub enum MultiStreamHandshakeKind<'a> {
    /// The connection is a WebRTC connection.
    ///
    /// See <https://github.com/libp2p/specs/pull/412> for details.
    ///
    /// The reading and writing side of substreams must never be closed. Substreams can only be
    /// abruptly destroyed by either side.
    WebRtc {
        /// Must be `true` if the connection has been initiated locally, or `false` if it has been
        /// initiated by the remote.
        is_initiator: bool,
        /// Local secret key to use for the handshake.
        noise_key: &'a noise::NoiseKey,
        /// Multihash encoding of the TLS certificate used by the local node at the DTLS layer.
        local_tls_certificate_multihash: Vec<u8>,
        /// Multihash encoding of the TLS certificate used by the remote node at the DTLS layer.
        remote_tls_certificate_multihash: Vec<u8>,
    },
}
114 | | |
/// Configuration for a [`Network`].
pub struct Config {
    /// Seed for the randomness within the networking state machine.
    pub randomness_seed: [u8; 32],

    /// Number of connections containers should initially allocate for.
    ///
    /// Only a sizing hint; the containers grow beyond this number if needed.
    pub capacity: usize,

    /// Maximum number of substreams that each remote can have simultaneously opened.
    ///
    /// > **Note**: This limit is necessary in order to avoid DoS attacks where a remote opens too
    /// > many substreams.
    pub max_inbound_substreams: usize,

    /// Maximum size in bytes of the protocols supported by the local node. Any protocol larger
    /// than that requested by the remote is automatically refused. Necessary in order to avoid
    /// situations where the remote sends an infinitely-sized protocol name.
    pub max_protocol_name_len: usize,

    /// Amount of time after which a connection handshake is considered to have taken too long
    /// and must be aborted.
    ///
    /// The deadline is computed relative to the `TNow` passed when the connection is inserted.
    pub handshake_timeout: Duration,

    /// Name of the ping protocol on the network.
    pub ping_protocol: String,
}
141 | | |
/// Identifier of a connection spawned by the [`Network`].
///
/// Allocated by [`Network::insert_single_stream`] and [`Network::insert_multi_stream`].
//
// Identifiers are never reused.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ConnectionId(u64);
147 | | |
148 | | impl ConnectionId { |
149 | | /// Value that compares inferior or equal to any possible [`ConnectionId`̀]. |
150 | | pub const MIN: Self = ConnectionId(u64::MIN); |
151 | | /// Value that compares superior or equal to any possible [`ConnectionId`̀]. |
152 | | pub const MAX: Self = ConnectionId(u64::MAX); |
153 | | } |
154 | | |
/// Identifier of a request, or an inbound substream, or an outbound substream.
///
/// All substreams share the same pool of identifiers across all connections.
//
// Identifiers are never reused.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct SubstreamId(u64);
160 | | |
161 | | impl SubstreamId { |
162 | | /// Value that compares inferior or equal to any possible [`SubstreamId`̀]. |
163 | | pub const MIN: Self = SubstreamId(u64::MIN); |
164 | | /// Value that compares superior or equal to any possible [`SubstreamId`̀]. |
165 | | pub const MAX: Self = SubstreamId(u64::MAX); |
166 | | } |
167 | | |
/// Data structure containing the list of all connections, pending or not, and their latest known
/// state. See also [the module-level documentation](..).
pub struct Network<TConn, TNow> {
    /// Messages waiting to be sent to connection tasks.
    messages_to_connections: VecDeque<(ConnectionId, CoordinatorToConnectionInner)>,

    /// Messages received from connection tasks. Processed in [`Network::next_event`].
    pending_incoming_messages: VecDeque<(ConnectionId, ConnectionToCoordinatorInner)>,

    /// Id to assign to the next connection.
    next_connection_id: ConnectionId,

    /// Id to assign to the next substream, such as the next request or next notifications
    /// substream.
    ///
    /// All substreams share the same pool of ids across all connections.
    next_substream_id: SubstreamId,

    /// List of all connections in the data structure.
    connections: hashbrown::HashMap<ConnectionId, Connection<TConn>, fnv::FnvBuildHasher>,

    /// If `Some`, the given connection is in the process of shutting down. Calling
    /// [`Network::next_event`] will cancel all ongoing requests and notification substreams
    /// that concern this connection before processing any other incoming message.
    shutting_down_connection: Option<ConnectionId>,

    /// List of connections for which a [`ConnectionToCoordinatorInner::ShutdownFinished`] has
    /// been received and a [`CoordinatorToConnectionInner::ShutdownFinishedAck`] has been sent.
    /// We can now remove these connections and generate a [`Event::Shutdown`].
    shutdown_finished_connections: VecDeque<ConnectionId>,

    /// List of all outgoing notification substreams that we have opened. Can be either pending
    /// (waiting for the connection task to say whether it has been accepted or not) or fully
    /// open.
    outgoing_notification_substreams:
        hashbrown::HashMap<SubstreamId, (ConnectionId, SubstreamState), fnv::FnvBuildHasher>,

    /// Always contains the same entries as [`Network::outgoing_notification_substreams`] but
    /// ordered differently.
    // TODO: group with the other similar BTreeSets?
    outgoing_notification_substreams_by_connection: BTreeSet<(ConnectionId, SubstreamId)>,

    /// List of all requests that have been started locally.
    outgoing_requests: BTreeSet<(ConnectionId, SubstreamId)>,

    /// List of inbound negotiated substreams that connections have received and haven't been
    /// answered by the API user yet.
    ///
    /// NOTE(review): the meaning of the `bool` in the value tuple isn't visible in this part of
    /// the file — confirm against the code that inserts into this map before relying on it.
    ingoing_negotiated_substreams: hashbrown::HashMap<
        SubstreamId,
        (ConnectionId, established::SubstreamId, bool),
        fnv::FnvBuildHasher,
    >,

    /// Always contains the same entries as [`Network::ingoing_negotiated_substreams`] but ordered
    /// differently.
    // TODO: group with the other similar BTreeSets?
    ingoing_negotiated_substreams_by_connection:
        BTreeMap<(ConnectionId, established::SubstreamId), SubstreamId>,

    /// List of incoming notification substreams that connections have received. Can be either
    /// pending (waiting to be accepted/refused) or fully opened.
    ///
    /// The substream ID of the substream is allocated by the connection task, and thus we need
    /// to keep a mapping of inner `<->` substream IDs.
    ingoing_notification_substreams: hashbrown::HashMap<
        SubstreamId,
        (ConnectionId, SubstreamState, established::SubstreamId),
        fnv::FnvBuildHasher,
    >,

    /// Always contains the same entries as [`Network::ingoing_notification_substreams`] but
    /// ordered differently.
    // TODO: group with the other similar BTreeSets?
    ingoing_notification_substreams_by_connection:
        BTreeMap<(ConnectionId, established::SubstreamId), SubstreamId>,

    /// List of requests that connections have received and haven't been answered by the API user
    /// yet.
    ingoing_requests: hashbrown::HashMap<
        SubstreamId,
        (ConnectionId, established::SubstreamId),
        fnv::FnvBuildHasher,
    >,

    /// Always contains the same entries as [`Network::ingoing_requests`] but ordered differently.
    // TODO: group with the other similar BTreeSets?
    ingoing_requests_by_connection: BTreeSet<(ConnectionId, SubstreamId)>,

    /// Generator for randomness seeds given to the established connections.
    randomness_seeds: ChaCha20Rng,

    /// See [`Config::max_inbound_substreams`].
    max_inbound_substreams: usize,

    /// See [`Config::max_protocol_name_len`].
    max_protocol_name_len: usize,

    /// See [`Config::handshake_timeout`].
    handshake_timeout: Duration,

    /// See [`Config::ping_protocol`].
    ping_protocol: Arc<str>,

    // Phantom data to keep the `TNow` type pinned.
    // TODO: considering removing
    now_pin: PhantomData<fn() -> TNow>,
}
275 | | |
/// State of a single connection, as stored in [`Network::connections`].
struct Connection<TConn> {
    /// Current phase of the connection: handshaking, established, or shutting down.
    state: InnerConnectionState,

    /// Opaque data chosen by the API user when the connection was inserted.
    user_data: TConn,
}
281 | | |
/// See [`Connection::state`].
enum InnerConnectionState {
    /// The connection is still in its handshaking state.
    Handshaking,
    /// The connection is fully established.
    Established,
    /// The connection is in the process of shutting down.
    ShuttingDown {
        /// `true` if the state before the shutdown was [`InnerConnectionState::Established`].
        was_established: bool,

        /// `true` if [`Network::start_shutdown`] has been called on this connection.
        ///
        /// Even if the remote starts the shutdown at the same time, from an API perspective if
        /// this flag is `true` it will be considered as if the API user had initiated the
        /// shutdown.
        api_initiated: bool,
    },
}
300 | | |
/// See [`Network::outgoing_notification_substreams`] and
/// [`Network::ingoing_notification_substreams`].
///
/// > **Note**: There is no `Closed` variant, as this corresponds to a lack of entry in the map.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
enum SubstreamState {
    /// Substream hasn't been accepted or refused yet.
    Pending,
    /// Substream has been accepted and is fully open.
    Open,
    /// Substream is in the process of being closed. Only relevant for inbound substreams.
    RequestedClosing,
}
313 | | |
314 | | impl<TConn, TNow> Network<TConn, TNow> |
315 | | where |
316 | | TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord, |
317 | | { |
318 | | /// Initializes a new network data structure. |
319 | 21 | pub fn new(config: Config) -> Self { |
320 | 21 | // The initial capacities given to the containers below are more or less arbitrary, the |
321 | 21 | // objective being to avoid relocating the containers. |
322 | 21 | Network { |
323 | 21 | messages_to_connections: VecDeque::with_capacity(config.capacity * 2), |
324 | 21 | pending_incoming_messages: VecDeque::with_capacity(config.capacity * 2), |
325 | 21 | next_substream_id: SubstreamId(0), |
326 | 21 | handshake_timeout: config.handshake_timeout, |
327 | 21 | next_connection_id: ConnectionId(0), |
328 | 21 | connections: hashbrown::HashMap::with_capacity_and_hasher( |
329 | 21 | config.capacity, |
330 | 21 | Default::default(), |
331 | 21 | ), |
332 | 21 | shutting_down_connection: None, |
333 | 21 | shutdown_finished_connections: VecDeque::with_capacity(config.capacity), |
334 | 21 | outgoing_requests: BTreeSet::new(), |
335 | 21 | ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher( |
336 | 21 | 4 * config.capacity, |
337 | 21 | Default::default(), |
338 | 21 | ), |
339 | 21 | ingoing_requests_by_connection: BTreeSet::new(), |
340 | 21 | outgoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher( |
341 | 21 | 4 * config.capacity, |
342 | 21 | Default::default(), |
343 | 21 | ), |
344 | 21 | outgoing_notification_substreams_by_connection: BTreeSet::new(), |
345 | 21 | ingoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher( |
346 | 21 | 4 * config.capacity, |
347 | 21 | Default::default(), |
348 | 21 | ), |
349 | 21 | ingoing_notification_substreams_by_connection: BTreeMap::new(), |
350 | 21 | ingoing_negotiated_substreams: hashbrown::HashMap::with_capacity_and_hasher( |
351 | 21 | 4 * config.capacity, |
352 | 21 | Default::default(), |
353 | 21 | ), |
354 | 21 | ingoing_negotiated_substreams_by_connection: BTreeMap::new(), |
355 | 21 | randomness_seeds: ChaCha20Rng::from_seed(config.randomness_seed), |
356 | 21 | max_inbound_substreams: config.max_inbound_substreams, |
357 | 21 | max_protocol_name_len: config.max_protocol_name_len, |
358 | 21 | ping_protocol: config.ping_protocol.into(), |
359 | 21 | now_pin: PhantomData, |
360 | 21 | } |
361 | 21 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE3newB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE3newCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE3newB9_ _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3newCsiLzmwikkc22_14json_rpc_basic Line | Count | Source | 319 | 2 | pub fn new(config: Config) -> Self { | 320 | 2 | // The initial capacities given to the containers below are more or less arbitrary, the | 321 | 2 | // objective being to avoid relocating the containers. | 322 | 2 | Network { | 323 | 2 | messages_to_connections: VecDeque::with_capacity(config.capacity * 2), | 324 | 2 | pending_incoming_messages: VecDeque::with_capacity(config.capacity * 2), | 325 | 2 | next_substream_id: SubstreamId(0), | 326 | 2 | handshake_timeout: config.handshake_timeout, | 327 | 2 | next_connection_id: ConnectionId(0), | 328 | 2 | connections: hashbrown::HashMap::with_capacity_and_hasher( | 329 | 2 | config.capacity, | 330 | 2 | Default::default(), | 331 | 2 | ), | 332 | 2 | shutting_down_connection: None, | 333 | 2 | shutdown_finished_connections: VecDeque::with_capacity(config.capacity), | 334 | 2 | outgoing_requests: BTreeSet::new(), | 335 | 2 | ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher( | 336 | 2 | 4 * config.capacity, | 337 | 2 | Default::default(), | 338 | 2 | ), | 339 | 2 | ingoing_requests_by_connection: BTreeSet::new(), | 340 | 2 | outgoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher( | 341 | 2 | 4 * config.capacity, 
| 342 | 2 | Default::default(), | 343 | 2 | ), | 344 | 2 | outgoing_notification_substreams_by_connection: BTreeSet::new(), | 345 | 2 | ingoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher( | 346 | 2 | 4 * config.capacity, | 347 | 2 | Default::default(), | 348 | 2 | ), | 349 | 2 | ingoing_notification_substreams_by_connection: BTreeMap::new(), | 350 | 2 | ingoing_negotiated_substreams: hashbrown::HashMap::with_capacity_and_hasher( | 351 | 2 | 4 * config.capacity, | 352 | 2 | Default::default(), | 353 | 2 | ), | 354 | 2 | ingoing_negotiated_substreams_by_connection: BTreeMap::new(), | 355 | 2 | randomness_seeds: ChaCha20Rng::from_seed(config.randomness_seed), | 356 | 2 | max_inbound_substreams: config.max_inbound_substreams, | 357 | 2 | max_protocol_name_len: config.max_protocol_name_len, | 358 | 2 | ping_protocol: config.ping_protocol.into(), | 359 | 2 | now_pin: PhantomData, | 360 | 2 | } | 361 | 2 | } |
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3newCscDgN54JpMGG_6author _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3newCsibGXYHQB8Ea_25json_rpc_general_requests Line | Count | Source | 319 | 19 | pub fn new(config: Config) -> Self { | 320 | 19 | // The initial capacities given to the containers below are more or less arbitrary, the | 321 | 19 | // objective being to avoid relocating the containers. | 322 | 19 | Network { | 323 | 19 | messages_to_connections: VecDeque::with_capacity(config.capacity * 2), | 324 | 19 | pending_incoming_messages: VecDeque::with_capacity(config.capacity * 2), | 325 | 19 | next_substream_id: SubstreamId(0), | 326 | 19 | handshake_timeout: config.handshake_timeout, | 327 | 19 | next_connection_id: ConnectionId(0), | 328 | 19 | connections: hashbrown::HashMap::with_capacity_and_hasher( | 329 | 19 | config.capacity, | 330 | 19 | Default::default(), | 331 | 19 | ), | 332 | 19 | shutting_down_connection: None, | 333 | 19 | shutdown_finished_connections: VecDeque::with_capacity(config.capacity), | 334 | 19 | outgoing_requests: BTreeSet::new(), | 335 | 19 | ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher( | 336 | 19 | 4 * config.capacity, | 337 | 19 | Default::default(), | 338 | 19 | ), | 339 | 19 | ingoing_requests_by_connection: BTreeSet::new(), | 340 | 19 | outgoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher( | 341 | 19 | 4 * config.capacity, | 342 | 19 | Default::default(), | 343 | 19 | ), | 344 | 19 | outgoing_notification_substreams_by_connection: BTreeSet::new(), | 345 | 19 | ingoing_notification_substreams: 
hashbrown::HashMap::with_capacity_and_hasher( | 346 | 19 | 4 * config.capacity, | 347 | 19 | Default::default(), | 348 | 19 | ), | 349 | 19 | ingoing_notification_substreams_by_connection: BTreeMap::new(), | 350 | 19 | ingoing_negotiated_substreams: hashbrown::HashMap::with_capacity_and_hasher( | 351 | 19 | 4 * config.capacity, | 352 | 19 | Default::default(), | 353 | 19 | ), | 354 | 19 | ingoing_negotiated_substreams_by_connection: BTreeMap::new(), | 355 | 19 | randomness_seeds: ChaCha20Rng::from_seed(config.randomness_seed), | 356 | 19 | max_inbound_substreams: config.max_inbound_substreams, | 357 | 19 | max_protocol_name_len: config.max_protocol_name_len, | 358 | 19 | ping_protocol: config.ping_protocol.into(), | 359 | 19 | now_pin: PhantomData, | 360 | 19 | } | 361 | 19 | } |
|
362 | | |
363 | | /// Adds a new single-stream connection to the collection. |
364 | | /// |
365 | | /// Must be passed the moment (as a `TNow`) when the connection process has been started, in |
366 | | /// order to determine when the handshake timeout expires. |
367 | 0 | pub fn insert_single_stream( |
368 | 0 | &mut self, |
369 | 0 | when_connection_start: TNow, |
370 | 0 | handshake_kind: SingleStreamHandshakeKind, |
371 | 0 | substreams_capacity: usize, |
372 | 0 | user_data: TConn, |
373 | 0 | ) -> (ConnectionId, SingleStreamConnectionTask<TNow>) { |
374 | 0 | let connection_id = self.next_connection_id; |
375 | 0 | self.next_connection_id.0 += 1; |
376 | 0 |
|
377 | 0 | // We only support one kind of handshake at the moment. Make sure (at compile time) that |
378 | 0 | // the value provided as parameter is indeed the one expected. |
379 | 0 | let SingleStreamHandshakeKind::MultistreamSelectNoiseYamux { |
380 | 0 | is_initiator, |
381 | 0 | noise_key, |
382 | 0 | } = handshake_kind; |
383 | 0 |
|
384 | 0 | let connection_task = SingleStreamConnectionTask::new(single_stream::Config { |
385 | 0 | randomness_seed: { |
386 | 0 | let mut seed = [0; 32]; |
387 | 0 | self.randomness_seeds.fill_bytes(&mut seed); |
388 | 0 | seed |
389 | 0 | }, |
390 | 0 | handshake: { |
391 | 0 | let mut ephemeral_secret_key = zeroize::Zeroizing::new([0; 32]); |
392 | 0 | self.randomness_seeds.fill_bytes(&mut *ephemeral_secret_key); |
393 | 0 | single_stream_handshake::HealthyHandshake::noise_yamux( |
394 | 0 | noise_key, |
395 | 0 | &ephemeral_secret_key, |
396 | 0 | is_initiator, |
397 | 0 | ) |
398 | 0 | }, |
399 | 0 | handshake_timeout: when_connection_start + self.handshake_timeout, |
400 | 0 | max_inbound_substreams: self.max_inbound_substreams, |
401 | 0 | substreams_capacity, |
402 | 0 | max_protocol_name_len: self.max_protocol_name_len, |
403 | 0 | ping_protocol: self.ping_protocol.clone(), |
404 | 0 | }); |
405 | 0 |
|
406 | 0 | let _previous_value = self.connections.insert( |
407 | 0 | connection_id, |
408 | 0 | Connection { |
409 | 0 | state: InnerConnectionState::Handshaking, |
410 | 0 | user_data, |
411 | 0 | }, |
412 | 0 | ); |
413 | 0 | debug_assert!(_previous_value.is_none()); |
414 | | |
415 | 0 | (connection_id, connection_task) |
416 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE20insert_single_streamB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE20insert_single_streamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE20insert_single_streamB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE20insert_single_streamCsiUjFBJteJ7x_17smoldot_full_node |
417 | | |
418 | | /// Adds a new multi-stream connection to the collection. |
419 | | /// |
420 | | /// Must be passed the moment (as a `TNow`) when the connection process has been started, in |
421 | | /// order to determine when the handshake timeout expires. |
422 | 0 | pub fn insert_multi_stream<TSubId>( |
423 | 0 | &mut self, |
424 | 0 | when_connection_start: TNow, |
425 | 0 | handshake_kind: MultiStreamHandshakeKind, |
426 | 0 | substreams_capacity: usize, |
427 | 0 | user_data: TConn, |
428 | 0 | ) -> (ConnectionId, MultiStreamConnectionTask<TNow, TSubId>) |
429 | 0 | where |
430 | 0 | TSubId: Clone + PartialEq + Eq + Hash, |
431 | 0 | { |
432 | 0 | let connection_id = self.next_connection_id; |
433 | 0 | self.next_connection_id.0 += 1; |
434 | | |
435 | | // In the WebRTC handshake, the Noise prologue must be set to `"libp2p-webrtc-noise:"` |
436 | | // followed with the multihash-encoded fingerprints of the initiator's certificate |
437 | | // and the receiver's certificate. |
438 | | // See <https://github.com/libp2p/specs/pull/412>. |
439 | 0 | let (noise_key, noise_prologue, local_is_noise_initiator) = { |
440 | | let MultiStreamHandshakeKind::WebRtc { |
441 | 0 | noise_key, |
442 | 0 | is_initiator, |
443 | 0 | local_tls_certificate_multihash, |
444 | 0 | remote_tls_certificate_multihash, |
445 | 0 | } = handshake_kind; |
446 | 0 | const PREFIX: &[u8] = b"libp2p-webrtc-noise:"; |
447 | 0 | let mut out = Vec::with_capacity( |
448 | 0 | PREFIX.len() |
449 | 0 | + local_tls_certificate_multihash.len() |
450 | 0 | + remote_tls_certificate_multihash.len(), |
451 | 0 | ); |
452 | 0 | out.extend_from_slice(PREFIX); |
453 | 0 | if is_initiator { |
454 | 0 | out.extend_from_slice(&local_tls_certificate_multihash); |
455 | 0 | out.extend_from_slice(&remote_tls_certificate_multihash); |
456 | 0 | } else { |
457 | 0 | out.extend_from_slice(&remote_tls_certificate_multihash); |
458 | 0 | out.extend_from_slice(&local_tls_certificate_multihash); |
459 | 0 | } |
460 | | |
461 | | // In the WebRTC libp2p protocol, the initiator of the connection is *not* the |
462 | | // initiator of the Noise handshake. Instead, it's the "server" that initiates the |
463 | | // Noise handshake. This saves a round-trip. |
464 | 0 | (noise_key, out, !is_initiator) |
465 | 0 | }; |
466 | 0 |
|
467 | 0 | let handshake = { |
468 | 0 | let mut noise_ephemeral_key = zeroize::Zeroizing::new([0; 32]); |
469 | 0 | self.randomness_seeds.fill_bytes(&mut *noise_ephemeral_key); |
470 | 0 | noise::HandshakeInProgress::new(noise::Config { |
471 | 0 | key: noise_key, |
472 | 0 | is_initiator: local_is_noise_initiator, |
473 | 0 | prologue: &noise_prologue, |
474 | 0 | ephemeral_secret_key: &noise_ephemeral_key, |
475 | 0 | }) |
476 | 0 | }; |
477 | 0 |
|
478 | 0 | let connection_task = MultiStreamConnectionTask::new( |
479 | 0 | { |
480 | 0 | let mut seed = [0; 32]; |
481 | 0 | self.randomness_seeds.fill_bytes(&mut seed); |
482 | 0 | seed |
483 | 0 | }, |
484 | 0 | when_connection_start, |
485 | 0 | handshake, |
486 | 0 | self.max_inbound_substreams, |
487 | 0 | substreams_capacity, |
488 | 0 | self.max_protocol_name_len, |
489 | 0 | self.ping_protocol.clone(), |
490 | 0 | ); |
491 | 0 |
|
492 | 0 | let _previous_value = self.connections.insert( |
493 | 0 | connection_id, |
494 | 0 | Connection { |
495 | 0 | state: InnerConnectionState::Handshaking, |
496 | 0 | user_data, |
497 | 0 | }, |
498 | 0 | ); |
499 | 0 | debug_assert!(_previous_value.is_none()); |
500 | | |
501 | 0 | (connection_id, connection_task) |
502 | 0 | } Unexecuted instantiation: _RINvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB6_7NetworkppE19insert_multi_streampEBa_ Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE19insert_multi_streamjECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkppE19insert_multi_streampEBa_ |
503 | | |
504 | | /// Switches the connection to a state where it will shut down soon. |
505 | | /// |
506 | | /// Calling this function does **not** generate a [`Event::StartShutdown`] event for this |
507 | | /// connection. The event is implied. |
508 | | /// |
509 | | /// It is no longer possible to start requests, open notifications substreams, or send |
510 | | /// notifications. No new incoming requests or notification substreams will be reported. The |
511 | | /// incoming notifications that were sent by the remote before the shutdown started will still |
512 | | /// be reported. |
513 | | /// |
514 | | /// It is possible to call this method on connection that are still in their handshaking |
515 | | /// phase. |
516 | | /// |
517 | | /// This function generates a message destined to the connection. Use |
518 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
519 | | /// |
520 | | /// # Panic |
521 | | /// |
522 | | /// Panics if the connection is already shutting down, either because |
523 | | /// [`Network::start_shutdown`] was called or a [`Event::StartShutdown`] event was yielded. |
524 | | /// |
525 | | #[track_caller] |
526 | 0 | pub fn start_shutdown(&mut self, connection_id: ConnectionId) { |
527 | 0 | let connection = match self.connections.get_mut(&connection_id) { |
528 | 0 | Some(c) => c, |
529 | 0 | None => panic!(), |
530 | | }; |
531 | | |
532 | 0 | let is_established = match connection.state { |
533 | 0 | InnerConnectionState::Handshaking => false, |
534 | 0 | InnerConnectionState::Established => true, |
535 | | InnerConnectionState::ShuttingDown { |
536 | | api_initiated: true, |
537 | | .. |
538 | 0 | } => panic!("start_shutdown called twice on same connection"), // Forbidden. |
539 | | InnerConnectionState::ShuttingDown { |
540 | | api_initiated: false, |
541 | | .. |
542 | 0 | } => panic!("start_shutdown called after StartShutdown event"), // Forbidden. |
543 | | }; |
544 | | |
545 | 0 | connection.state = InnerConnectionState::ShuttingDown { |
546 | 0 | was_established: is_established, |
547 | 0 | api_initiated: true, |
548 | 0 | }; |
549 | 0 |
|
550 | 0 | self.messages_to_connections |
551 | 0 | .push_back((connection_id, CoordinatorToConnectionInner::StartShutdown)); |
552 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE14start_shutdownB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE14start_shutdownCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE14start_shutdownB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE14start_shutdownCsiUjFBJteJ7x_17smoldot_full_node |
553 | | |
554 | | /// Returns true if the collection doesn't contain any connection. |
555 | 0 | pub fn is_empty(&self) -> bool { |
556 | 0 | self.connections.is_empty() |
557 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE8is_emptyB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE8is_emptyB9_ |
558 | | |
559 | | /// Returns the number of connections in the collection. |
560 | 0 | pub fn len(&self) -> usize { |
561 | 0 | self.connections.len() |
562 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE3lenB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE3lenB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3lenCsiUjFBJteJ7x_17smoldot_full_node |
563 | | |
564 | | /// Returns the state of the given connection. |
565 | | /// |
566 | | /// # Panic |
567 | | /// |
568 | | /// Panics if the identifier is invalid or corresponds to a connection that has already |
569 | | /// entirely shut down. |
570 | | /// |
571 | 0 | pub fn connection_state(&self, connection_id: ConnectionId) -> ConnectionState { |
572 | 0 | let connection = self.connections.get(&connection_id).unwrap(); |
573 | 0 | match connection.state { |
574 | 0 | InnerConnectionState::Handshaking => ConnectionState { |
575 | 0 | established: false, |
576 | 0 | shutting_down: false, |
577 | 0 | }, |
578 | 0 | InnerConnectionState::Established => ConnectionState { |
579 | 0 | established: true, |
580 | 0 | shutting_down: false, |
581 | 0 | }, |
582 | | InnerConnectionState::ShuttingDown { |
583 | 0 | was_established, .. |
584 | 0 | } => ConnectionState { |
585 | 0 | established: was_established, |
586 | 0 | shutting_down: true, |
587 | 0 | }, |
588 | | } |
589 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE16connection_stateB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE16connection_stateCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE16connection_stateB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE16connection_stateCsiUjFBJteJ7x_17smoldot_full_node |
590 | | |
591 | | /// Modifies the value that was initially passed through [`Config::max_protocol_name_len`]. |
592 | | /// |
593 | | /// The new value only applies to substreams opened after this function has been called. |
594 | 21 | pub fn set_max_protocol_name_len(&mut self, new_max_length: usize) { |
595 | 21 | if self.max_protocol_name_len == new_max_length { |
596 | 21 | return; |
597 | 0 | } |
598 | 0 |
|
599 | 0 | self.max_protocol_name_len = new_max_length; |
600 | 0 |
|
601 | 0 | // Send a message to all connections to update this value. |
602 | 0 | self.messages_to_connections.reserve(self.connections.len()); |
603 | 0 | for connection_id in self.connections.keys() { |
604 | 0 | self.messages_to_connections.push_back(( |
605 | 0 | *connection_id, |
606 | 0 | CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length }, |
607 | 0 | )); |
608 | 0 | } |
609 | 21 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE25set_max_protocol_name_lenB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE25set_max_protocol_name_lenCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE25set_max_protocol_name_lenB9_ _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25set_max_protocol_name_lenCsiLzmwikkc22_14json_rpc_basic Line | Count | Source | 594 | 2 | pub fn set_max_protocol_name_len(&mut self, new_max_length: usize) { | 595 | 2 | if self.max_protocol_name_len == new_max_length { | 596 | 2 | return; | 597 | 0 | } | 598 | 0 |
| 599 | 0 | self.max_protocol_name_len = new_max_length; | 600 | 0 |
| 601 | 0 | // Send a message to all connections to update this value. | 602 | 0 | self.messages_to_connections.reserve(self.connections.len()); | 603 | 0 | for connection_id in self.connections.keys() { | 604 | 0 | self.messages_to_connections.push_back(( | 605 | 0 | *connection_id, | 606 | 0 | CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length }, | 607 | 0 | )); | 608 | 0 | } | 609 | 2 | } |
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25set_max_protocol_name_lenCscDgN54JpMGG_6author _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25set_max_protocol_name_lenCsibGXYHQB8Ea_25json_rpc_general_requests Line | Count | Source | 594 | 19 | pub fn set_max_protocol_name_len(&mut self, new_max_length: usize) { | 595 | 19 | if self.max_protocol_name_len == new_max_length { | 596 | 19 | return; | 597 | 0 | } | 598 | 0 |
| 599 | 0 | self.max_protocol_name_len = new_max_length; | 600 | 0 |
| 601 | 0 | // Send a message to all connections to update this value. | 602 | 0 | self.messages_to_connections.reserve(self.connections.len()); | 603 | 0 | for connection_id in self.connections.keys() { | 604 | 0 | self.messages_to_connections.push_back(( | 605 | 0 | *connection_id, | 606 | 0 | CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length }, | 607 | 0 | )); | 608 | 0 | } | 609 | 19 | } |
|
610 | | |
611 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol |
612 | | /// name and indicate the type of the protocol. |
613 | | /// |
614 | | /// # Panic |
615 | | /// |
616 | | /// Panics if the substream is not in the correct state. |
617 | | /// |
618 | 0 | pub fn accept_inbound(&mut self, substream_id: SubstreamId, ty: InboundTy) { |
619 | 0 | let (connection_id, inner_substream_id, already_accepted) = |
620 | 0 | match self.ingoing_negotiated_substreams.get_mut(&substream_id) { |
621 | 0 | Some(s) => s, |
622 | 0 | None => panic!(), |
623 | | }; |
624 | 0 | assert!(!*already_accepted); |
625 | | |
626 | 0 | self.messages_to_connections.push_back(( |
627 | 0 | *connection_id, |
628 | 0 | CoordinatorToConnectionInner::AcceptInbound { |
629 | 0 | substream_id: *inner_substream_id, |
630 | 0 | inbound_ty: ty, |
631 | 0 | }, |
632 | 0 | )); |
633 | 0 |
|
634 | 0 | *already_accepted = true; |
635 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE14accept_inboundB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE14accept_inboundCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE14accept_inboundB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE14accept_inboundCsiUjFBJteJ7x_17smoldot_full_node |
636 | | |
637 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the |
638 | | /// protocol name as not supported. |
639 | | /// |
640 | | /// # Panic |
641 | | /// |
642 | | /// Panics if the substream is not in the correct state. |
643 | | /// |
644 | 0 | pub fn reject_inbound(&mut self, substream_id: SubstreamId) { |
645 | 0 | let (connection_id, inner_substream_id, already_accepted) = |
646 | 0 | match self.ingoing_negotiated_substreams.remove(&substream_id) { |
647 | 0 | Some(s) => s, |
648 | 0 | None => panic!(), |
649 | | }; |
650 | 0 | let _was_in = self |
651 | 0 | .ingoing_negotiated_substreams_by_connection |
652 | 0 | .remove(&(connection_id, inner_substream_id)); |
653 | 0 | debug_assert!(_was_in.is_some()); |
654 | 0 | assert!(!already_accepted); |
655 | | |
656 | 0 | self.messages_to_connections.push_back(( |
657 | 0 | connection_id, |
658 | 0 | CoordinatorToConnectionInner::RejectInbound { |
659 | 0 | substream_id: inner_substream_id, |
660 | 0 | }, |
661 | 0 | )); |
662 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE14reject_inboundB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE14reject_inboundCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE14reject_inboundB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE14reject_inboundCsiUjFBJteJ7x_17smoldot_full_node |
663 | | |
664 | | /// Sends a request to the given peer. |
665 | | /// |
666 | | /// A [`Event::Response`] event will later be generated containing the result of the request. |
667 | | /// This event is generated even if the connection the request was sent on has been closed. |
668 | | /// |
669 | | /// It is invalid to start a request on a connection before a [`Event::HandshakeFinished`] |
670 | | /// or after a [`Event::StartShutdown`] has been generated, or after |
671 | | /// [`Network::start_shutdown`] has been called. |
672 | | /// |
673 | | /// Returns a newly-allocated identifier for this substream. |
674 | | /// |
675 | | /// This function generates a message destined to the connection. Use |
676 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
677 | | /// |
678 | | /// # Requests |
679 | | /// |
680 | | /// A request consists in: |
681 | | /// |
682 | | /// - Opening a substream on an established connection with the target. |
683 | | /// - Negotiating the requested protocol (`protocol_index`) on this substream using the |
684 | | /// *multistream-select* protocol. |
685 | | /// - Sending the request (`request_data` parameter), prefixed with its length. |
686 | | /// - Waiting for the response (prefixed with its length), which is then returned. |
687 | | /// |
688 | | /// An error happens if the connection closes while the request is in progress, if the remote |
689 | | /// doesn't support the given protocol, if the request or response doesn't respect the protocol |
690 | | /// limits, or if the remote takes too much time to answer. |
691 | | /// |
692 | | /// The timeout is the time between the moment the substream is opened and the moment the |
693 | | /// response is sent back. It starts ticking only after the request starts being sent. If the |
694 | | /// emitter doesn't send the request or if the receiver doesn't answer during this time |
695 | | /// window, the request is considered failed. |
696 | | /// |
697 | | /// # Panic |
698 | | /// |
699 | | /// Panics if the [`ConnectionId`] is invalid or is a connection that hasn't finished its |
700 | | /// handshake or is shutting down. |
701 | | /// |
702 | | #[track_caller] |
703 | 0 | pub fn start_request( |
704 | 0 | &mut self, |
705 | 0 | target: ConnectionId, |
706 | 0 | protocol_name: String, |
707 | 0 | request_data: Option<Vec<u8>>, |
708 | 0 | timeout: Duration, |
709 | 0 | max_response_size: usize, |
710 | 0 | ) -> SubstreamId { |
711 | 0 | let connection = match self.connections.get(&target) { |
712 | 0 | Some(c) => c, |
713 | 0 | None => panic!(), |
714 | | }; |
715 | 0 | assert!(matches!( |
716 | 0 | connection.state, |
717 | | InnerConnectionState::Established |
718 | | )); |
719 | | |
720 | 0 | let substream_id = self.next_substream_id; |
721 | 0 | self.next_substream_id.0 += 1; |
722 | 0 |
|
723 | 0 | let _was_inserted = self.outgoing_requests.insert((target, substream_id)); |
724 | 0 | debug_assert!(_was_inserted); |
725 | | |
726 | 0 | self.messages_to_connections.push_back(( |
727 | 0 | target, |
728 | 0 | CoordinatorToConnectionInner::StartRequest { |
729 | 0 | protocol_name, |
730 | 0 | request_data, |
731 | 0 | timeout, |
732 | 0 | max_response_size, |
733 | 0 | substream_id, |
734 | 0 | }, |
735 | 0 | )); |
736 | 0 |
|
737 | 0 | substream_id |
738 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE13start_requestB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE13start_requestCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE13start_requestB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE13start_requestCsiUjFBJteJ7x_17smoldot_full_node |
739 | | |
740 | | /// Start opening a notifications substream. |
741 | | /// |
742 | | /// It is invalid to open a notifications substream on a connection before a |
743 | | /// [`Event::HandshakeFinished`] or after a [`Event::StartShutdown`] has been generated, or |
744 | | /// after [`Network::start_shutdown`] has been called. |
745 | | /// |
746 | | /// Returns a newly-allocated identifier for this substream. |
747 | | /// |
748 | | /// This function generates a message destined to the connection. Use |
749 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
750 | | /// |
751 | | /// # Panic |
752 | | /// |
753 | | /// Panics if the [`ConnectionId`] is invalid or is a connection that hasn't finished its |
754 | | /// handshake or is shutting down. |
755 | | /// |
756 | | #[track_caller] |
757 | 0 | pub fn open_out_notifications( |
758 | 0 | &mut self, |
759 | 0 | connection_id: ConnectionId, |
760 | 0 | protocol_name: String, |
761 | 0 | handshake_timeout: Duration, |
762 | 0 | handshake: impl Into<Vec<u8>>, |
763 | 0 | max_handshake_size: usize, |
764 | 0 | ) -> SubstreamId { |
765 | 0 | let connection = match self.connections.get(&connection_id) { |
766 | 0 | Some(c) => c, |
767 | 0 | None => panic!(), |
768 | | }; |
769 | 0 | assert!(matches!( |
770 | 0 | connection.state, |
771 | | InnerConnectionState::Established |
772 | | )); |
773 | | |
774 | 0 | let substream_id = self.next_substream_id; |
775 | 0 | self.next_substream_id.0 += 1; |
776 | 0 |
|
777 | 0 | let _prev_value = self |
778 | 0 | .outgoing_notification_substreams |
779 | 0 | .insert(substream_id, (connection_id, SubstreamState::Pending)); |
780 | 0 | debug_assert!(_prev_value.is_none()); |
781 | 0 | let _was_inserted = self |
782 | 0 | .outgoing_notification_substreams_by_connection |
783 | 0 | .insert((connection_id, substream_id)); |
784 | 0 | debug_assert!(_was_inserted); |
785 | | |
786 | 0 | self.messages_to_connections.push_back(( |
787 | 0 | connection_id, |
788 | 0 | CoordinatorToConnectionInner::OpenOutNotifications { |
789 | 0 | protocol_name, |
790 | 0 | handshake: handshake.into(), |
791 | 0 | handshake_timeout, |
792 | 0 | max_handshake_size, |
793 | 0 | substream_id, |
794 | 0 | }, |
795 | 0 | )); |
796 | 0 |
|
797 | 0 | substream_id |
798 | 0 | } Unexecuted instantiation: _RINvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB6_7NetworkppE22open_out_notificationspEBa_ Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE22open_out_notificationsINtNtCsdZExvAaxgia_5alloc3vec3VechEECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkppE22open_out_notificationspEBa_ Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE22open_out_notificationsINtNtCsdZExvAaxgia_5alloc3vec3VechEECsiUjFBJteJ7x_17smoldot_full_node |
799 | | |
800 | | /// Start closing a previously-open notifications substream, or cancels opening a |
801 | | /// notifications substream. |
802 | | /// |
803 | | /// All the notifications that have been queued remain queued. The substream actually closes |
804 | | /// only once the queue is empty. |
805 | | /// |
806 | | /// Calling this method does *not* emit any event. The [`SubstreamId`] is considered invalid |
807 | | /// after this function returns. |
808 | | /// |
809 | | /// This function generates a message destined to the connection. Use |
810 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
811 | | /// |
812 | | /// # Panic |
813 | | /// |
814 | | /// Panics if [`SubstreamId`] doesn't correspond to an outbound notifications substream. |
815 | | /// |
816 | | #[track_caller] |
817 | 0 | pub fn close_out_notifications(&mut self, substream_id: SubstreamId) { |
818 | | // Both `Pending` and `Open` states are accepted. |
819 | 0 | let (connection_id, _state) = |
820 | 0 | match self.outgoing_notification_substreams.remove(&substream_id) { |
821 | 0 | Some(s) => s, |
822 | 0 | None => panic!(), |
823 | | }; |
824 | 0 | let _was_in = self |
825 | 0 | .outgoing_notification_substreams_by_connection |
826 | 0 | .remove(&(connection_id, substream_id)); |
827 | 0 | debug_assert!(_was_in); |
828 | | |
829 | 0 | self.messages_to_connections.push_back(( |
830 | 0 | connection_id, |
831 | 0 | CoordinatorToConnectionInner::CloseOutNotifications { substream_id }, |
832 | 0 | )); |
833 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE23close_out_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE23close_out_notificationsCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE23close_out_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE23close_out_notificationsCsiUjFBJteJ7x_17smoldot_full_node |
834 | | |
835 | | /// Adds a notification to the queue of notifications to send to the given peer. |
836 | | /// |
837 | | /// It is invalid to call this on a [`SubstreamId`] before a successful |
838 | | /// [`Event::NotificationsOutResult`] has been yielded. |
839 | | /// |
840 | | /// Each substream maintains a queue of notifications to be sent to the remote. This method |
841 | | /// attempts to push a notification to this queue. |
842 | | /// |
843 | | /// An error is also returned if the queue exceeds a certain size in bytes, for two reasons: |
844 | | /// |
845 | | /// - Since the content of the queue is transferred at a limited rate, each notification |
846 | | /// pushed at the end of the queue will take more time than the previous one to reach the |
847 | | /// destination. Once the queue reaches a certain size, the time it would take for |
848 | | /// newly-pushed notifications to reach the destination would start being unreasonably large. |
849 | | /// |
850 | | /// - If the remote deliberately applies back-pressure on the substream, it is undesirable to |
851 | | /// increase the memory usage of the local node. |
852 | | /// |
853 | | /// Similarly, the queue being full is a normal situation and notification protocols should |
854 | | /// be designed in such a way that discarding notifications shouldn't have a too negative |
855 | | /// impact. |
856 | | /// |
857 | | /// Regardless of the success of this function, no guarantee exists about the successful |
858 | | /// delivery of notifications. |
859 | | /// |
860 | | /// This function generates a message destined to the connection. Use |
861 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
862 | | /// |
863 | | /// # Panics |
864 | | /// |
865 | | /// Panics if [`SubstreamId`] is not a fully open outbound notifications substream. |
866 | | /// |
867 | | #[track_caller] |
868 | 0 | pub fn queue_notification( |
869 | 0 | &mut self, |
870 | 0 | substream_id: SubstreamId, |
871 | 0 | notification: impl Into<Vec<u8>>, |
872 | 0 | ) -> Result<(), QueueNotificationError> { |
873 | 0 | let (connection_id, state) = match self.outgoing_notification_substreams.get(&substream_id) |
874 | | { |
875 | 0 | Some(s) => s, |
876 | 0 | None => panic!(), |
877 | | }; |
878 | 0 | assert!(matches!(state, SubstreamState::Open)); |
879 | | |
880 | | // TODO: add some back-pressure system and return a `QueueNotificationError` if full |
881 | | |
882 | 0 | self.messages_to_connections.push_back(( |
883 | 0 | *connection_id, |
884 | 0 | CoordinatorToConnectionInner::QueueNotification { |
885 | 0 | substream_id, |
886 | 0 | notification: notification.into(), |
887 | 0 | }, |
888 | 0 | )); |
889 | 0 |
|
890 | 0 | Ok(()) |
891 | 0 | } Unexecuted instantiation: _RINvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB6_7NetworkppE18queue_notificationpEBa_ Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE18queue_notificationINtNtCsdZExvAaxgia_5alloc3vec3VechEECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkppE18queue_notificationpEBa_ Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE18queue_notificationINtNtCsdZExvAaxgia_5alloc3vec3VechEECsiUjFBJteJ7x_17smoldot_full_node |
892 | | |
893 | | /// Accepts a request for an inbound notifications substream reported by an |
894 | | /// [`Event::NotificationsInOpen`]. |
895 | | /// |
896 | | /// If a [`Event::NotificationsInClose`] event is yielded, then this function must not be |
897 | | /// called and will panic. |
898 | | /// |
899 | | /// This function generates a message destined to the connection. Use |
900 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
901 | | /// |
902 | | /// # Panic |
903 | | /// |
904 | | /// Panics if the [`SubstreamId`] doesn't correspond to an inbound notifications substream. |
905 | | /// |
906 | | #[track_caller] |
907 | 0 | pub fn accept_in_notifications( |
908 | 0 | &mut self, |
909 | 0 | substream_id: SubstreamId, |
910 | 0 | handshake: Vec<u8>, |
911 | 0 | max_notification_size: usize, |
912 | 0 | ) { |
913 | 0 | let (connection_id, state, inner_substream_id) = |
914 | 0 | match self.ingoing_notification_substreams.get_mut(&substream_id) { |
915 | 0 | Some(s) => s, |
916 | 0 | None => panic!(), |
917 | | }; |
918 | 0 | assert!(matches!(state, SubstreamState::Pending)); |
919 | | |
920 | 0 | self.messages_to_connections.push_back(( |
921 | 0 | *connection_id, |
922 | 0 | CoordinatorToConnectionInner::AcceptInNotifications { |
923 | 0 | substream_id: *inner_substream_id, |
924 | 0 | handshake, |
925 | 0 | max_notification_size, |
926 | 0 | }, |
927 | 0 | )); |
928 | 0 |
|
929 | 0 | *state = SubstreamState::Open; |
930 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE23accept_in_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE23accept_in_notificationsCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE23accept_in_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE23accept_in_notificationsCsiUjFBJteJ7x_17smoldot_full_node |
931 | | |
932 | | /// Rejects a request for an inbound notifications substream reported by an |
933 | | /// [`Event::NotificationsInOpen`]. |
934 | | /// |
935 | | /// If a [`Event::NotificationsInClose`] event is yielded, then this function must not be |
936 | | /// called and will panic. |
937 | | /// |
938 | | /// The [`SubstreamId`] is considered no longer valid after this function returns. |
939 | | /// |
940 | | /// This function generates a message destined to the connection. Use |
941 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
942 | | /// |
943 | | /// # Panic |
944 | | /// |
945 | | /// Panics if the [`SubstreamId`] doesn't correspond to an inbound notifications substream. |
946 | | /// |
947 | | #[track_caller] |
948 | 0 | pub fn reject_in_notifications(&mut self, substream_id: SubstreamId) { |
949 | 0 | if let Some((connection_id, SubstreamState::Pending, inner_substream_id)) = |
950 | 0 | self.ingoing_notification_substreams.remove(&substream_id) |
951 | | { |
952 | 0 | let _was_in = self |
953 | 0 | .ingoing_notification_substreams_by_connection |
954 | 0 | .remove(&(connection_id, inner_substream_id)); |
955 | 0 | debug_assert_eq!(_was_in, Some(substream_id)); |
956 | | |
957 | 0 | self.messages_to_connections.push_back(( |
958 | 0 | connection_id, |
959 | 0 | CoordinatorToConnectionInner::RejectInNotifications { |
960 | 0 | substream_id: inner_substream_id, |
961 | 0 | }, |
962 | 0 | )); |
963 | | } else { |
964 | | // Note that, if this is reached, the pending substream is not inserted back |
965 | | // in the state machine, meaning that `self` is now in an inconsistent state. |
966 | | // But considering that we panic, this state mismatch isn't actually observable. |
967 | 0 | panic!() |
968 | | } |
969 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE23reject_in_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE23reject_in_notificationsCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE23reject_in_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE23reject_in_notificationsCsiUjFBJteJ7x_17smoldot_full_node |
970 | | |
971 | | /// Start the closing of an inbound notifications substream that was previously accepted with |
972 | | /// [`Network::accept_in_notifications`] |
973 | | /// |
974 | | /// Calling this function will later generate a [`Event::NotificationsInClose`] event once the |
975 | | /// substream is effectively closed. |
976 | | /// This function gracefully asks the remote to close the substream. The remote has the |
977 | | /// duration indicated with `timeout` to effectively close the substream. In the meanwhile, |
978 | | /// notifications can still be received. |
979 | | /// |
980 | | /// This function generates a message destined to the connection. Use |
981 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
982 | | /// |
983 | | /// # Panic |
984 | | /// |
985 | | /// Panics if the [`SubstreamId`] doesn't correspond to an accepted inbound notifications |
986 | | /// substream. |
987 | | /// |
988 | | #[track_caller] |
989 | 0 | pub fn start_close_in_notifications(&mut self, substream_id: SubstreamId, timeout: Duration) { |
990 | 0 | let (connection_id, state, inner_substream_id) = |
991 | 0 | match self.ingoing_notification_substreams.get_mut(&substream_id) { |
992 | 0 | Some(s) => s, |
993 | 0 | None => panic!(), |
994 | | }; |
995 | 0 | assert!(matches!(state, SubstreamState::Open)); |
996 | | |
997 | 0 | self.messages_to_connections.push_back(( |
998 | 0 | *connection_id, |
999 | 0 | CoordinatorToConnectionInner::CloseInNotifications { |
1000 | 0 | substream_id: *inner_substream_id, |
1001 | 0 | timeout, |
1002 | 0 | }, |
1003 | 0 | )); |
1004 | 0 |
|
1005 | 0 | *state = SubstreamState::RequestedClosing; |
1006 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE28start_close_in_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE28start_close_in_notificationsCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE28start_close_in_notificationsB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE28start_close_in_notificationsCsiUjFBJteJ7x_17smoldot_full_node |
1007 | | |
1008 | | /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`]. |
1009 | | /// |
1010 | | /// If the substream was in the meanwhile yielded in an [`Event::RequestInCancel`], then this |
1011 | | /// function must not be called and will panic. |
1012 | | /// |
1013 | | /// The [`SubstreamId`] is considered no longer valid after this function returns. |
1014 | | /// |
1015 | | /// This function generates a message destined to the connection. Use |
1016 | | /// [`Network::pull_message_to_connection`] to process these messages after it has returned. |
1017 | | /// |
1018 | | /// # Panic |
1019 | | /// |
1020 | | /// Panics if the [`SubstreamId`] doesn't correspond to an active incoming request. |
1021 | | /// |
1022 | | #[track_caller] |
1023 | 0 | pub fn respond_in_request(&mut self, substream_id: SubstreamId, response: Result<Vec<u8>, ()>) { |
1024 | 0 | let (connection_id, inner_substream_id) = match self.ingoing_requests.remove(&substream_id) |
1025 | | { |
1026 | 0 | Some(s) => s, |
1027 | 0 | None => panic!(), |
1028 | | }; |
1029 | | |
1030 | 0 | self.ingoing_requests_by_connection |
1031 | 0 | .remove(&(connection_id, substream_id)); |
1032 | 0 |
|
1033 | 0 | self.messages_to_connections.push_back(( |
1034 | 0 | connection_id, |
1035 | 0 | CoordinatorToConnectionInner::AnswerRequest { |
1036 | 0 | substream_id: inner_substream_id, |
1037 | 0 | response, |
1038 | 0 | }, |
1039 | 0 | )); |
1040 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE18respond_in_requestB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE18respond_in_requestCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE18respond_in_requestB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE18respond_in_requestCsiUjFBJteJ7x_17smoldot_full_node |
1041 | | |
1042 | | /// Pulls a message that must be sent to a connection. |
1043 | | /// |
1044 | | /// The message must be passed to [`SingleStreamConnectionTask::inject_coordinator_message`] |
1045 | | /// or [`MultiStreamConnectionTask::inject_coordinator_message`] in the appropriate connection. |
1046 | | /// |
1047 | | /// This function guarantees that the [`ConnectionId`] always refers to a connection that |
1048 | | /// is still alive, in the sense that [`SingleStreamConnectionTask::inject_coordinator_message`] |
1049 | | /// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None` |
1050 | | /// and that no [`Event::Shutdown`] has been generated for this connection. |
1051 | 147 | pub fn pull_message_to_connection( |
1052 | 147 | &mut self, |
1053 | 147 | ) -> Option<(ConnectionId, CoordinatorToConnection)> { |
1054 | 147 | let message0 = self |
1055 | 147 | .messages_to_connections |
1056 | 147 | .pop_front() |
1057 | 147 | .map(|(id, inner)| (id, CoordinatorToConnection { inner })0 )?; Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE26pull_message_to_connection0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE26pull_message_to_connection0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE26pull_message_to_connection0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE26pull_message_to_connection0CsiUjFBJteJ7x_17smoldot_full_node |
1058 | | |
1059 | 0 | if let CoordinatorToConnectionInner::ShutdownFinishedAck = message.1.inner { |
1060 | 0 | self.shutdown_finished_connections.push_back(message.0); |
1061 | 0 | } |
1062 | | |
1063 | 0 | Some(message) |
1064 | 147 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE26pull_message_to_connectionB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE26pull_message_to_connectionCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE26pull_message_to_connectionB9_ _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE26pull_message_to_connectionCsiUjFBJteJ7x_17smoldot_full_node Line | Count | Source | 1051 | 147 | pub fn pull_message_to_connection( | 1052 | 147 | &mut self, | 1053 | 147 | ) -> Option<(ConnectionId, CoordinatorToConnection)> { | 1054 | 147 | let message0 = self | 1055 | 147 | .messages_to_connections | 1056 | 147 | .pop_front() | 1057 | 147 | .map(|(id, inner)| (id, CoordinatorToConnection { inner }))?; | 1058 | | | 1059 | 0 | if let CoordinatorToConnectionInner::ShutdownFinishedAck = message.1.inner { | 1060 | 0 | self.shutdown_finished_connections.push_back(message.0); | 1061 | 0 | } | 1062 | | | 1063 | 0 | Some(message) | 1064 | 147 | } |
|
1065 | | |
1066 | | /// Injects into the state machine a message generated by |
1067 | | /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] or |
1068 | | /// [`MultiStreamConnectionTask::pull_message_to_coordinator`]. |
1069 | | /// |
1070 | | /// This message is queued and is later processed in [`Network::next_event`]. This means that |
1071 | | /// it is [`Network::next_event`] and not [`Network::inject_connection_message`] that updates |
1072 | | /// the internals of the state machine according to the content of the message. For example, |
1073 | | /// if a [`SingleStreamConnectionTask`] sends a message to the coordinator indicating that a |
1074 | | /// notifications substream has been closed, the coordinator will still believe that it is |
1075 | | /// open until [`Network::next_event`] processes this message and at the same time returns a |
1076 | | /// corresponding [`Event`]. Processing messages directly in |
1077 | | /// [`Network::inject_connection_message`] would introduce "race conditions" where the API |
1078 | | /// user can't be sure in which state a connection or a substream is. |
1079 | 0 | pub fn inject_connection_message( |
1080 | 0 | &mut self, |
1081 | 0 | connection_id: ConnectionId, |
1082 | 0 | message: ConnectionToCoordinator, |
1083 | 0 | ) { |
1084 | 0 | assert!(self.connections.contains_key(&connection_id)); |
1085 | | |
1086 | | // TODO: add a limit for a back-pressure-like system? |
1087 | 0 | self.pending_incoming_messages |
1088 | 0 | .push_back((connection_id, message.inner)); |
1089 | 0 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE25inject_connection_messageB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE25inject_connection_messageCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE25inject_connection_messageB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25inject_connection_messageCsiUjFBJteJ7x_17smoldot_full_node |
1090 | | |
1091 | | /// Returns the next event produced by the service. Returns `None` if no event is available. |
1092 | | /// |
1093 | | /// Call this function in a loop after having injected messages using |
1094 | | /// [`Network::inject_connection_message`]. |
1095 | 147 | pub fn next_event(&mut self) -> Option<Event<TConn>> { |
1096 | | loop { |
1097 | 147 | if let Some(connection_id0 ) = self.shutdown_finished_connections.pop_front() { |
1098 | 0 | let connection = self.connections.remove(&connection_id).unwrap(); |
1099 | 0 | let was_established = match &connection.state { |
1100 | | InnerConnectionState::ShuttingDown { |
1101 | 0 | was_established, .. |
1102 | 0 | } => *was_established, |
1103 | 0 | _ => unreachable!(), |
1104 | | }; |
1105 | | |
1106 | 0 | return Some(Event::Shutdown { |
1107 | 0 | id: connection_id, |
1108 | 0 | was_established, |
1109 | 0 | user_data: connection.user_data, |
1110 | 0 | }); |
1111 | 147 | } |
1112 | | |
1113 | | // When a connection starts its shutdown, its id is put in `shutting_down_connection`. |
1114 | | // When that happens, we go through the local state and clean up all requests and |
1115 | | // notification substreams that are in progress/open and return the cancellations |
1116 | | // as events. |
1117 | | // |
1118 | | // `shutting_down_connection` is set back to `None` only if it turns out that there |
1119 | | // is no request or notification substream in progress/open anymore. |
1120 | 147 | if let Some(shutting_down_connection0 ) = self.shutting_down_connection { |
1121 | | // Outgoing notification substreams to close. |
1122 | 0 | if let Some((_, substream_id)) = self |
1123 | 0 | .outgoing_notification_substreams_by_connection |
1124 | 0 | .range( |
1125 | 0 | (shutting_down_connection, SubstreamId::MIN) |
1126 | 0 | ..=(shutting_down_connection, SubstreamId::MAX), |
1127 | 0 | ) |
1128 | 0 | .cloned() |
1129 | 0 | .next() |
1130 | | { |
1131 | 0 | self.outgoing_notification_substreams_by_connection |
1132 | 0 | .remove(&(shutting_down_connection, substream_id)); |
1133 | 0 | let (_, state) = self |
1134 | 0 | .outgoing_notification_substreams |
1135 | 0 | .remove(&substream_id) |
1136 | 0 | .unwrap(); |
1137 | 0 | return Some(match state { |
1138 | 0 | SubstreamState::Open => Event::NotificationsOutReset { substream_id }, |
1139 | 0 | SubstreamState::Pending => Event::NotificationsOutResult { |
1140 | 0 | substream_id, |
1141 | 0 | result: Err(NotificationsOutErr::ConnectionShutdown), |
1142 | 0 | }, |
1143 | 0 | SubstreamState::RequestedClosing => unreachable!(), // Never set for outgoing notification substreams. |
1144 | | }); |
1145 | 0 | } |
1146 | | |
1147 | | // Ingoing notification substreams to close. |
1148 | 0 | if let Some((key, substream_id)) = self |
1149 | 0 | .ingoing_notification_substreams_by_connection |
1150 | 0 | .range( |
1151 | 0 | (shutting_down_connection, established::SubstreamId::MIN) |
1152 | 0 | ..=(shutting_down_connection, established::SubstreamId::MAX), |
1153 | 0 | ) |
1154 | 0 | .map(|(k, v)| (*k, *v)) Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_event0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_event0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_event0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_event0CsiUjFBJteJ7x_17smoldot_full_node |
1155 | 0 | .next() |
1156 | | { |
1157 | 0 | let (_, state, _) = self |
1158 | 0 | .ingoing_notification_substreams |
1159 | 0 | .remove(&substream_id) |
1160 | 0 | .unwrap(); |
1161 | 0 | self.ingoing_notification_substreams_by_connection |
1162 | 0 | .remove(&key) |
1163 | 0 | .unwrap(); |
1164 | 0 |
|
1165 | 0 | return Some(match state { |
1166 | | SubstreamState::Open | SubstreamState::RequestedClosing => { |
1167 | 0 | Event::NotificationsInClose { |
1168 | 0 | substream_id, |
1169 | 0 | outcome: Err(NotificationsInClosedErr::ConnectionShutdown), |
1170 | 0 | } |
1171 | | } |
1172 | | SubstreamState::Pending => { |
1173 | 0 | Event::NotificationsInOpenCancel { substream_id } |
1174 | | } |
1175 | | }); |
1176 | 0 | } |
1177 | | |
1178 | | // Find outgoing requests to cancel. |
1179 | 0 | if let Some((_, substream_id)) = self |
1180 | 0 | .outgoing_requests |
1181 | 0 | .range( |
1182 | 0 | (shutting_down_connection, SubstreamId::MIN) |
1183 | 0 | ..=(shutting_down_connection, SubstreamId::MAX), |
1184 | 0 | ) |
1185 | 0 | .next() |
1186 | | { |
1187 | 0 | let substream_id = *substream_id; |
1188 | 0 | self.outgoing_requests |
1189 | 0 | .remove(&(shutting_down_connection, substream_id)); |
1190 | 0 |
|
1191 | 0 | return Some(Event::Response { |
1192 | 0 | substream_id, |
1193 | 0 | response: Err(RequestError::ConnectionShutdown), |
1194 | 0 | }); |
1195 | 0 | } |
1196 | | |
1197 | | // Find ingoing requests to cancel. |
1198 | 0 | if let Some((_, substream_id)) = self |
1199 | 0 | .ingoing_requests_by_connection |
1200 | 0 | .range( |
1201 | 0 | (shutting_down_connection, SubstreamId::MIN) |
1202 | 0 | ..=(shutting_down_connection, SubstreamId::MAX), |
1203 | 0 | ) |
1204 | 0 | .next() |
1205 | | { |
1206 | 0 | let substream_id = *substream_id; |
1207 | 0 |
|
1208 | 0 | let _was_in = self.ingoing_requests.remove(&substream_id); |
1209 | 0 | debug_assert!(_was_in.is_some()); |
1210 | 0 | let _was_in = self |
1211 | 0 | .ingoing_requests_by_connection |
1212 | 0 | .remove(&(shutting_down_connection, substream_id)); |
1213 | 0 | debug_assert!(_was_in); |
1214 | | |
1215 | 0 | return Some(Event::RequestInCancel { substream_id }); |
1216 | 0 | } |
1217 | | |
1218 | | // Find ingoing negotiated substreams to cancel. |
1219 | 0 | if let Some((key, substream_id)) = self |
1220 | 0 | .ingoing_negotiated_substreams_by_connection |
1221 | 0 | .range( |
1222 | 0 | (shutting_down_connection, established::SubstreamId::MIN) |
1223 | 0 | ..=(shutting_down_connection, established::SubstreamId::MAX), |
1224 | 0 | ) |
1225 | 0 | .map(|(k, v)| (*k, *v)) Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events_0CsiUjFBJteJ7x_17smoldot_full_node |
1226 | 0 | .next() |
1227 | | { |
1228 | 0 | let Some((_, _, was_accepted)) = |
1229 | 0 | self.ingoing_negotiated_substreams.remove(&substream_id) |
1230 | | else { |
1231 | 0 | unreachable!() |
1232 | | }; |
1233 | 0 | let _was_in = self |
1234 | 0 | .ingoing_negotiated_substreams_by_connection |
1235 | 0 | .remove(&key); |
1236 | 0 | debug_assert!(_was_in.is_some()); |
1237 | | |
1238 | 0 | if was_accepted { |
1239 | 0 | return Some(Event::InboundAcceptedCancel { substream_id }); |
1240 | | } else { |
1241 | 0 | return Some(Event::InboundNegotiatedCancel { substream_id }); |
1242 | | } |
1243 | 0 | } |
1244 | 0 |
|
1245 | 0 | // If this is reached, this connection has no more request or notifications |
1246 | 0 | // substream that is still in progress or open. The connection is successfully |
1247 | 0 | // shut down. |
1248 | 0 | self.shutting_down_connection = None; |
1249 | 147 | } |
1250 | | |
1251 | | // Now actually process messages. |
1252 | 147 | let (connection_id, message0 ) = self.pending_incoming_messages.pop_front()?; |
1253 | 0 | let connection = &mut self.connections.get_mut(&connection_id).unwrap(); |
1254 | 0 |
|
1255 | 0 | break Some(match message { |
1256 | 0 | ConnectionToCoordinatorInner::StartShutdown(reason) => { |
1257 | | // The `ConnectionToCoordinator` message contains a shutdown reason if |
1258 | | // and only if it sends `StartShutdown` as a response to a shutdown |
1259 | | // initiated by the remote. If the shutdown was initiated locally |
1260 | | // (`api_initiated` is `true`), then it can contain `None`, but it can also |
1261 | | // contain `Some` in case the shutdown was initiated by the remote at the same |
1262 | | // time as it was initiated locally. |
1263 | | |
1264 | 0 | let report_event = match &mut connection.state { |
1265 | | InnerConnectionState::ShuttingDown { |
1266 | | api_initiated: true, |
1267 | | .. |
1268 | 0 | } => false, |
1269 | | InnerConnectionState::ShuttingDown { |
1270 | | api_initiated: false, |
1271 | | .. |
1272 | 0 | } => unreachable!(), |
1273 | 0 | st @ InnerConnectionState::Handshaking => { |
1274 | 0 | *st = InnerConnectionState::ShuttingDown { |
1275 | 0 | api_initiated: false, |
1276 | 0 | was_established: false, |
1277 | 0 | }; |
1278 | 0 | true |
1279 | | } |
1280 | 0 | st @ InnerConnectionState::Established => { |
1281 | 0 | *st = InnerConnectionState::ShuttingDown { |
1282 | 0 | api_initiated: false, |
1283 | 0 | was_established: true, |
1284 | 0 | }; |
1285 | 0 | true |
1286 | | } |
1287 | | }; |
1288 | | |
1289 | | // Control flow can't reach here if `shutting_down_connection` is ̀`Some`. |
1290 | 0 | debug_assert!(self.shutting_down_connection.is_none()); |
1291 | 0 | self.shutting_down_connection = Some(connection_id); |
1292 | 0 |
|
1293 | 0 | if !report_event { |
1294 | | // No `StartShutdown` event is generated if the API user has started |
1295 | | // the shutdown themselves. In that case, `StartShutdown` is merely a |
1296 | | // confirmation. |
1297 | 0 | continue; |
1298 | | } else { |
1299 | 0 | Event::StartShutdown { |
1300 | 0 | id: connection_id, |
1301 | 0 | reason: reason.unwrap(), // See comment above. |
1302 | 0 | } |
1303 | | } |
1304 | | } |
1305 | | ConnectionToCoordinatorInner::ShutdownFinished => { |
1306 | 0 | self.messages_to_connections.push_back(( |
1307 | 0 | connection_id, |
1308 | 0 | CoordinatorToConnectionInner::ShutdownFinishedAck, |
1309 | 0 | )); |
1310 | 0 | continue; |
1311 | | } |
1312 | 0 | ConnectionToCoordinatorInner::HandshakeFinished(peer_id) => { |
1313 | 0 | debug_assert_eq!( |
1314 | 0 | self.ingoing_notification_substreams_by_connection |
1315 | 0 | .range( |
1316 | 0 | (connection_id, established::SubstreamId::MIN) |
1317 | 0 | ..=(connection_id, established::SubstreamId::MAX) |
1318 | 0 | ) |
1319 | 0 | .count(), |
1320 | | 0 |
1321 | | ); |
1322 | 0 | debug_assert_eq!( |
1323 | 0 | self.outgoing_notification_substreams_by_connection |
1324 | 0 | .range( |
1325 | 0 | (connection_id, SubstreamId::MIN) |
1326 | 0 | ..=(connection_id, SubstreamId::MAX) |
1327 | 0 | ) |
1328 | 0 | .count(), |
1329 | | 0 |
1330 | | ); |
1331 | 0 | debug_assert_eq!( |
1332 | 0 | self.ingoing_requests_by_connection |
1333 | 0 | .range( |
1334 | 0 | (connection_id, SubstreamId::MIN) |
1335 | 0 | ..=(connection_id, SubstreamId::MAX) |
1336 | 0 | ) |
1337 | 0 | .count(), |
1338 | | 0 |
1339 | | ); |
1340 | | |
1341 | 0 | match &mut connection.state { |
1342 | | InnerConnectionState::ShuttingDown { |
1343 | 0 | was_established, |
1344 | 0 | api_initiated, |
1345 | 0 | } => { |
1346 | 0 | debug_assert!(!*was_established); |
1347 | 0 | debug_assert!(*api_initiated); |
1348 | 0 | continue; |
1349 | | } |
1350 | 0 | st @ InnerConnectionState::Handshaking => { |
1351 | 0 | *st = InnerConnectionState::Established |
1352 | | } |
1353 | 0 | InnerConnectionState::Established => unreachable!(), |
1354 | | } |
1355 | | |
1356 | 0 | Event::HandshakeFinished { |
1357 | 0 | id: connection_id, |
1358 | 0 | peer_id, |
1359 | 0 | } |
1360 | | } |
1361 | 0 | ConnectionToCoordinatorInner::InboundError(error) => { |
1362 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1363 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1364 | 0 | connection.state |
1365 | | { |
1366 | 0 | debug_assert!(api_initiated); |
1367 | 0 | continue; |
1368 | 0 | } |
1369 | 0 |
|
1370 | 0 | Event::InboundError { |
1371 | 0 | id: connection_id, |
1372 | 0 | error, |
1373 | 0 | } |
1374 | | } |
1375 | | ConnectionToCoordinatorInner::InboundNegotiated { |
1376 | 0 | id: connection_substream_id, |
1377 | 0 | protocol_name, |
1378 | | } => { |
1379 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1380 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1381 | 0 | connection.state |
1382 | | { |
1383 | 0 | debug_assert!(api_initiated); |
1384 | 0 | continue; |
1385 | 0 | } |
1386 | 0 |
|
1387 | 0 | let substream_id = self.next_substream_id; |
1388 | 0 | self.next_substream_id.0 += 1; |
1389 | 0 |
|
1390 | 0 | self.ingoing_negotiated_substreams.insert( |
1391 | 0 | substream_id, |
1392 | 0 | (connection_id, connection_substream_id, false), |
1393 | 0 | ); |
1394 | 0 | self.ingoing_negotiated_substreams_by_connection |
1395 | 0 | .insert((connection_id, connection_substream_id), substream_id); |
1396 | 0 |
|
1397 | 0 | Event::InboundNegotiated { |
1398 | 0 | id: connection_id, |
1399 | 0 | substream_id, |
1400 | 0 | protocol_name, |
1401 | 0 | } |
1402 | | } |
1403 | | ConnectionToCoordinatorInner::InboundAcceptedCancel { |
1404 | 0 | id: connection_substream_id, |
1405 | | } => { |
1406 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1407 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1408 | 0 | connection.state |
1409 | | { |
1410 | 0 | debug_assert!(api_initiated); |
1411 | 0 | continue; |
1412 | 0 | } |
1413 | 0 |
|
1414 | 0 | let substream_id = self |
1415 | 0 | .ingoing_negotiated_substreams_by_connection |
1416 | 0 | .remove(&(connection_id, connection_substream_id)) |
1417 | 0 | .unwrap_or_else(|| unreachable!()); Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events0_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events0_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events0_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events0_0CsiUjFBJteJ7x_17smoldot_full_node |
1418 | 0 | let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id); |
1419 | 0 | debug_assert!(_was_in.is_some()); |
1420 | | |
1421 | 0 | Event::InboundAcceptedCancel { substream_id } |
1422 | | } |
1423 | | ConnectionToCoordinatorInner::RequestIn { |
1424 | 0 | id: connection_substream_id, |
1425 | 0 | request, |
1426 | | } => { |
1427 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1428 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1429 | 0 | connection.state |
1430 | | { |
1431 | 0 | debug_assert!(api_initiated); |
1432 | 0 | continue; |
1433 | 0 | } |
1434 | 0 |
|
1435 | 0 | let substream_id = self |
1436 | 0 | .ingoing_negotiated_substreams_by_connection |
1437 | 0 | .remove(&(connection_id, connection_substream_id)) |
1438 | 0 | .unwrap_or_else(|| unreachable!()); Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events1_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events1_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events1_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events1_0CsiUjFBJteJ7x_17smoldot_full_node |
1439 | 0 | let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id); |
1440 | 0 | debug_assert!(_was_in.is_some()); |
1441 | | |
1442 | 0 | self.ingoing_requests |
1443 | 0 | .insert(substream_id, (connection_id, connection_substream_id)); |
1444 | 0 | self.ingoing_requests_by_connection |
1445 | 0 | .insert((connection_id, substream_id)); |
1446 | 0 |
|
1447 | 0 | Event::RequestIn { |
1448 | 0 | substream_id, |
1449 | 0 | request_payload: request, |
1450 | 0 | } |
1451 | | } |
1452 | | ConnectionToCoordinatorInner::Response { |
1453 | 0 | id: substream_id, |
1454 | 0 | response, |
1455 | | .. |
1456 | | } => { |
1457 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1458 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1459 | 0 | connection.state |
1460 | | { |
1461 | 0 | debug_assert!(api_initiated); |
1462 | 0 | continue; |
1463 | 0 | } |
1464 | 0 |
|
1465 | 0 | let _was_in = self |
1466 | 0 | .outgoing_requests |
1467 | 0 | .remove(&(connection_id, substream_id)); |
1468 | 0 | debug_assert!(_was_in); |
1469 | | |
1470 | 0 | Event::Response { |
1471 | 0 | substream_id, |
1472 | 0 | response: response.map_err(RequestError::Substream), |
1473 | 0 | } |
1474 | | } |
1475 | | ConnectionToCoordinatorInner::NotificationsInOpen { |
1476 | 0 | id: inner_substream_id, |
1477 | 0 | handshake, |
1478 | | } => { |
1479 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1480 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1481 | 0 | connection.state |
1482 | | { |
1483 | 0 | debug_assert!(api_initiated); |
1484 | 0 | continue; |
1485 | 0 | } |
1486 | 0 |
|
1487 | 0 | let substream_id = self |
1488 | 0 | .ingoing_negotiated_substreams_by_connection |
1489 | 0 | .remove(&(connection_id, inner_substream_id)) |
1490 | 0 | .unwrap_or_else(|| unreachable!()); Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events2_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events2_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events2_0Bb_ Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events2_0CsiUjFBJteJ7x_17smoldot_full_node |
1491 | 0 | let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id); |
1492 | 0 | debug_assert!(_was_in.is_some()); |
1493 | | |
1494 | 0 | self.ingoing_notification_substreams.insert( |
1495 | 0 | substream_id, |
1496 | 0 | (connection_id, SubstreamState::Pending, inner_substream_id), |
1497 | 0 | ); |
1498 | 0 | self.ingoing_notification_substreams_by_connection |
1499 | 0 | .insert((connection_id, inner_substream_id), substream_id); |
1500 | 0 |
|
1501 | 0 | Event::NotificationsInOpen { |
1502 | 0 | substream_id, |
1503 | 0 | remote_handshake: handshake, |
1504 | 0 | } |
1505 | | } |
1506 | | ConnectionToCoordinatorInner::NotificationsInOpenCancel { |
1507 | 0 | id: inner_substream_id, |
1508 | | .. |
1509 | | } => { |
1510 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1511 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1512 | 0 | connection.state |
1513 | | { |
1514 | 0 | debug_assert!(api_initiated); |
1515 | 0 | continue; |
1516 | 0 | } |
1517 | | |
1518 | | // The event might concern a substream that we have already accepted or |
1519 | | // refused. In that situation, either reinterpret the event as |
1520 | | // "NotificationsInClose" or discard it. |
1521 | 0 | if let Some(substream_id) = self |
1522 | 0 | .ingoing_notification_substreams_by_connection |
1523 | 0 | .remove(&(connection_id, inner_substream_id)) |
1524 | | { |
1525 | 0 | let (_, state, _) = self |
1526 | 0 | .ingoing_notification_substreams |
1527 | 0 | .remove(&substream_id) |
1528 | 0 | .unwrap(); |
1529 | 0 | match state { |
1530 | | SubstreamState::Open | SubstreamState::RequestedClosing => { |
1531 | 0 | Event::NotificationsInClose { |
1532 | 0 | substream_id, |
1533 | 0 | outcome: Err(NotificationsInClosedErr::Substream( |
1534 | 0 | established::NotificationsInClosedErr::SubstreamReset, |
1535 | 0 | )), |
1536 | 0 | } |
1537 | | } |
1538 | | SubstreamState::Pending => { |
1539 | 0 | Event::NotificationsInOpenCancel { substream_id } |
1540 | | } |
1541 | | } |
1542 | | } else { |
1543 | | // Substream was refused. As documented, we must confirm the reception of |
1544 | | // the event by sending back a rejection. |
1545 | 0 | self.messages_to_connections.push_back(( |
1546 | 0 | connection_id, |
1547 | 0 | CoordinatorToConnectionInner::RejectInNotifications { |
1548 | 0 | substream_id: inner_substream_id, |
1549 | 0 | }, |
1550 | 0 | )); |
1551 | 0 | continue; |
1552 | | } |
1553 | | } |
1554 | | ConnectionToCoordinatorInner::NotificationIn { |
1555 | 0 | id: inner_substream_id, |
1556 | 0 | notification, |
1557 | | } => { |
1558 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1559 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1560 | 0 | connection.state |
1561 | | { |
1562 | 0 | debug_assert!(api_initiated); |
1563 | 0 | continue; |
1564 | 0 | } |
1565 | 0 |
|
1566 | 0 | let substream_id = *self |
1567 | 0 | .ingoing_notification_substreams_by_connection |
1568 | 0 | .get(&(connection_id, inner_substream_id)) |
1569 | 0 | .unwrap(); |
1570 | 0 |
|
1571 | 0 | Event::NotificationsIn { |
1572 | 0 | substream_id, |
1573 | 0 | notification, |
1574 | 0 | } |
1575 | | } |
1576 | | ConnectionToCoordinatorInner::NotificationsInClose { |
1577 | 0 | id: inner_substream_id, |
1578 | 0 | outcome, |
1579 | | .. |
1580 | | } => { |
1581 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1582 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1583 | 0 | connection.state |
1584 | | { |
1585 | 0 | debug_assert!(api_initiated); |
1586 | 0 | continue; |
1587 | 0 | } |
1588 | 0 |
|
1589 | 0 | let substream_id = self |
1590 | 0 | .ingoing_notification_substreams_by_connection |
1591 | 0 | .remove(&(connection_id, inner_substream_id)) |
1592 | 0 | .unwrap(); |
1593 | 0 | let (_, state, _) = self |
1594 | 0 | .ingoing_notification_substreams |
1595 | 0 | .remove(&substream_id) |
1596 | 0 | .unwrap(); |
1597 | 0 | debug_assert!(matches!( |
1598 | 0 | state, |
1599 | | SubstreamState::Open | SubstreamState::RequestedClosing |
1600 | | )); |
1601 | | |
1602 | 0 | if let SubstreamState::Open = state { |
1603 | 0 | // As documented, we must confirm the reception of the event by sending |
1604 | 0 | // back a rejection, provided that no such event has been sent beforehand. |
1605 | 0 | self.messages_to_connections.push_back(( |
1606 | 0 | connection_id, |
1607 | 0 | CoordinatorToConnectionInner::CloseInNotifications { |
1608 | 0 | substream_id: inner_substream_id, |
1609 | 0 | timeout: Duration::new(0, 0), |
1610 | 0 | }, |
1611 | 0 | )); |
1612 | 0 | } |
1613 | | |
1614 | 0 | Event::NotificationsInClose { |
1615 | 0 | substream_id, |
1616 | 0 | outcome: outcome.map_err(NotificationsInClosedErr::Substream), |
1617 | 0 | } |
1618 | | } |
1619 | | ConnectionToCoordinatorInner::NotificationsOutResult { |
1620 | 0 | id: substream_id, |
1621 | 0 | result, |
1622 | | } => { |
1623 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1624 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1625 | 0 | connection.state |
1626 | | { |
1627 | 0 | debug_assert!(api_initiated); |
1628 | 0 | continue; |
1629 | 0 | } |
1630 | | |
1631 | 0 | let mut entry = match self.outgoing_notification_substreams.entry(substream_id) |
1632 | | { |
1633 | 0 | hashbrown::hash_map::Entry::Occupied(e) => e, |
1634 | | hashbrown::hash_map::Entry::Vacant(_) => { |
1635 | | // This can be reached if the API user closed the substream while it |
1636 | | // was being open. |
1637 | 0 | continue; |
1638 | | } |
1639 | | }; |
1640 | | |
1641 | 0 | debug_assert!(matches!(entry.get_mut().1, SubstreamState::Pending)); |
1642 | | |
1643 | 0 | if result.is_ok() { |
1644 | 0 | entry.insert((connection_id, SubstreamState::Open)); |
1645 | 0 | } else { |
1646 | 0 | entry.remove(); |
1647 | 0 |
|
1648 | 0 | let _was_removed = self |
1649 | 0 | .outgoing_notification_substreams_by_connection |
1650 | 0 | .remove(&(connection_id, substream_id)); |
1651 | 0 | debug_assert!(_was_removed); |
1652 | | } |
1653 | | |
1654 | 0 | Event::NotificationsOutResult { |
1655 | 0 | substream_id, |
1656 | 0 | result, |
1657 | 0 | } |
1658 | | } |
1659 | | ConnectionToCoordinatorInner::NotificationsOutCloseDemanded { |
1660 | 0 | id: substream_id, |
1661 | | .. |
1662 | | } => { |
1663 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1664 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1665 | 0 | connection.state |
1666 | | { |
1667 | 0 | debug_assert!(api_initiated); |
1668 | 0 | continue; |
1669 | 0 | } |
1670 | 0 |
|
1671 | 0 | match self.outgoing_notification_substreams.get(&substream_id) { |
1672 | 0 | Some((_connection_id, _substream_state)) => { |
1673 | 0 | debug_assert_eq!(*_connection_id, connection_id); |
1674 | 0 | debug_assert!(matches!(_substream_state, SubstreamState::Open)); |
1675 | | } |
1676 | | None => { |
1677 | | // The substream might already have been destroyed if the user closed |
1678 | | // the substream while this message was pending in the queue. |
1679 | 0 | continue; |
1680 | | } |
1681 | | } |
1682 | | |
1683 | 0 | Event::NotificationsOutCloseDemanded { substream_id } |
1684 | | } |
1685 | 0 | ConnectionToCoordinatorInner::NotificationsOutReset { id: substream_id } => { |
1686 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1687 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1688 | 0 | connection.state |
1689 | | { |
1690 | 0 | debug_assert!(api_initiated); |
1691 | 0 | continue; |
1692 | 0 | } |
1693 | 0 |
|
1694 | 0 | match self.outgoing_notification_substreams.remove(&substream_id) { |
1695 | 0 | Some((_connection_id, _substream_state)) => { |
1696 | 0 | debug_assert_eq!(_connection_id, connection_id); |
1697 | 0 | debug_assert!(matches!(_substream_state, SubstreamState::Open)); |
1698 | | } |
1699 | | None => { |
1700 | | // The substream might already have been destroyed if the user closed |
1701 | | // the substream while this message was pending in the queue. |
1702 | 0 | continue; |
1703 | | } |
1704 | | } |
1705 | | |
1706 | 0 | let _was_removed = self |
1707 | 0 | .outgoing_notification_substreams_by_connection |
1708 | 0 | .remove(&(connection_id, substream_id)); |
1709 | 0 | debug_assert!(_was_removed); |
1710 | | |
1711 | 0 | Event::NotificationsOutReset { substream_id } |
1712 | | } |
1713 | 0 | ConnectionToCoordinatorInner::PingOutSuccess { ping_time } => { |
1714 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1715 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1716 | 0 | connection.state |
1717 | | { |
1718 | 0 | debug_assert!(api_initiated); |
1719 | 0 | continue; |
1720 | 0 | } |
1721 | 0 |
|
1722 | 0 | Event::PingOutSuccess { |
1723 | 0 | id: connection_id, |
1724 | 0 | ping_time, |
1725 | 0 | } |
1726 | | } |
1727 | | ConnectionToCoordinatorInner::PingOutFailed => { |
1728 | | // Ignore events if a shutdown has been initiated by the coordinator. |
1729 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = |
1730 | 0 | connection.state |
1731 | | { |
1732 | 0 | debug_assert!(api_initiated); |
1733 | 0 | continue; |
1734 | 0 | } |
1735 | 0 |
|
1736 | 0 | Event::PingOutFailed { id: connection_id } |
1737 | | } |
1738 | | }); |
1739 | | } |
1740 | 147 | } Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE10next_eventB9_ Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_eventCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE10next_eventB9_ _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_eventCsiUjFBJteJ7x_17smoldot_full_node Line | Count | Source | 1095 | 147 | pub fn next_event(&mut self) -> Option<Event<TConn>> { | 1096 | | loop { | 1097 | 147 | if let Some(connection_id0 ) = self.shutdown_finished_connections.pop_front() { | 1098 | 0 | let connection = self.connections.remove(&connection_id).unwrap(); | 1099 | 0 | let was_established = match &connection.state { | 1100 | | InnerConnectionState::ShuttingDown { | 1101 | 0 | was_established, .. | 1102 | 0 | } => *was_established, | 1103 | 0 | _ => unreachable!(), | 1104 | | }; | 1105 | | | 1106 | 0 | return Some(Event::Shutdown { | 1107 | 0 | id: connection_id, | 1108 | 0 | was_established, | 1109 | 0 | user_data: connection.user_data, | 1110 | 0 | }); | 1111 | 147 | } | 1112 | | | 1113 | | // When a connection starts its shutdown, its id is put in `shutting_down_connection`. | 1114 | | // When that happens, we go through the local state and clean up all requests and | 1115 | | // notification substreams that are in progress/open and return the cancellations | 1116 | | // as events. 
| 1117 | | // | 1118 | | // `shutting_down_connection` is set back to `None` only if it turns out that there | 1119 | | // is no request or notification substream in progress/open anymore. | 1120 | 147 | if let Some(shutting_down_connection0 ) = self.shutting_down_connection { | 1121 | | // Outgoing notification substreams to close. | 1122 | 0 | if let Some((_, substream_id)) = self | 1123 | 0 | .outgoing_notification_substreams_by_connection | 1124 | 0 | .range( | 1125 | 0 | (shutting_down_connection, SubstreamId::MIN) | 1126 | 0 | ..=(shutting_down_connection, SubstreamId::MAX), | 1127 | 0 | ) | 1128 | 0 | .cloned() | 1129 | 0 | .next() | 1130 | | { | 1131 | 0 | self.outgoing_notification_substreams_by_connection | 1132 | 0 | .remove(&(shutting_down_connection, substream_id)); | 1133 | 0 | let (_, state) = self | 1134 | 0 | .outgoing_notification_substreams | 1135 | 0 | .remove(&substream_id) | 1136 | 0 | .unwrap(); | 1137 | 0 | return Some(match state { | 1138 | 0 | SubstreamState::Open => Event::NotificationsOutReset { substream_id }, | 1139 | 0 | SubstreamState::Pending => Event::NotificationsOutResult { | 1140 | 0 | substream_id, | 1141 | 0 | result: Err(NotificationsOutErr::ConnectionShutdown), | 1142 | 0 | }, | 1143 | 0 | SubstreamState::RequestedClosing => unreachable!(), // Never set for outgoing notification substreams. | 1144 | | }); | 1145 | 0 | } | 1146 | | | 1147 | | // Ingoing notification substreams to close. 
| 1148 | 0 | if let Some((key, substream_id)) = self | 1149 | 0 | .ingoing_notification_substreams_by_connection | 1150 | 0 | .range( | 1151 | 0 | (shutting_down_connection, established::SubstreamId::MIN) | 1152 | 0 | ..=(shutting_down_connection, established::SubstreamId::MAX), | 1153 | 0 | ) | 1154 | 0 | .map(|(k, v)| (*k, *v)) | 1155 | 0 | .next() | 1156 | | { | 1157 | 0 | let (_, state, _) = self | 1158 | 0 | .ingoing_notification_substreams | 1159 | 0 | .remove(&substream_id) | 1160 | 0 | .unwrap(); | 1161 | 0 | self.ingoing_notification_substreams_by_connection | 1162 | 0 | .remove(&key) | 1163 | 0 | .unwrap(); | 1164 | 0 |
| 1165 | 0 | return Some(match state { | 1166 | | SubstreamState::Open | SubstreamState::RequestedClosing => { | 1167 | 0 | Event::NotificationsInClose { | 1168 | 0 | substream_id, | 1169 | 0 | outcome: Err(NotificationsInClosedErr::ConnectionShutdown), | 1170 | 0 | } | 1171 | | } | 1172 | | SubstreamState::Pending => { | 1173 | 0 | Event::NotificationsInOpenCancel { substream_id } | 1174 | | } | 1175 | | }); | 1176 | 0 | } | 1177 | | | 1178 | | // Find outgoing requests to cancel. | 1179 | 0 | if let Some((_, substream_id)) = self | 1180 | 0 | .outgoing_requests | 1181 | 0 | .range( | 1182 | 0 | (shutting_down_connection, SubstreamId::MIN) | 1183 | 0 | ..=(shutting_down_connection, SubstreamId::MAX), | 1184 | 0 | ) | 1185 | 0 | .next() | 1186 | | { | 1187 | 0 | let substream_id = *substream_id; | 1188 | 0 | self.outgoing_requests | 1189 | 0 | .remove(&(shutting_down_connection, substream_id)); | 1190 | 0 |
| 1191 | 0 | return Some(Event::Response { | 1192 | 0 | substream_id, | 1193 | 0 | response: Err(RequestError::ConnectionShutdown), | 1194 | 0 | }); | 1195 | 0 | } | 1196 | | | 1197 | | // Find ingoing requests to cancel. | 1198 | 0 | if let Some((_, substream_id)) = self | 1199 | 0 | .ingoing_requests_by_connection | 1200 | 0 | .range( | 1201 | 0 | (shutting_down_connection, SubstreamId::MIN) | 1202 | 0 | ..=(shutting_down_connection, SubstreamId::MAX), | 1203 | 0 | ) | 1204 | 0 | .next() | 1205 | | { | 1206 | 0 | let substream_id = *substream_id; | 1207 | 0 |
| 1208 | 0 | let _was_in = self.ingoing_requests.remove(&substream_id); | 1209 | 0 | debug_assert!(_was_in.is_some()); | 1210 | 0 | let _was_in = self | 1211 | 0 | .ingoing_requests_by_connection | 1212 | 0 | .remove(&(shutting_down_connection, substream_id)); | 1213 | 0 | debug_assert!(_was_in); | 1214 | | | 1215 | 0 | return Some(Event::RequestInCancel { substream_id }); | 1216 | 0 | } | 1217 | | | 1218 | | // Find ingoing negotiated substreams to cancel. | 1219 | 0 | if let Some((key, substream_id)) = self | 1220 | 0 | .ingoing_negotiated_substreams_by_connection | 1221 | 0 | .range( | 1222 | 0 | (shutting_down_connection, established::SubstreamId::MIN) | 1223 | 0 | ..=(shutting_down_connection, established::SubstreamId::MAX), | 1224 | 0 | ) | 1225 | 0 | .map(|(k, v)| (*k, *v)) | 1226 | 0 | .next() | 1227 | | { | 1228 | 0 | let Some((_, _, was_accepted)) = | 1229 | 0 | self.ingoing_negotiated_substreams.remove(&substream_id) | 1230 | | else { | 1231 | 0 | unreachable!() | 1232 | | }; | 1233 | 0 | let _was_in = self | 1234 | 0 | .ingoing_negotiated_substreams_by_connection | 1235 | 0 | .remove(&key); | 1236 | 0 | debug_assert!(_was_in.is_some()); | 1237 | | | 1238 | 0 | if was_accepted { | 1239 | 0 | return Some(Event::InboundAcceptedCancel { substream_id }); | 1240 | | } else { | 1241 | 0 | return Some(Event::InboundNegotiatedCancel { substream_id }); | 1242 | | } | 1243 | 0 | } | 1244 | 0 |
| 1245 | 0 | // If this is reached, this connection has no more request or notifications | 1246 | 0 | // substream that is still in progress or open. The connection is successfully | 1247 | 0 | // shut down. | 1248 | 0 | self.shutting_down_connection = None; | 1249 | 147 | } | 1250 | | | 1251 | | // Now actually process messages. | 1252 | 147 | let (connection_id, message0 ) = self.pending_incoming_messages.pop_front()?; | 1253 | 0 | let connection = &mut self.connections.get_mut(&connection_id).unwrap(); | 1254 | 0 |
| 1255 | 0 | break Some(match message { | 1256 | 0 | ConnectionToCoordinatorInner::StartShutdown(reason) => { | 1257 | | // The `ConnectionToCoordinator` message contains a shutdown reason if | 1258 | | // and only if it sends `StartShutdown` as a response to a shutdown | 1259 | | // initiated by the remote. If the shutdown was initiated locally | 1260 | | // (`api_initiated` is `true`), then it can contain `None`, but it can also | 1261 | | // contain `Some` in case the shutdown was initiated by the remote at the same | 1262 | | // time as it was initiated locally. | 1263 | | | 1264 | 0 | let report_event = match &mut connection.state { | 1265 | | InnerConnectionState::ShuttingDown { | 1266 | | api_initiated: true, | 1267 | | .. | 1268 | 0 | } => false, | 1269 | | InnerConnectionState::ShuttingDown { | 1270 | | api_initiated: false, | 1271 | | .. | 1272 | 0 | } => unreachable!(), | 1273 | 0 | st @ InnerConnectionState::Handshaking => { | 1274 | 0 | *st = InnerConnectionState::ShuttingDown { | 1275 | 0 | api_initiated: false, | 1276 | 0 | was_established: false, | 1277 | 0 | }; | 1278 | 0 | true | 1279 | | } | 1280 | 0 | st @ InnerConnectionState::Established => { | 1281 | 0 | *st = InnerConnectionState::ShuttingDown { | 1282 | 0 | api_initiated: false, | 1283 | 0 | was_established: true, | 1284 | 0 | }; | 1285 | 0 | true | 1286 | | } | 1287 | | }; | 1288 | | | 1289 | | // Control flow can't reach here if `shutting_down_connection` is ̀`Some`. | 1290 | 0 | debug_assert!(self.shutting_down_connection.is_none()); | 1291 | 0 | self.shutting_down_connection = Some(connection_id); | 1292 | 0 |
| 1293 | 0 | if !report_event { | 1294 | | // No `StartShutdown` event is generated if the API user has started | 1295 | | // the shutdown themselves. In that case, `StartShutdown` is merely a | 1296 | | // confirmation. | 1297 | 0 | continue; | 1298 | | } else { | 1299 | 0 | Event::StartShutdown { | 1300 | 0 | id: connection_id, | 1301 | 0 | reason: reason.unwrap(), // See comment above. | 1302 | 0 | } | 1303 | | } | 1304 | | } | 1305 | | ConnectionToCoordinatorInner::ShutdownFinished => { | 1306 | 0 | self.messages_to_connections.push_back(( | 1307 | 0 | connection_id, | 1308 | 0 | CoordinatorToConnectionInner::ShutdownFinishedAck, | 1309 | 0 | )); | 1310 | 0 | continue; | 1311 | | } | 1312 | 0 | ConnectionToCoordinatorInner::HandshakeFinished(peer_id) => { | 1313 | 0 | debug_assert_eq!( | 1314 | 0 | self.ingoing_notification_substreams_by_connection | 1315 | 0 | .range( | 1316 | 0 | (connection_id, established::SubstreamId::MIN) | 1317 | 0 | ..=(connection_id, established::SubstreamId::MAX) | 1318 | 0 | ) | 1319 | 0 | .count(), | 1320 | | 0 | 1321 | | ); | 1322 | 0 | debug_assert_eq!( | 1323 | 0 | self.outgoing_notification_substreams_by_connection | 1324 | 0 | .range( | 1325 | 0 | (connection_id, SubstreamId::MIN) | 1326 | 0 | ..=(connection_id, SubstreamId::MAX) | 1327 | 0 | ) | 1328 | 0 | .count(), | 1329 | | 0 | 1330 | | ); | 1331 | 0 | debug_assert_eq!( | 1332 | 0 | self.ingoing_requests_by_connection | 1333 | 0 | .range( | 1334 | 0 | (connection_id, SubstreamId::MIN) | 1335 | 0 | ..=(connection_id, SubstreamId::MAX) | 1336 | 0 | ) | 1337 | 0 | .count(), | 1338 | | 0 | 1339 | | ); | 1340 | | | 1341 | 0 | match &mut connection.state { | 1342 | | InnerConnectionState::ShuttingDown { | 1343 | 0 | was_established, | 1344 | 0 | api_initiated, | 1345 | 0 | } => { | 1346 | 0 | debug_assert!(!*was_established); | 1347 | 0 | debug_assert!(*api_initiated); | 1348 | 0 | continue; | 1349 | | } | 1350 | 0 | st @ InnerConnectionState::Handshaking => { | 1351 | 0 | *st = 
InnerConnectionState::Established | 1352 | | } | 1353 | 0 | InnerConnectionState::Established => unreachable!(), | 1354 | | } | 1355 | | | 1356 | 0 | Event::HandshakeFinished { | 1357 | 0 | id: connection_id, | 1358 | 0 | peer_id, | 1359 | 0 | } | 1360 | | } | 1361 | 0 | ConnectionToCoordinatorInner::InboundError(error) => { | 1362 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1363 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1364 | 0 | connection.state | 1365 | | { | 1366 | 0 | debug_assert!(api_initiated); | 1367 | 0 | continue; | 1368 | 0 | } | 1369 | 0 |
| 1370 | 0 | Event::InboundError { | 1371 | 0 | id: connection_id, | 1372 | 0 | error, | 1373 | 0 | } | 1374 | | } | 1375 | | ConnectionToCoordinatorInner::InboundNegotiated { | 1376 | 0 | id: connection_substream_id, | 1377 | 0 | protocol_name, | 1378 | | } => { | 1379 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1380 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1381 | 0 | connection.state | 1382 | | { | 1383 | 0 | debug_assert!(api_initiated); | 1384 | 0 | continue; | 1385 | 0 | } | 1386 | 0 |
| 1387 | 0 | let substream_id = self.next_substream_id; | 1388 | 0 | self.next_substream_id.0 += 1; | 1389 | 0 |
| 1390 | 0 | self.ingoing_negotiated_substreams.insert( | 1391 | 0 | substream_id, | 1392 | 0 | (connection_id, connection_substream_id, false), | 1393 | 0 | ); | 1394 | 0 | self.ingoing_negotiated_substreams_by_connection | 1395 | 0 | .insert((connection_id, connection_substream_id), substream_id); | 1396 | 0 |
| 1397 | 0 | Event::InboundNegotiated { | 1398 | 0 | id: connection_id, | 1399 | 0 | substream_id, | 1400 | 0 | protocol_name, | 1401 | 0 | } | 1402 | | } | 1403 | | ConnectionToCoordinatorInner::InboundAcceptedCancel { | 1404 | 0 | id: connection_substream_id, | 1405 | | } => { | 1406 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1407 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1408 | 0 | connection.state | 1409 | | { | 1410 | 0 | debug_assert!(api_initiated); | 1411 | 0 | continue; | 1412 | 0 | } | 1413 | 0 |
| 1414 | 0 | let substream_id = self | 1415 | 0 | .ingoing_negotiated_substreams_by_connection | 1416 | 0 | .remove(&(connection_id, connection_substream_id)) | 1417 | 0 | .unwrap_or_else(|| unreachable!()); | 1418 | 0 | let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id); | 1419 | 0 | debug_assert!(_was_in.is_some()); | 1420 | | | 1421 | 0 | Event::InboundAcceptedCancel { substream_id } | 1422 | | } | 1423 | | ConnectionToCoordinatorInner::RequestIn { | 1424 | 0 | id: connection_substream_id, | 1425 | 0 | request, | 1426 | | } => { | 1427 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1428 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1429 | 0 | connection.state | 1430 | | { | 1431 | 0 | debug_assert!(api_initiated); | 1432 | 0 | continue; | 1433 | 0 | } | 1434 | 0 |
| 1435 | 0 | let substream_id = self | 1436 | 0 | .ingoing_negotiated_substreams_by_connection | 1437 | 0 | .remove(&(connection_id, connection_substream_id)) | 1438 | 0 | .unwrap_or_else(|| unreachable!()); | 1439 | 0 | let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id); | 1440 | 0 | debug_assert!(_was_in.is_some()); | 1441 | | | 1442 | 0 | self.ingoing_requests | 1443 | 0 | .insert(substream_id, (connection_id, connection_substream_id)); | 1444 | 0 | self.ingoing_requests_by_connection | 1445 | 0 | .insert((connection_id, substream_id)); | 1446 | 0 |
| 1447 | 0 | Event::RequestIn { | 1448 | 0 | substream_id, | 1449 | 0 | request_payload: request, | 1450 | 0 | } | 1451 | | } | 1452 | | ConnectionToCoordinatorInner::Response { | 1453 | 0 | id: substream_id, | 1454 | 0 | response, | 1455 | | .. | 1456 | | } => { | 1457 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1458 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1459 | 0 | connection.state | 1460 | | { | 1461 | 0 | debug_assert!(api_initiated); | 1462 | 0 | continue; | 1463 | 0 | } | 1464 | 0 |
| 1465 | 0 | let _was_in = self | 1466 | 0 | .outgoing_requests | 1467 | 0 | .remove(&(connection_id, substream_id)); | 1468 | 0 | debug_assert!(_was_in); | 1469 | | | 1470 | 0 | Event::Response { | 1471 | 0 | substream_id, | 1472 | 0 | response: response.map_err(RequestError::Substream), | 1473 | 0 | } | 1474 | | } | 1475 | | ConnectionToCoordinatorInner::NotificationsInOpen { | 1476 | 0 | id: inner_substream_id, | 1477 | 0 | handshake, | 1478 | | } => { | 1479 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1480 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1481 | 0 | connection.state | 1482 | | { | 1483 | 0 | debug_assert!(api_initiated); | 1484 | 0 | continue; | 1485 | 0 | } | 1486 | 0 |
| 1487 | 0 | let substream_id = self | 1488 | 0 | .ingoing_negotiated_substreams_by_connection | 1489 | 0 | .remove(&(connection_id, inner_substream_id)) | 1490 | 0 | .unwrap_or_else(|| unreachable!()); | 1491 | 0 | let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id); | 1492 | 0 | debug_assert!(_was_in.is_some()); | 1493 | | | 1494 | 0 | self.ingoing_notification_substreams.insert( | 1495 | 0 | substream_id, | 1496 | 0 | (connection_id, SubstreamState::Pending, inner_substream_id), | 1497 | 0 | ); | 1498 | 0 | self.ingoing_notification_substreams_by_connection | 1499 | 0 | .insert((connection_id, inner_substream_id), substream_id); | 1500 | 0 |
| 1501 | 0 | Event::NotificationsInOpen { | 1502 | 0 | substream_id, | 1503 | 0 | remote_handshake: handshake, | 1504 | 0 | } | 1505 | | } | 1506 | | ConnectionToCoordinatorInner::NotificationsInOpenCancel { | 1507 | 0 | id: inner_substream_id, | 1508 | | .. | 1509 | | } => { | 1510 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1511 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1512 | 0 | connection.state | 1513 | | { | 1514 | 0 | debug_assert!(api_initiated); | 1515 | 0 | continue; | 1516 | 0 | } | 1517 | | | 1518 | | // The event might concern a substream that we have already accepted or | 1519 | | // refused. In that situation, either reinterpret the event as | 1520 | | // "NotificationsInClose" or discard it. | 1521 | 0 | if let Some(substream_id) = self | 1522 | 0 | .ingoing_notification_substreams_by_connection | 1523 | 0 | .remove(&(connection_id, inner_substream_id)) | 1524 | | { | 1525 | 0 | let (_, state, _) = self | 1526 | 0 | .ingoing_notification_substreams | 1527 | 0 | .remove(&substream_id) | 1528 | 0 | .unwrap(); | 1529 | 0 | match state { | 1530 | | SubstreamState::Open | SubstreamState::RequestedClosing => { | 1531 | 0 | Event::NotificationsInClose { | 1532 | 0 | substream_id, | 1533 | 0 | outcome: Err(NotificationsInClosedErr::Substream( | 1534 | 0 | established::NotificationsInClosedErr::SubstreamReset, | 1535 | 0 | )), | 1536 | 0 | } | 1537 | | } | 1538 | | SubstreamState::Pending => { | 1539 | 0 | Event::NotificationsInOpenCancel { substream_id } | 1540 | | } | 1541 | | } | 1542 | | } else { | 1543 | | // Substream was refused. As documented, we must confirm the reception of | 1544 | | // the event by sending back a rejection. 
| 1545 | 0 | self.messages_to_connections.push_back(( | 1546 | 0 | connection_id, | 1547 | 0 | CoordinatorToConnectionInner::RejectInNotifications { | 1548 | 0 | substream_id: inner_substream_id, | 1549 | 0 | }, | 1550 | 0 | )); | 1551 | 0 | continue; | 1552 | | } | 1553 | | } | 1554 | | ConnectionToCoordinatorInner::NotificationIn { | 1555 | 0 | id: inner_substream_id, | 1556 | 0 | notification, | 1557 | | } => { | 1558 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1559 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1560 | 0 | connection.state | 1561 | | { | 1562 | 0 | debug_assert!(api_initiated); | 1563 | 0 | continue; | 1564 | 0 | } | 1565 | 0 |
| 1566 | 0 | let substream_id = *self | 1567 | 0 | .ingoing_notification_substreams_by_connection | 1568 | 0 | .get(&(connection_id, inner_substream_id)) | 1569 | 0 | .unwrap(); | 1570 | 0 |
| 1571 | 0 | Event::NotificationsIn { | 1572 | 0 | substream_id, | 1573 | 0 | notification, | 1574 | 0 | } | 1575 | | } | 1576 | | ConnectionToCoordinatorInner::NotificationsInClose { | 1577 | 0 | id: inner_substream_id, | 1578 | 0 | outcome, | 1579 | | .. | 1580 | | } => { | 1581 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1582 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1583 | 0 | connection.state | 1584 | | { | 1585 | 0 | debug_assert!(api_initiated); | 1586 | 0 | continue; | 1587 | 0 | } | 1588 | 0 |
| 1589 | 0 | let substream_id = self | 1590 | 0 | .ingoing_notification_substreams_by_connection | 1591 | 0 | .remove(&(connection_id, inner_substream_id)) | 1592 | 0 | .unwrap(); | 1593 | 0 | let (_, state, _) = self | 1594 | 0 | .ingoing_notification_substreams | 1595 | 0 | .remove(&substream_id) | 1596 | 0 | .unwrap(); | 1597 | 0 | debug_assert!(matches!( | 1598 | 0 | state, | 1599 | | SubstreamState::Open | SubstreamState::RequestedClosing | 1600 | | )); | 1601 | | | 1602 | 0 | if let SubstreamState::Open = state { | 1603 | 0 | // As documented, we must confirm the reception of the event by sending | 1604 | 0 | // back a rejection, provided that no such event has been sent beforehand. | 1605 | 0 | self.messages_to_connections.push_back(( | 1606 | 0 | connection_id, | 1607 | 0 | CoordinatorToConnectionInner::CloseInNotifications { | 1608 | 0 | substream_id: inner_substream_id, | 1609 | 0 | timeout: Duration::new(0, 0), | 1610 | 0 | }, | 1611 | 0 | )); | 1612 | 0 | } | 1613 | | | 1614 | 0 | Event::NotificationsInClose { | 1615 | 0 | substream_id, | 1616 | 0 | outcome: outcome.map_err(NotificationsInClosedErr::Substream), | 1617 | 0 | } | 1618 | | } | 1619 | | ConnectionToCoordinatorInner::NotificationsOutResult { | 1620 | 0 | id: substream_id, | 1621 | 0 | result, | 1622 | | } => { | 1623 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1624 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1625 | 0 | connection.state | 1626 | | { | 1627 | 0 | debug_assert!(api_initiated); | 1628 | 0 | continue; | 1629 | 0 | } | 1630 | | | 1631 | 0 | let mut entry = match self.outgoing_notification_substreams.entry(substream_id) | 1632 | | { | 1633 | 0 | hashbrown::hash_map::Entry::Occupied(e) => e, | 1634 | | hashbrown::hash_map::Entry::Vacant(_) => { | 1635 | | // This can be reached if the API user closed the substream while it | 1636 | | // was being open. 
| 1637 | 0 | continue; | 1638 | | } | 1639 | | }; | 1640 | | | 1641 | 0 | debug_assert!(matches!(entry.get_mut().1, SubstreamState::Pending)); | 1642 | | | 1643 | 0 | if result.is_ok() { | 1644 | 0 | entry.insert((connection_id, SubstreamState::Open)); | 1645 | 0 | } else { | 1646 | 0 | entry.remove(); | 1647 | 0 |
| 1648 | 0 | let _was_removed = self | 1649 | 0 | .outgoing_notification_substreams_by_connection | 1650 | 0 | .remove(&(connection_id, substream_id)); | 1651 | 0 | debug_assert!(_was_removed); | 1652 | | } | 1653 | | | 1654 | 0 | Event::NotificationsOutResult { | 1655 | 0 | substream_id, | 1656 | 0 | result, | 1657 | 0 | } | 1658 | | } | 1659 | | ConnectionToCoordinatorInner::NotificationsOutCloseDemanded { | 1660 | 0 | id: substream_id, | 1661 | | .. | 1662 | | } => { | 1663 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1664 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1665 | 0 | connection.state | 1666 | | { | 1667 | 0 | debug_assert!(api_initiated); | 1668 | 0 | continue; | 1669 | 0 | } | 1670 | 0 |
| 1671 | 0 | match self.outgoing_notification_substreams.get(&substream_id) { | 1672 | 0 | Some((_connection_id, _substream_state)) => { | 1673 | 0 | debug_assert_eq!(*_connection_id, connection_id); | 1674 | 0 | debug_assert!(matches!(_substream_state, SubstreamState::Open)); | 1675 | | } | 1676 | | None => { | 1677 | | // The substream might already have been destroyed if the user closed | 1678 | | // the substream while this message was pending in the queue. | 1679 | 0 | continue; | 1680 | | } | 1681 | | } | 1682 | | | 1683 | 0 | Event::NotificationsOutCloseDemanded { substream_id } | 1684 | | } | 1685 | 0 | ConnectionToCoordinatorInner::NotificationsOutReset { id: substream_id } => { | 1686 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1687 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1688 | 0 | connection.state | 1689 | | { | 1690 | 0 | debug_assert!(api_initiated); | 1691 | 0 | continue; | 1692 | 0 | } | 1693 | 0 |
| 1694 | 0 | match self.outgoing_notification_substreams.remove(&substream_id) { | 1695 | 0 | Some((_connection_id, _substream_state)) => { | 1696 | 0 | debug_assert_eq!(_connection_id, connection_id); | 1697 | 0 | debug_assert!(matches!(_substream_state, SubstreamState::Open)); | 1698 | | } | 1699 | | None => { | 1700 | | // The substream might already have been destroyed if the user closed | 1701 | | // the substream while this message was pending in the queue. | 1702 | 0 | continue; | 1703 | | } | 1704 | | } | 1705 | | | 1706 | 0 | let _was_removed = self | 1707 | 0 | .outgoing_notification_substreams_by_connection | 1708 | 0 | .remove(&(connection_id, substream_id)); | 1709 | 0 | debug_assert!(_was_removed); | 1710 | | | 1711 | 0 | Event::NotificationsOutReset { substream_id } | 1712 | | } | 1713 | 0 | ConnectionToCoordinatorInner::PingOutSuccess { ping_time } => { | 1714 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1715 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1716 | 0 | connection.state | 1717 | | { | 1718 | 0 | debug_assert!(api_initiated); | 1719 | 0 | continue; | 1720 | 0 | } | 1721 | 0 |
| 1722 | 0 | Event::PingOutSuccess { | 1723 | 0 | id: connection_id, | 1724 | 0 | ping_time, | 1725 | 0 | } | 1726 | | } | 1727 | | ConnectionToCoordinatorInner::PingOutFailed => { | 1728 | | // Ignore events if a shutdown has been initiated by the coordinator. | 1729 | 0 | if let InnerConnectionState::ShuttingDown { api_initiated, .. } = | 1730 | 0 | connection.state | 1731 | | { | 1732 | 0 | debug_assert!(api_initiated); | 1733 | 0 | continue; | 1734 | 0 | } | 1735 | 0 |
| 1736 | 0 | Event::PingOutFailed { id: connection_id } | 1737 | | } | 1738 | | }); | 1739 | | } | 1740 | 147 | } |
|
1741 | | } |
1742 | | |
1743 | | impl<TConn, TNow> ops::Index<ConnectionId> for Network<TConn, TNow> { |
1744 | | type Output = TConn; |
1745 | 0 | fn index(&self, id: ConnectionId) -> &TConn { |
1746 | 0 | &self.connections.get(&id).unwrap().user_data |
1747 | 0 | } Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p10collections1_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_12ConnectionIdE5indexB9_ Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationEINtNtNtB2V_3ops5index5IndexNtB5_12ConnectionIdE5indexCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p10collections1_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_12ConnectionIdE5indexB9_ Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_12ConnectionIdE5indexCsiUjFBJteJ7x_17smoldot_full_node |
1748 | | } |
1749 | | |
1750 | | impl<TConn, TNow> ops::IndexMut<ConnectionId> for Network<TConn, TNow> { |
1751 | 0 | fn index_mut(&mut self, id: ConnectionId) -> &mut TConn { |
1752 | 0 | &mut self.connections.get_mut(&id).unwrap().user_data |
1753 | 0 | } Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p10collections2_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutB9_ Unexecuted instantiation: _RNvXs2_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationEINtNtNtB2V_3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p10collections2_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutB9_ Unexecuted instantiation: _RNvXs2_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutCsiUjFBJteJ7x_17smoldot_full_node |
1754 | | } |
1755 | | |
1756 | | /// See [`Network::connection_state`]. |
1757 | | #[derive(Debug, Copy, Clone, PartialEq, Eq)] |
1758 | | pub struct ConnectionState { |
1759 | | /// If `true`, the connection has finished its handshaking phase. |
1760 | | pub established: bool, |
1761 | | |
1762 | | /// If `true`, the connection is shutting down. |
1763 | | pub shutting_down: bool, |
1764 | | } |
1765 | | |
1766 | | /// Message from a connection task destined to the coordinator. |
1767 | | pub struct ConnectionToCoordinator { |
1768 | | inner: ConnectionToCoordinatorInner, |
1769 | | } |
1770 | | |
1771 | | enum ConnectionToCoordinatorInner { |
1772 | | HandshakeFinished(PeerId), |
1773 | | |
1774 | | /// See the corresponding event in [`established::Event`]. |
1775 | | InboundError(established::InboundError), |
1776 | | |
1777 | | /// See the corresponding event in [`established::Event`]. |
1778 | | InboundNegotiated { |
1779 | | id: established::SubstreamId, |
1780 | | protocol_name: String, |
1781 | | }, |
1782 | | |
1783 | | /// See the corresponding event in [`established::Event`]. |
1784 | | InboundAcceptedCancel { |
1785 | | id: established::SubstreamId, |
1786 | | }, |
1787 | | |
1788 | | /// See the corresponding event in [`established::Event`]. |
1789 | | RequestIn { |
1790 | | id: established::SubstreamId, |
1791 | | request: Vec<u8>, |
1792 | | }, |
1793 | | |
1794 | | /// See the corresponding event in [`established::Event`]. |
1795 | | Response { |
1796 | | response: Result<Vec<u8>, established::RequestError>, |
1797 | | id: SubstreamId, |
1798 | | }, |
1799 | | |
1800 | | /// See the corresponding event in [`established::Event`]. |
1801 | | NotificationsInOpen { |
1802 | | id: established::SubstreamId, |
1803 | | handshake: Vec<u8>, |
1804 | | }, |
1805 | | /// See the corresponding event in [`established::Event`]. |
1806 | | /// |
1807 | | /// The coordinator should be aware that, due to the asynchronous nature of communications, it |
1808 | | /// might receive this event after having sent a |
1809 | | /// [`CoordinatorToConnectionInner::AcceptInNotifications`] or |
1810 | | /// [`CoordinatorToConnectionInner::RejectInNotifications`]. In that situation, the coordinator |
1811 | | /// should either reinterpret the message as a `NotificationsInClose` (if it had accepted it) |
1812 | | /// or ignore it (if it had rejected it). |
1813 | | /// |
1814 | | /// The connection should be aware that, due to the asynchronous nature of communications, it |
1815 | | /// might later receive an [`CoordinatorToConnectionInner::AcceptInNotifications`] or |
1816 | | /// [`CoordinatorToConnectionInner::RejectInNotifications`] concerning this substream. In that |
1817 | | /// situation, the connection should ignore this message. |
1818 | | /// |
1819 | | /// Because substream IDs can be reused, this introduces an ambiguity in the following sequence |
1820 | | /// of events: send `NotificationsInOpen`, send `NotificationsInOpenCancel`, send |
1821 | | /// `NotificationsInOpen`, receive `AcceptInNotifications`. Does the `AcceptInNotifications` |
1822 | | /// refer to the first `NotificationsInOpen` or to the second? |
1823 | | /// In order to solve this problem, the coordinator must always send back a |
1824 | | /// [`CoordinatorToConnectionInner::RejectInNotifications`] in order to acknowledge a |
1825 | | /// `NotificationsInOpenCancel`. |
1826 | | NotificationsInOpenCancel { |
1827 | | id: established::SubstreamId, |
1828 | | }, |
1829 | | /// See the corresponding event in [`established::Event`]. |
1830 | | NotificationIn { |
1831 | | id: established::SubstreamId, |
1832 | | notification: Vec<u8>, |
1833 | | }, |
1834 | | /// See the corresponding event in [`established::Event`]. |
1835 | | /// |
1836 | | /// In order to avoid race conditions, this must always be acknowledged by sending back a |
1837 | | /// [`CoordinatorToConnectionInner::CloseInNotifications`] message if no such message was |
1838 | | /// sent in the past. |
1839 | | NotificationsInClose { |
1840 | | id: established::SubstreamId, |
1841 | | outcome: Result<(), established::NotificationsInClosedErr>, |
1842 | | }, |
1843 | | /// See the corresponding event in [`established::Event`]. |
1844 | | NotificationsOutResult { |
1845 | | id: SubstreamId, |
1846 | | result: Result<Vec<u8>, NotificationsOutErr>, |
1847 | | }, |
1848 | | /// See the corresponding event in [`established::Event`]. |
1849 | | NotificationsOutCloseDemanded { |
1850 | | id: SubstreamId, |
1851 | | }, |
1852 | | /// See the corresponding event in [`established::Event`]. |
1853 | | NotificationsOutReset { |
1854 | | id: SubstreamId, |
1855 | | }, |
1856 | | /// See the corresponding event in [`established::Event`]. |
1857 | | PingOutSuccess { |
1858 | | ping_time: Duration, |
1859 | | }, |
1860 | | /// See the corresponding event in [`established::Event`]. |
1861 | | PingOutFailed, |
1862 | | |
1863 | | /// Sent either in response to [`ConnectionToCoordinatorInner::StartShutdown`] (in which case |
1864 | | /// the content is `None`) or if the remote has initiated the shutdown (in which case the |
1865 | | /// content is `Some`). After this, no more [`ConnectionToCoordinatorInner`] will be sent |
1866 | | /// anymore except for [`ConnectionToCoordinatorInner::ShutdownFinished`]. |
1867 | | StartShutdown(Option<ShutdownCause>), |
1868 | | |
1869 | | /// Shutdown has now finished. Always sent after |
1870 | | /// [`ConnectionToCoordinatorInner::StartShutdown`]. No message is sent by the connection |
1871 | | /// task anymore after that. |
1872 | | /// |
1873 | | /// Must be confirmed with a [`CoordinatorToConnectionInner::ShutdownFinishedAck`]. |
1874 | | ShutdownFinished, |
1875 | | } |
1876 | | |
1877 | | /// Message from the coordinator destined to a connection task. |
1878 | | pub struct CoordinatorToConnection { |
1879 | | inner: CoordinatorToConnectionInner, |
1880 | | } |
1881 | | |
1882 | | enum CoordinatorToConnectionInner { |
1883 | | /// Connection task must terminate. This is always sent back after a |
1884 | | /// [`ConnectionToCoordinatorInner::ShutdownFinished`]. |
1885 | | /// |
1886 | | /// This final message is necessary in order to make sure that the coordinator doesn't |
1887 | | /// generate messages destined to a connection that isn't alive anymore. |
1888 | | ShutdownFinishedAck, |
1889 | | |
1890 | | /// Connection must start shutting down if it is not already the case. |
1891 | | /// Before of concurrency, it is possible for this message to be sent/received *after* a |
1892 | | /// [`ConnectionToCoordinatorInner::StartShutdown`] has been sent. |
1893 | | StartShutdown, |
1894 | | |
1895 | | AcceptInbound { |
1896 | | substream_id: established::SubstreamId, |
1897 | | /// Configuration of the protocol. |
1898 | | inbound_ty: InboundTy, |
1899 | | }, |
1900 | | RejectInbound { |
1901 | | substream_id: established::SubstreamId, |
1902 | | }, |
1903 | | SetMaxProtocolNameLen { |
1904 | | new_max_length: usize, |
1905 | | }, |
1906 | | |
1907 | | StartRequest { |
1908 | | protocol_name: String, |
1909 | | request_data: Option<Vec<u8>>, |
1910 | | timeout: Duration, |
1911 | | max_response_size: usize, |
1912 | | /// Id of the substream assigned by the coordinator. |
1913 | | /// This is **not** the same as the actual substream used in the connection. |
1914 | | substream_id: SubstreamId, |
1915 | | }, |
1916 | | OpenOutNotifications { |
1917 | | /// Id of the substream assigned by the coordinator. |
1918 | | /// This is **not** the same as the actual substream used in the connection. |
1919 | | substream_id: SubstreamId, |
1920 | | protocol_name: String, |
1921 | | max_handshake_size: usize, |
1922 | | handshake_timeout: Duration, |
1923 | | handshake: Vec<u8>, |
1924 | | }, |
1925 | | CloseOutNotifications { |
1926 | | /// Id of the substream assigned by the coordinator. |
1927 | | /// This is **not** the same as the actual substream used in the connection. |
1928 | | substream_id: SubstreamId, |
1929 | | }, |
1930 | | QueueNotification { |
1931 | | /// Id of the substream assigned by the coordinator. |
1932 | | /// This is **not** the same as the actual substream used in the connection. |
1933 | | substream_id: SubstreamId, |
1934 | | notification: Vec<u8>, |
1935 | | }, |
1936 | | AcceptInNotifications { |
1937 | | substream_id: established::SubstreamId, |
1938 | | handshake: Vec<u8>, |
1939 | | max_notification_size: usize, |
1940 | | }, |
1941 | | RejectInNotifications { |
1942 | | substream_id: established::SubstreamId, |
1943 | | }, |
1944 | | CloseInNotifications { |
1945 | | substream_id: established::SubstreamId, |
1946 | | timeout: Duration, |
1947 | | }, |
1948 | | |
1949 | | /// Answer an incoming request. |
1950 | | /// |
1951 | | /// Since the API doesn't provide any feedback about whether responses have been successfully |
1952 | | /// received by the remote, the response should simply be ignored in case the substream is |
1953 | | /// obsolete. In any case, answering an obsolete request is not an API error because the remote |
1954 | | /// might have canceled their request while the message containing the response was waiting |
1955 | | /// in queue. |
1956 | | AnswerRequest { |
1957 | | substream_id: established::SubstreamId, |
1958 | | response: Result<Vec<u8>, ()>, |
1959 | | }, |
1960 | | } |
1961 | | |
1962 | | /// Event generated by [`Network::next_event`]. |
1963 | | #[derive(Debug)] |
1964 | | pub enum Event<TConn> { |
1965 | | /// Handshake of the given connection has completed. |
1966 | | /// |
1967 | | /// This event can only happen once per connection and only for single-stream connections. |
1968 | | HandshakeFinished { |
1969 | | /// Identifier of the connection whose handshake is finished. |
1970 | | id: ConnectionId, |
1971 | | /// Identity of the peer on the other side of the connection. |
1972 | | peer_id: PeerId, |
1973 | | }, |
1974 | | |
1975 | | /// A transport-level connection (e.g. a TCP socket) is starting its shutdown. |
1976 | | /// |
1977 | | /// It is no longer possible to start requests, open notification substreams, or open |
1978 | | /// notifications on this connection, and no new incoming requests or notification substreams |
1979 | | /// will be reported as events. |
1980 | | /// |
1981 | | /// Further events will close all existing substreams (requests and notifications) one by one. |
1982 | | /// Once all substreams have been closed, a [`Event::Shutdown`] is reported. |
1983 | | /// |
1984 | | /// Keep in mind that this event can also happen for connections that haven't finished their |
1985 | | /// handshake. |
1986 | | /// |
1987 | | /// This event is **not** generated when [`Network::start_shutdown`] is called. |
1988 | | StartShutdown { |
1989 | | /// Identifier of the connection that is starting its shutdown. |
1990 | | id: ConnectionId, |
1991 | | /// Reason why the connection is starting its shutdown. Because this event is not generated |
1992 | | /// when the shutdown is initiated locally, the reason is always cause by the remote. |
1993 | | reason: ShutdownCause, |
1994 | | }, |
1995 | | |
1996 | | /// A transport-level connection (e.g. a TCP socket) has been shut down. |
1997 | | /// |
1998 | | /// This [`ConnectionId`] is no longer valid, and using it will result in panics. |
1999 | | Shutdown { |
2000 | | /// Identifier of the connection that has finished its shutdown. |
2001 | | id: ConnectionId, |
2002 | | /// `true` if the connection was in its established phase before the shutdown. |
2003 | | was_established: bool, |
2004 | | /// User data that was stored in the state machine for this connection. |
2005 | | user_data: TConn, |
2006 | | }, |
2007 | | |
2008 | | /// Received an incoming substream, but this substream has produced an error. |
2009 | | /// |
2010 | | /// > **Note**: This event exists only for diagnostic purposes. No action is expected in |
2011 | | /// > return. |
2012 | | InboundError { |
2013 | | /// Identifier of the connection that has received the substream. |
2014 | | id: ConnectionId, |
2015 | | /// Error that happened. |
2016 | | error: InboundError, |
2017 | | }, |
2018 | | |
2019 | | /// An inbound substream has requested to use a protocol. Call [`Network::accept_inbound`] or |
2020 | | /// [`Network::reject_inbound`] in order to accept or reject this substream. |
2021 | | InboundNegotiated { |
2022 | | /// Identifier of the connection that has received the substream. |
2023 | | id: ConnectionId, |
2024 | | /// Identifier of the substream. Needs to be provided back when accepting or rejecting |
2025 | | /// the substream. |
2026 | | substream_id: SubstreamId, |
2027 | | /// Name of the protocol requested by the remote. |
2028 | | protocol_name: String, |
2029 | | }, |
2030 | | |
2031 | | /// An inbound substream that is waiting for a call to [`Network::accept_inbound`] or |
2032 | | /// [`Network::reject_inbound`] has been abruptly closed. |
2033 | | InboundNegotiatedCancel { |
2034 | | /// Identifier of the substream. |
2035 | | substream_id: SubstreamId, |
2036 | | }, |
2037 | | |
2038 | | /// An inbound substream that was previously accepted using [`Network::accept_inbound`] was |
2039 | | /// closed by the remote or has generated an error. |
2040 | | InboundAcceptedCancel { |
2041 | | /// Identifier of the substream. |
2042 | | substream_id: SubstreamId, |
2043 | | }, |
2044 | | |
2045 | | /// Outcome of a request started using [`Network::start_request`]. |
2046 | | /// |
2047 | | /// *All* requests always lead to an outcome, even if the connection has been closed while the |
2048 | | /// request was in progress. |
2049 | | Response { |
2050 | | /// Substream that was returned by [`Network::start_request`]. |
2051 | | substream_id: SubstreamId, |
2052 | | /// If the request is successful, contains the response sent back by the remote. Otherwise, |
2053 | | /// contains the reason why the request isn't successful. |
2054 | | response: Result<Vec<u8>, RequestError>, |
2055 | | }, |
2056 | | |
2057 | | /// Received a request from a request-response protocol. |
2058 | | RequestIn { |
2059 | | /// Substream on which the request has been received. Must be passed back when providing |
2060 | | /// the response. |
2061 | | substream_id: SubstreamId, |
2062 | | /// Payload that has been sent by the remote. Its interpretation is beyond the scope of |
2063 | | /// this module. |
2064 | | request_payload: Vec<u8>, |
2065 | | }, |
2066 | | |
2067 | | /// Request received earlier has been canceled by the remote. |
2068 | | /// |
2069 | | /// The [`SubstreamId`] is now invalid. |
2070 | | RequestInCancel { substream_id: SubstreamId }, |
2071 | | |
2072 | | /// Outcome of trying to open a substream with [`Network::open_out_notifications`]. |
2073 | | /// |
2074 | | /// If `Ok`, it is now possible to send notifications on this substream. |
2075 | | /// If `Err`, the substream no longer exists and the [`SubstreamId`] becomes invalid. |
2076 | | NotificationsOutResult { |
2077 | | substream_id: SubstreamId, |
2078 | | /// If `Ok`, contains the handshake sent back by the remote. Its interpretation is out of |
2079 | | /// scope of this module. |
2080 | | result: Result<Vec<u8>, NotificationsOutErr>, |
2081 | | }, |
2082 | | |
2083 | | /// Remote has closed an outgoing notifications substream, meaning that it demands the closing |
2084 | | /// of the substream. Use [`Network::close_out_notifications`] as soon as possible, which is |
2085 | | /// typically after all outbound notifications that need to be queued have been queued. |
2086 | | /// |
2087 | | /// This event is only generated for notification substreams that are fully open. |
2088 | | NotificationsOutCloseDemanded { substream_id: SubstreamId }, |
2089 | | |
2090 | | /// A previously open outbound substream has been closed, by the remote or as a consequence of |
2091 | | /// the connection shutting down. |
2092 | | /// |
2093 | | /// This event is only generated for notification substreams that are fully open. |
2094 | | /// |
2095 | | /// The substream no longer exists and the [`SubstreamId`] becomes invalid. |
2096 | | NotificationsOutReset { substream_id: SubstreamId }, |
2097 | | |
2098 | | /// The remote would like to open a notifications substream. |
2099 | | /// |
2100 | | /// The substream needs to be accepted or refused using [`Network::accept_in_notifications`] |
2101 | | /// or [`Network::reject_in_notifications`]. |
2102 | | NotificationsInOpen { |
2103 | | /// Newly-generated identifier for the substream on which the request has been received. |
2104 | | /// Must be passed back when accepting or refusing the substream. |
2105 | | substream_id: SubstreamId, |
2106 | | /// Handshake that has been sent by the remote. Its interpretation is beyond the scope of |
2107 | | /// this module. |
2108 | | remote_handshake: Vec<u8>, |
2109 | | }, |
2110 | | |
2111 | | /// The remote has canceled the opening an incoming notifications substream. |
2112 | | /// |
2113 | | /// This can only happen before the notification substream has been accepted or refused. |
2114 | | NotificationsInOpenCancel { |
2115 | | /// Substream that has been closed. Guaranteed to match a substream that was earlier |
2116 | | /// reported with a [`Event::NotificationsInOpen`]. |
2117 | | substream_id: SubstreamId, |
2118 | | }, |
2119 | | |
2120 | | /// Received a notification on a notifications substream of a connection. |
2121 | | NotificationsIn { |
2122 | | /// Substream on which the notification has been received. Guaranteed to be a substream |
2123 | | /// that has been accepted with [`Network::accept_in_notifications`]. |
2124 | | substream_id: SubstreamId, |
2125 | | /// Notification that the remote has sent. The meaning of this data is out of scope of |
2126 | | /// this module. |
2127 | | notification: Vec<u8>, |
2128 | | }, |
2129 | | |
2130 | | /// The remote has closed an incoming notifications substream. |
2131 | | /// |
2132 | | /// This can only happen after the notification substream has been accepted. |
2133 | | NotificationsInClose { |
2134 | | /// Substream that has been closed. Guaranteed to match a substream that was earlier |
2135 | | /// reported with a [`Event::NotificationsInOpen`]. |
2136 | | substream_id: SubstreamId, |
2137 | | /// Reason why the substream has been closed. |
2138 | | outcome: Result<(), NotificationsInClosedErr>, |
2139 | | }, |
2140 | | |
2141 | | /// An outgoing ping has succeeded. This event is generated automatically over time for each |
2142 | | /// connection in the collection. |
2143 | | PingOutSuccess { |
2144 | | id: ConnectionId, |
2145 | | /// Time between sending the ping and receiving the pong. |
2146 | | ping_time: Duration, |
2147 | | }, |
2148 | | /// An outgoing ping has failed. This event is generated automatically over time for each |
2149 | | /// connection in the collection. |
2150 | | PingOutFailed { id: ConnectionId }, |
2151 | | } |
2152 | | |
2153 | | /// Reason why a connection is shutting down. See [`Event::StartShutdown`]. |
2154 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsD_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_13ShutdownCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsD_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_13ShutdownCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2155 | | pub enum ShutdownCause { |
2156 | | /// Shutdown was demanded by the remote and performed cleanly. |
2157 | | CleanShutdown, |
2158 | | /// Remote has abruptly reset the connection. |
2159 | | RemoteReset, |
2160 | | /// Error in the connection protocol of a fully established connection. |
2161 | | ProtocolError(established::Error), |
2162 | | /// Error in the protocol of the handshake. |
2163 | | HandshakeError(HandshakeError), |
2164 | | /// Handshake phase took too long. |
2165 | | HandshakeTimeout, |
2166 | | } |
2167 | | |
2168 | 0 | #[derive(Debug, derive_more::Display, Clone)] Unexecuted instantiation: _RNvXsF_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsF_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2169 | | pub enum RequestError { |
2170 | | /// Request has been canceled because the connection as a whole is being shut down. |
2171 | | ConnectionShutdown, |
2172 | | |
2173 | | /// Error happened in the context of the substream. |
2174 | | #[display(fmt = "{_0}")] |
2175 | | Substream(established::RequestError), |
2176 | | } |
2177 | | |
2178 | | impl RequestError { |
2179 | | /// Returns `true` if the error is caused by a faulty behavior by the remote. Returns `false` |
2180 | | /// if the error can happen in normal situations. |
2181 | 0 | pub fn is_protocol_error(&self) -> bool { |
2182 | 0 | match self { |
2183 | 0 | RequestError::ConnectionShutdown => false, |
2184 | 0 | RequestError::Substream(err) => err.is_protocol_error(), |
2185 | | } |
2186 | 0 | } Unexecuted instantiation: _RNvMs3_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_12RequestError17is_protocol_error Unexecuted instantiation: _RNvMs3_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_12RequestError17is_protocol_error |
2187 | | } |
2188 | | |
2189 | 0 | #[derive(Debug, derive_more::Display, Clone)] Unexecuted instantiation: _RNvXsI_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsI_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2190 | | pub enum NotificationsOutErr { |
2191 | | /// Opening has been interrupted because the connection as a whole is being shut down. |
2192 | | ConnectionShutdown, |
2193 | | |
2194 | | /// Error happened in the context of the substream. |
2195 | | #[display(fmt = "{_0}")] |
2196 | | Substream(established::NotificationsOutErr), |
2197 | | } |
2198 | | |
2199 | 0 | #[derive(Debug, derive_more::Display, Clone)] Unexecuted instantiation: _RNvXsL_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsL_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2200 | | pub enum NotificationsInClosedErr { |
2201 | | /// Substream has been closed because the connection as a whole is being shut down. |
2202 | | ConnectionShutdown, |
2203 | | |
2204 | | /// Error happened in the context of the substream. |
2205 | | #[display(fmt = "{_0}")] |
2206 | | Substream(established::NotificationsInClosedErr), |
2207 | | } |
2208 | | |
2209 | | /// Error potentially returned by [`Network::queue_notification`]. |
2210 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXsO_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_22QueueNotificationErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsO_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_22QueueNotificationErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
2211 | | pub enum QueueNotificationError { |
2212 | | /// Queue of notifications with that peer is full. |
2213 | | QueueFull, |
2214 | | } |