Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/collection.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! Collection of libp2p connections.
19
//!
20
//! The [`Network`] struct in this module is a collection of libp2p connections. In the
21
//! documentation below, it is also called the *coordinator*.
22
//!
23
//! When a connection is inserted in the collection with [`Network::insert_single_stream`] or
24
//! [`Network::insert_multi_stream`], two objects are returned: an identifier for this new
25
//! connection assigned by the collection, and a [`SingleStreamConnectionTask`] or
26
//! [`MultiStreamConnectionTask`].
27
//!
28
//! All the [`SingleStreamConnectionTask`]s/[`MultiStreamConnectionTask`]s created by the
29
//! [`Network`] communicate with that [`Network`] by passing messages. Passing the messages has
30
//! to be done explicitly by the API user. It is the responsibility of the API user to pull
31
//! messages from the coordinator (i.e. the [`Network`]) and push them onto the
32
//! [`SingleStreamConnectionTask`] or [`MultiStreamConnectionTask`] and vice-versa.
33
//!
34
//! # Usage
35
//!
36
//! - Whenever a new connection is established, use [`Network::insert_single_stream`] or
37
//! [`Network::insert_multi_stream`] to allocate a connection in the collection.
38
//! - When a connection has received data or is ready to send more data, use
39
//! [`SingleStreamConnectionTask::read_write`], [`SingleStreamConnectionTask::reset`],
40
//! [`MultiStreamConnectionTask::substream_read_write`], [`MultiStreamConnectionTask::reset`],
41
//! [`MultiStreamConnectionTask::add_substream`], and/or
42
//! [`MultiStreamConnectionTask::desired_outbound_substreams`] to synchronize the state of the
43
//! task with the actual state of the connection.
44
//! - Pull messages from the [`SingleStreamConnectionTask`]s and [`MultiStreamConnectionTask`]s
45
//! and inject them into the [`Network`], and vice versa.
46
//! - In parallel, continuously call [`Network::next_event`] to process the events generated by
47
//! the calls to [`Network::inject_connection_message`].
48
//!
49
50
use crate::libp2p::connection::noise;
51
52
use super::connection::{established, single_stream_handshake};
53
use alloc::{
54
    collections::{BTreeMap, BTreeSet, VecDeque},
55
    string::String,
56
    sync::Arc,
57
    vec::Vec,
58
};
59
use core::{
60
    hash::Hash,
61
    marker::PhantomData,
62
    ops::{self, Add, Sub},
63
    time::Duration,
64
};
65
use rand_chacha::{
66
    rand_core::{RngCore as _, SeedableRng as _},
67
    ChaCha20Rng,
68
};
69
70
pub use super::peer_id::PeerId;
71
pub use super::read_write::ReadWrite;
72
pub use established::{InboundError, InboundTy, SubstreamFate};
73
pub use single_stream_handshake::HandshakeError;
74
75
pub use multi_stream::MultiStreamConnectionTask;
76
pub use single_stream::SingleStreamConnectionTask;
77
78
mod multi_stream;
79
mod single_stream;
80
81
/// What kind of handshake to perform on the newly-added connection.
82
pub enum SingleStreamHandshakeKind<'a> {
83
    /// Use the multistream-select protocol to negotiate the Noise encryption, then use the
84
    /// multistream-select protocol to negotiate the Yamux multiplexing.
85
    MultistreamSelectNoiseYamux {
86
        /// Must be `true` if the connection has been initiated locally, or `false` if it has been
87
        /// initiated by the remote.
88
        is_initiator: bool,
89
        /// Local secret key to use for the handshake.
90
        noise_key: &'a noise::NoiseKey,
91
    },
92
}
93
94
/// What kind of handshake to perform on the newly-added connection.
95
pub enum MultiStreamHandshakeKind<'a> {
96
    /// The connection is a WebRTC connection.
97
    ///
98
    /// See <https://github.com/libp2p/specs/pull/412> for details.
99
    ///
100
    /// The reading and writing side of substreams must never be closed. Substreams can only be
101
    /// abruptly destroyed by either side.
102
    WebRtc {
103
        /// Must be `true` if the connection has been initiated locally, or `false` if it has been
104
        /// initiated by the remote.
105
        is_initiator: bool,
106
        /// Local secret key to use for the handshake.
107
        noise_key: &'a noise::NoiseKey,
108
        /// Multihash encoding of the TLS certificate used by the local node at the DTLS layer.
109
        local_tls_certificate_multihash: Vec<u8>,
110
        /// Multihash encoding of the TLS certificate used by the remote node at the DTLS layer.
111
        remote_tls_certificate_multihash: Vec<u8>,
112
    },
113
}
114
115
/// Configuration for a [`Network`].
116
pub struct Config {
117
    /// Seed for the randomness within the networking state machine.
118
    pub randomness_seed: [u8; 32],
119
120
    /// Number of connections containers should initially allocate for.
121
    pub capacity: usize,
122
123
    /// Maximum number of substreams that each remote can have simultaneously opened.
124
    ///
125
    /// > **Note**: This limit is necessary in order to avoid DoS attacks where a remote opens too
126
    /// >           many substreams.
127
    pub max_inbound_substreams: usize,
128
129
    /// Maximum size in bytes of the name of a protocol supported by the local node. Any longer
130
    /// protocol name requested by the remote is automatically refused. Necessary in order to
131
    /// avoid situations where the remote sends an infinitely-sized protocol name.
132
    pub max_protocol_name_len: usize,
133
134
    /// Amount of time after which a connection handshake is considered to have taken too long
135
    /// and must be aborted.
136
    pub handshake_timeout: Duration,
137
138
    /// Name of the ping protocol on the network.
139
    pub ping_protocol: String,
140
}
141
142
/// Identifier of a connection spawned by the [`Network`].
143
//
144
// Identifiers are never reused.
145
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
146
pub struct ConnectionId(u64);
147
148
impl ConnectionId {
149
    /// Value that compares less than or equal to any possible [`ConnectionId`].
150
    pub const MIN: Self = ConnectionId(u64::MIN);
151
    /// Value that compares greater than or equal to any possible [`ConnectionId`].
152
    pub const MAX: Self = ConnectionId(u64::MAX);
153
}
154
155
/// Identifier of a request, or an inbound substream, or an outbound substream.
156
//
157
// Identifiers are never reused.
158
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
159
pub struct SubstreamId(u64);
160
161
impl SubstreamId {
162
    /// Value that compares less than or equal to any possible [`SubstreamId`].
163
    pub const MIN: Self = SubstreamId(u64::MIN);
164
    /// Value that compares greater than or equal to any possible [`SubstreamId`].
165
    pub const MAX: Self = SubstreamId(u64::MAX);
166
}
167
168
/// Data structure containing the list of all connections, pending or not, and their latest known
169
/// state. See also [the module-level documentation](..).
170
pub struct Network<TConn, TNow> {
171
    /// Messages waiting to be sent to connection tasks.
172
    messages_to_connections: VecDeque<(ConnectionId, CoordinatorToConnectionInner)>,
173
174
    /// Messages received from connection tasks. Processed in [`Network::next_event`].
175
    pending_incoming_messages: VecDeque<(ConnectionId, ConnectionToCoordinatorInner)>,
176
177
    /// Id to assign to the next connection.
178
    next_connection_id: ConnectionId,
179
180
    /// Id to assign to the next substream, such as the next request or next notifications
181
    /// substream.
182
    ///
183
    /// All substreams share the same pool of ids across all connections.
184
    next_substream_id: SubstreamId,
185
186
    /// List of all connections in the data structure.
187
    connections: hashbrown::HashMap<ConnectionId, Connection<TConn>, fnv::FnvBuildHasher>,
188
189
    /// If `Some`, the given connection is in the process of shutting down. Calling
190
    /// [`Network::next_event`] will cancel all ongoing requests and notification substreams
191
    /// that concern this connection before processing any other incoming message.
192
    shutting_down_connection: Option<ConnectionId>,
193
194
    /// List of connections for which a [`ConnectionToCoordinatorInner::ShutdownFinished`] has
195
    /// been received and a [`CoordinatorToConnectionInner::ShutdownFinishedAck`] has been sent.
196
    /// We can now remove these connections and generate a [`Event::Shutdown`].
197
    shutdown_finished_connections: VecDeque<ConnectionId>,
198
199
    /// List of all outgoing notification substreams that we have opened. Can be either pending
200
    /// (waiting for the connection task to say whether it has been accepted or not) or fully
201
    /// open.
202
    outgoing_notification_substreams:
203
        hashbrown::HashMap<SubstreamId, (ConnectionId, SubstreamState), fnv::FnvBuildHasher>,
204
205
    /// Always contains the same entries as [`Network::outgoing_notification_substreams`] but
206
    /// ordered differently.
207
    // TODO: group with the other similar BTreeSets?
208
    outgoing_notification_substreams_by_connection: BTreeSet<(ConnectionId, SubstreamId)>,
209
210
    /// List of all requests that have been started locally.
211
    outgoing_requests: BTreeSet<(ConnectionId, SubstreamId)>,
212
213
    /// List of inbound negotiated substreams that connections have received and haven't been
214
    /// answered by the API user yet.
215
    ingoing_negotiated_substreams: hashbrown::HashMap<
216
        SubstreamId,
217
        (ConnectionId, established::SubstreamId, bool),
218
        fnv::FnvBuildHasher,
219
    >,
220
221
    /// Always contains the same entries as [`Network::ingoing_negotiated_substreams`] but ordered
222
    /// differently.
223
    // TODO: group with the other similar BTreeSets?
224
    ingoing_negotiated_substreams_by_connection:
225
        BTreeMap<(ConnectionId, established::SubstreamId), SubstreamId>,
226
227
    /// List of incoming notification substreams that connections have received. Can be either
228
    /// pending (waiting to be accepted/refused) or fully opened.
229
    ///
230
    /// The substream ID of the substream is allocated by the connection task, and thus we need
231
    /// to keep a mapping of inner `<->` substream IDs.
232
    ingoing_notification_substreams: hashbrown::HashMap<
233
        SubstreamId,
234
        (ConnectionId, SubstreamState, established::SubstreamId),
235
        fnv::FnvBuildHasher,
236
    >,
237
238
    /// Always contains the same entries as [`Network::ingoing_notification_substreams`] but
239
    /// ordered differently.
240
    // TODO: group with the other similar BTreeSets?
241
    ingoing_notification_substreams_by_connection:
242
        BTreeMap<(ConnectionId, established::SubstreamId), SubstreamId>,
243
244
    /// List of requests that connections have received and haven't been answered by the API user
245
    /// yet.
246
    ingoing_requests: hashbrown::HashMap<
247
        SubstreamId,
248
        (ConnectionId, established::SubstreamId),
249
        fnv::FnvBuildHasher,
250
    >,
251
252
    /// Always contains the same entries as [`Network::ingoing_requests`] but ordered differently.
253
    // TODO: group with the other similar BTreeSets?
254
    ingoing_requests_by_connection: BTreeSet<(ConnectionId, SubstreamId)>,
255
256
    /// Generator for randomness seeds given to the established connections.
257
    randomness_seeds: ChaCha20Rng,
258
259
    /// See [`Config::max_inbound_substreams`].
260
    max_inbound_substreams: usize,
261
262
    /// See [`Config::max_protocol_name_len`].
263
    max_protocol_name_len: usize,
264
265
    /// See [`Config::handshake_timeout`].
266
    handshake_timeout: Duration,
267
268
    /// See [`Config::ping_protocol`].
269
    ping_protocol: Arc<str>,
270
271
    // Phantom data to keep the `TNow` type pinned.
272
    // TODO: consider removing
273
    now_pin: PhantomData<fn() -> TNow>,
274
}
275
276
struct Connection<TConn> {
277
    state: InnerConnectionState,
278
279
    user_data: TConn,
280
}
281
282
enum InnerConnectionState {
283
    /// The connection is still in its handshaking state.
284
    Handshaking,
285
    /// The connection is fully established.
286
    Established,
287
    /// The connection is in the process of shutting down.
288
    ShuttingDown {
289
        /// `true` if the state before the shutdown was [`InnerConnectionState::Established`].
290
        was_established: bool,
291
292
        /// `true` if [`Network::start_shutdown`] has been called on this connection.
293
        ///
294
        /// Even if the remote starts the shutdown at the same time, from an API perspective if
295
        /// this flag is `true` it will be considered as if the API user had initiated the
296
        /// shutdown.
297
        api_initiated: bool,
298
    },
299
}
300
301
/// See [`Network::outgoing_notification_substreams`] and
302
/// [`Network::ingoing_notification_substreams`].
303
///
304
/// > **Note**: There is no `Closed` variant, as this corresponds to a lack of entry in the map.
305
#[derive(Debug, Copy, Clone, PartialEq, Eq, Ord, PartialOrd, Hash)]
306
enum SubstreamState {
307
    /// Substream hasn't been accepted or refused yet.
308
    Pending,
309
    Open,
310
    /// Substream is in the process of being closed. Only relevant for inbound substreams.
311
    RequestedClosing,
312
}
313
314
impl<TConn, TNow> Network<TConn, TNow>
315
where
316
    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
317
{
318
    /// Initializes a new network data structure.
319
21
    pub fn new(config: Config) -> Self {
320
21
        // The initial capacities given to the containers below are more or less arbitrary, the
321
21
        // objective being to avoid relocating the containers.
322
21
        Network {
323
21
            messages_to_connections: VecDeque::with_capacity(config.capacity * 2),
324
21
            pending_incoming_messages: VecDeque::with_capacity(config.capacity * 2),
325
21
            next_substream_id: SubstreamId(0),
326
21
            handshake_timeout: config.handshake_timeout,
327
21
            next_connection_id: ConnectionId(0),
328
21
            connections: hashbrown::HashMap::with_capacity_and_hasher(
329
21
                config.capacity,
330
21
                Default::default(),
331
21
            ),
332
21
            shutting_down_connection: None,
333
21
            shutdown_finished_connections: VecDeque::with_capacity(config.capacity),
334
21
            outgoing_requests: BTreeSet::new(),
335
21
            ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher(
336
21
                4 * config.capacity,
337
21
                Default::default(),
338
21
            ),
339
21
            ingoing_requests_by_connection: BTreeSet::new(),
340
21
            outgoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher(
341
21
                4 * config.capacity,
342
21
                Default::default(),
343
21
            ),
344
21
            outgoing_notification_substreams_by_connection: BTreeSet::new(),
345
21
            ingoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher(
346
21
                4 * config.capacity,
347
21
                Default::default(),
348
21
            ),
349
21
            ingoing_notification_substreams_by_connection: BTreeMap::new(),
350
21
            ingoing_negotiated_substreams: hashbrown::HashMap::with_capacity_and_hasher(
351
21
                4 * config.capacity,
352
21
                Default::default(),
353
21
            ),
354
21
            ingoing_negotiated_substreams_by_connection: BTreeMap::new(),
355
21
            randomness_seeds: ChaCha20Rng::from_seed(config.randomness_seed),
356
21
            max_inbound_substreams: config.max_inbound_substreams,
357
21
            max_protocol_name_len: config.max_protocol_name_len,
358
21
            ping_protocol: config.ping_protocol.into(),
359
21
            now_pin: PhantomData,
360
21
        }
361
21
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE3newB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE3newCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE3newB9_
_RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3newCsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
319
2
    pub fn new(config: Config) -> Self {
320
2
        // The initial capacities given to the containers below are more or less arbitrary, the
321
2
        // objective being to avoid relocating the containers.
322
2
        Network {
323
2
            messages_to_connections: VecDeque::with_capacity(config.capacity * 2),
324
2
            pending_incoming_messages: VecDeque::with_capacity(config.capacity * 2),
325
2
            next_substream_id: SubstreamId(0),
326
2
            handshake_timeout: config.handshake_timeout,
327
2
            next_connection_id: ConnectionId(0),
328
2
            connections: hashbrown::HashMap::with_capacity_and_hasher(
329
2
                config.capacity,
330
2
                Default::default(),
331
2
            ),
332
2
            shutting_down_connection: None,
333
2
            shutdown_finished_connections: VecDeque::with_capacity(config.capacity),
334
2
            outgoing_requests: BTreeSet::new(),
335
2
            ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher(
336
2
                4 * config.capacity,
337
2
                Default::default(),
338
2
            ),
339
2
            ingoing_requests_by_connection: BTreeSet::new(),
340
2
            outgoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher(
341
2
                4 * config.capacity,
342
2
                Default::default(),
343
2
            ),
344
2
            outgoing_notification_substreams_by_connection: BTreeSet::new(),
345
2
            ingoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher(
346
2
                4 * config.capacity,
347
2
                Default::default(),
348
2
            ),
349
2
            ingoing_notification_substreams_by_connection: BTreeMap::new(),
350
2
            ingoing_negotiated_substreams: hashbrown::HashMap::with_capacity_and_hasher(
351
2
                4 * config.capacity,
352
2
                Default::default(),
353
2
            ),
354
2
            ingoing_negotiated_substreams_by_connection: BTreeMap::new(),
355
2
            randomness_seeds: ChaCha20Rng::from_seed(config.randomness_seed),
356
2
            max_inbound_substreams: config.max_inbound_substreams,
357
2
            max_protocol_name_len: config.max_protocol_name_len,
358
2
            ping_protocol: config.ping_protocol.into(),
359
2
            now_pin: PhantomData,
360
2
        }
361
2
    }
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3newCscDgN54JpMGG_6author
_RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3newCsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
319
19
    pub fn new(config: Config) -> Self {
320
19
        // The initial capacities given to the containers below are more or less arbitrary, the
321
19
        // objective being to avoid relocating the containers.
322
19
        Network {
323
19
            messages_to_connections: VecDeque::with_capacity(config.capacity * 2),
324
19
            pending_incoming_messages: VecDeque::with_capacity(config.capacity * 2),
325
19
            next_substream_id: SubstreamId(0),
326
19
            handshake_timeout: config.handshake_timeout,
327
19
            next_connection_id: ConnectionId(0),
328
19
            connections: hashbrown::HashMap::with_capacity_and_hasher(
329
19
                config.capacity,
330
19
                Default::default(),
331
19
            ),
332
19
            shutting_down_connection: None,
333
19
            shutdown_finished_connections: VecDeque::with_capacity(config.capacity),
334
19
            outgoing_requests: BTreeSet::new(),
335
19
            ingoing_requests: hashbrown::HashMap::with_capacity_and_hasher(
336
19
                4 * config.capacity,
337
19
                Default::default(),
338
19
            ),
339
19
            ingoing_requests_by_connection: BTreeSet::new(),
340
19
            outgoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher(
341
19
                4 * config.capacity,
342
19
                Default::default(),
343
19
            ),
344
19
            outgoing_notification_substreams_by_connection: BTreeSet::new(),
345
19
            ingoing_notification_substreams: hashbrown::HashMap::with_capacity_and_hasher(
346
19
                4 * config.capacity,
347
19
                Default::default(),
348
19
            ),
349
19
            ingoing_notification_substreams_by_connection: BTreeMap::new(),
350
19
            ingoing_negotiated_substreams: hashbrown::HashMap::with_capacity_and_hasher(
351
19
                4 * config.capacity,
352
19
                Default::default(),
353
19
            ),
354
19
            ingoing_negotiated_substreams_by_connection: BTreeMap::new(),
355
19
            randomness_seeds: ChaCha20Rng::from_seed(config.randomness_seed),
356
19
            max_inbound_substreams: config.max_inbound_substreams,
357
19
            max_protocol_name_len: config.max_protocol_name_len,
358
19
            ping_protocol: config.ping_protocol.into(),
359
19
            now_pin: PhantomData,
360
19
        }
361
19
    }
362
363
    /// Adds a new single-stream connection to the collection.
364
    ///
365
    /// Must be passed the moment (as a `TNow`) when the connection process has been started, in
366
    /// order to determine when the handshake timeout expires.
367
0
    pub fn insert_single_stream(
368
0
        &mut self,
369
0
        when_connection_start: TNow,
370
0
        handshake_kind: SingleStreamHandshakeKind,
371
0
        substreams_capacity: usize,
372
0
        user_data: TConn,
373
0
    ) -> (ConnectionId, SingleStreamConnectionTask<TNow>) {
374
0
        let connection_id = self.next_connection_id;
375
0
        self.next_connection_id.0 += 1;
376
0
377
0
        // We only support one kind of handshake at the moment. Make sure (at compile time) that
378
0
        // the value provided as parameter is indeed the one expected.
379
0
        let SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
380
0
            is_initiator,
381
0
            noise_key,
382
0
        } = handshake_kind;
383
0
384
0
        let connection_task = SingleStreamConnectionTask::new(single_stream::Config {
385
0
            randomness_seed: {
386
0
                let mut seed = [0; 32];
387
0
                self.randomness_seeds.fill_bytes(&mut seed);
388
0
                seed
389
0
            },
390
0
            handshake: {
391
0
                let mut ephemeral_secret_key = zeroize::Zeroizing::new([0; 32]);
392
0
                self.randomness_seeds.fill_bytes(&mut *ephemeral_secret_key);
393
0
                single_stream_handshake::HealthyHandshake::noise_yamux(
394
0
                    noise_key,
395
0
                    &ephemeral_secret_key,
396
0
                    is_initiator,
397
0
                )
398
0
            },
399
0
            handshake_timeout: when_connection_start + self.handshake_timeout,
400
0
            max_inbound_substreams: self.max_inbound_substreams,
401
0
            substreams_capacity,
402
0
            max_protocol_name_len: self.max_protocol_name_len,
403
0
            ping_protocol: self.ping_protocol.clone(),
404
0
        });
405
0
406
0
        let _previous_value = self.connections.insert(
407
0
            connection_id,
408
0
            Connection {
409
0
                state: InnerConnectionState::Handshaking,
410
0
                user_data,
411
0
            },
412
0
        );
413
0
        debug_assert!(_previous_value.is_none());
414
415
0
        (connection_id, connection_task)
416
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE20insert_single_streamB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE20insert_single_streamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE20insert_single_streamB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE20insert_single_streamCsiUjFBJteJ7x_17smoldot_full_node
417
418
    /// Adds a new multi-stream connection to the collection.
419
    ///
420
    /// Must be passed the moment (as a `TNow`) when the connection process has been started, in
421
    /// order to determine when the handshake timeout expires.
422
0
    pub fn insert_multi_stream<TSubId>(
423
0
        &mut self,
424
0
        when_connection_start: TNow,
425
0
        handshake_kind: MultiStreamHandshakeKind,
426
0
        substreams_capacity: usize,
427
0
        user_data: TConn,
428
0
    ) -> (ConnectionId, MultiStreamConnectionTask<TNow, TSubId>)
429
0
    where
430
0
        TSubId: Clone + PartialEq + Eq + Hash,
431
0
    {
432
0
        let connection_id = self.next_connection_id;
433
0
        self.next_connection_id.0 += 1;
434
435
        // In the WebRTC handshake, the Noise prologue must be set to `"libp2p-webrtc-noise:"`
436
        // followed by the multihash-encoded fingerprints of the initiator's certificate
437
        // and the receiver's certificate.
438
        // See <https://github.com/libp2p/specs/pull/412>.
439
0
        let (noise_key, noise_prologue, local_is_noise_initiator) = {
440
            let MultiStreamHandshakeKind::WebRtc {
441
0
                noise_key,
442
0
                is_initiator,
443
0
                local_tls_certificate_multihash,
444
0
                remote_tls_certificate_multihash,
445
0
            } = handshake_kind;
446
0
            const PREFIX: &[u8] = b"libp2p-webrtc-noise:";
447
0
            let mut out = Vec::with_capacity(
448
0
                PREFIX.len()
449
0
                    + local_tls_certificate_multihash.len()
450
0
                    + remote_tls_certificate_multihash.len(),
451
0
            );
452
0
            out.extend_from_slice(PREFIX);
453
0
            if is_initiator {
454
0
                out.extend_from_slice(&local_tls_certificate_multihash);
455
0
                out.extend_from_slice(&remote_tls_certificate_multihash);
456
0
            } else {
457
0
                out.extend_from_slice(&remote_tls_certificate_multihash);
458
0
                out.extend_from_slice(&local_tls_certificate_multihash);
459
0
            }
460
461
            // In the WebRTC libp2p protocol, the initiator of the connection is *not* the
462
            // initiator of the Noise handshake. Instead, it's the "server" that initiates the
463
            // Noise handshake. This saves a round-trip.
464
0
            (noise_key, out, !is_initiator)
465
0
        };
466
0
467
0
        let handshake = {
468
0
            let mut noise_ephemeral_key = zeroize::Zeroizing::new([0; 32]);
469
0
            self.randomness_seeds.fill_bytes(&mut *noise_ephemeral_key);
470
0
            noise::HandshakeInProgress::new(noise::Config {
471
0
                key: noise_key,
472
0
                is_initiator: local_is_noise_initiator,
473
0
                prologue: &noise_prologue,
474
0
                ephemeral_secret_key: &noise_ephemeral_key,
475
0
            })
476
0
        };
477
0
478
0
        let connection_task = MultiStreamConnectionTask::new(
479
0
            {
480
0
                let mut seed = [0; 32];
481
0
                self.randomness_seeds.fill_bytes(&mut seed);
482
0
                seed
483
0
            },
484
0
            when_connection_start,
485
0
            handshake,
486
0
            self.max_inbound_substreams,
487
0
            substreams_capacity,
488
0
            self.max_protocol_name_len,
489
0
            self.ping_protocol.clone(),
490
0
        );
491
0
492
0
        let _previous_value = self.connections.insert(
493
0
            connection_id,
494
0
            Connection {
495
0
                state: InnerConnectionState::Handshaking,
496
0
                user_data,
497
0
            },
498
0
        );
499
0
        debug_assert!(_previous_value.is_none());
500
501
0
        (connection_id, connection_task)
502
0
    }
Unexecuted instantiation: _RINvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB6_7NetworkppE19insert_multi_streampEBa_
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE19insert_multi_streamjECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkppE19insert_multi_streampEBa_
503
504
    /// Switches the connection to a state where it will shut down soon.
505
    ///
506
    /// Calling this function does **not** generate a [`Event::StartShutdown`] event for this
507
    /// connection. The event is implied.
508
    ///
509
    /// It is no longer possible to start requests, open notification substreams, or send
510
    /// notifications. No new incoming requests or notification substreams will be reported. The
511
    /// incoming notifications that were sent by the remote before the shutdown started will still
512
    /// be reported.
513
    ///
514
    /// It is possible to call this method on connections that are still in their handshaking
515
    /// phase.
516
    ///
517
    /// This function generates a message destined to the connection. Use
518
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
519
    ///
520
    /// # Panic
521
    ///
522
    /// Panics if the connection is already shutting down, either because
523
    /// [`Network::start_shutdown`] was called or a [`Event::StartShutdown`] event was yielded.
524
    ///
525
    #[track_caller]
526
0
    pub fn start_shutdown(&mut self, connection_id: ConnectionId) {
527
0
        let connection = match self.connections.get_mut(&connection_id) {
528
0
            Some(c) => c,
529
0
            None => panic!(),
530
        };
531
532
0
        let is_established = match connection.state {
533
0
            InnerConnectionState::Handshaking => false,
534
0
            InnerConnectionState::Established => true,
535
            InnerConnectionState::ShuttingDown {
536
                api_initiated: true,
537
                ..
538
0
            } => panic!("start_shutdown called twice on same connection"), // Forbidden.
539
            InnerConnectionState::ShuttingDown {
540
                api_initiated: false,
541
                ..
542
0
            } => panic!("start_shutdown called after StartShutdown event"), // Forbidden.
543
        };
544
545
0
        connection.state = InnerConnectionState::ShuttingDown {
546
0
            was_established: is_established,
547
0
            api_initiated: true,
548
0
        };
549
0
550
0
        self.messages_to_connections
551
0
            .push_back((connection_id, CoordinatorToConnectionInner::StartShutdown));
552
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE14start_shutdownB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE14start_shutdownCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE14start_shutdownB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE14start_shutdownCsiUjFBJteJ7x_17smoldot_full_node
553
554
    /// Returns true if the collection doesn't contain any connection.
555
0
    pub fn is_empty(&self) -> bool {
556
0
        self.connections.is_empty()
557
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE8is_emptyB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE8is_emptyB9_
558
559
    /// Returns the number of connections in the collection.
560
0
    pub fn len(&self) -> usize {
561
0
        self.connections.len()
562
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE3lenB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE3lenB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE3lenCsiUjFBJteJ7x_17smoldot_full_node
563
564
    /// Returns the state of the given connection.
565
    ///
566
    /// # Panic
567
    ///
568
    /// Panics if the identifier is invalid or corresponds to a connection that has already
569
    /// entirely shut down.
570
    ///
571
0
    pub fn connection_state(&self, connection_id: ConnectionId) -> ConnectionState {
572
0
        let connection = self.connections.get(&connection_id).unwrap();
573
0
        match connection.state {
574
0
            InnerConnectionState::Handshaking => ConnectionState {
575
0
                established: false,
576
0
                shutting_down: false,
577
0
            },
578
0
            InnerConnectionState::Established => ConnectionState {
579
0
                established: true,
580
0
                shutting_down: false,
581
0
            },
582
            InnerConnectionState::ShuttingDown {
583
0
                was_established, ..
584
0
            } => ConnectionState {
585
0
                established: was_established,
586
0
                shutting_down: true,
587
0
            },
588
        }
589
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE16connection_stateB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE16connection_stateCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE16connection_stateB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE16connection_stateCsiUjFBJteJ7x_17smoldot_full_node
590
591
    /// Modifies the value that was initially passed through [`Config::max_protocol_name_len`].
592
    ///
593
    /// The new value only applies to substreams opened after this function has been called.
594
21
    pub fn set_max_protocol_name_len(&mut self, new_max_length: usize) {
595
21
        if self.max_protocol_name_len == new_max_length {
596
21
            return;
597
0
        }
598
0
599
0
        self.max_protocol_name_len = new_max_length;
600
0
601
0
        // Send a message to all connections to update this value.
602
0
        self.messages_to_connections.reserve(self.connections.len());
603
0
        for connection_id in self.connections.keys() {
604
0
            self.messages_to_connections.push_back((
605
0
                *connection_id,
606
0
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
607
0
            ));
608
0
        }
609
21
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE25set_max_protocol_name_lenB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE25set_max_protocol_name_lenCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE25set_max_protocol_name_lenB9_
_RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25set_max_protocol_name_lenCsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
594
2
    pub fn set_max_protocol_name_len(&mut self, new_max_length: usize) {
595
2
        if self.max_protocol_name_len == new_max_length {
596
2
            return;
597
0
        }
598
0
599
0
        self.max_protocol_name_len = new_max_length;
600
0
601
0
        // Send a message to all connections to update this value.
602
0
        self.messages_to_connections.reserve(self.connections.len());
603
0
        for connection_id in self.connections.keys() {
604
0
            self.messages_to_connections.push_back((
605
0
                *connection_id,
606
0
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
607
0
            ));
608
0
        }
609
2
    }
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25set_max_protocol_name_lenCscDgN54JpMGG_6author
_RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25set_max_protocol_name_lenCsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
594
19
    pub fn set_max_protocol_name_len(&mut self, new_max_length: usize) {
595
19
        if self.max_protocol_name_len == new_max_length {
596
19
            return;
597
0
        }
598
0
599
0
        self.max_protocol_name_len = new_max_length;
600
0
601
0
        // Send a message to all connections to update this value.
602
0
        self.messages_to_connections.reserve(self.connections.len());
603
0
        for connection_id in self.connections.keys() {
604
0
            self.messages_to_connections.push_back((
605
0
                *connection_id,
606
0
                CoordinatorToConnectionInner::SetMaxProtocolNameLen { new_max_length },
607
0
            ));
608
0
        }
609
19
    }
610
611
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol
612
    /// name and indicate the type of the protocol.
613
    ///
614
    /// # Panic
615
    ///
616
    /// Panics if the substream is not in the correct state.
617
    ///
618
0
    pub fn accept_inbound(&mut self, substream_id: SubstreamId, ty: InboundTy) {
619
0
        let (connection_id, inner_substream_id, already_accepted) =
620
0
            match self.ingoing_negotiated_substreams.get_mut(&substream_id) {
621
0
                Some(s) => s,
622
0
                None => panic!(),
623
            };
624
0
        assert!(!*already_accepted);
625
626
0
        self.messages_to_connections.push_back((
627
0
            *connection_id,
628
0
            CoordinatorToConnectionInner::AcceptInbound {
629
0
                substream_id: *inner_substream_id,
630
0
                inbound_ty: ty,
631
0
            },
632
0
        ));
633
0
634
0
        *already_accepted = true;
635
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE14accept_inboundB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE14accept_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE14accept_inboundB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE14accept_inboundCsiUjFBJteJ7x_17smoldot_full_node
636
637
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the
638
    /// protocol name as not supported.
639
    ///
640
    /// # Panic
641
    ///
642
    /// Panics if the substream is not in the correct state.
643
    ///
644
0
    pub fn reject_inbound(&mut self, substream_id: SubstreamId) {
645
0
        let (connection_id, inner_substream_id, already_accepted) =
646
0
            match self.ingoing_negotiated_substreams.remove(&substream_id) {
647
0
                Some(s) => s,
648
0
                None => panic!(),
649
            };
650
0
        let _was_in = self
651
0
            .ingoing_negotiated_substreams_by_connection
652
0
            .remove(&(connection_id, inner_substream_id));
653
0
        debug_assert!(_was_in.is_some());
654
0
        assert!(!already_accepted);
655
656
0
        self.messages_to_connections.push_back((
657
0
            connection_id,
658
0
            CoordinatorToConnectionInner::RejectInbound {
659
0
                substream_id: inner_substream_id,
660
0
            },
661
0
        ));
662
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE14reject_inboundB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE14reject_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE14reject_inboundB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE14reject_inboundCsiUjFBJteJ7x_17smoldot_full_node
663
664
    /// Sends a request to the given peer.
665
    ///
666
    /// A [`Event::Response`] event will later be generated containing the result of the request.
667
    /// This event is generated even if the connection the request was sent on has been closed.
668
    ///
669
    /// It is invalid to start a request on a connection before a [`Event::HandshakeFinished`]
670
    /// or after a [`Event::StartShutdown`] has been generated, or after
671
    /// [`Network::start_shutdown`] has been called.
672
    ///
673
    /// Returns a newly-allocated identifier for this substream.
674
    ///
675
    /// This function generates a message destined to the connection. Use
676
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
677
    ///
678
    /// # Requests
679
    ///
680
    /// A request consists in:
681
    ///
682
    /// - Opening a substream on an established connection with the target.
683
    /// - Negotiating the requested protocol (`protocol_index`) on this substream using the
684
    ///   *multistream-select* protocol.
685
    /// - Sending the request (`request_data` parameter), prefixed with its length.
686
    /// - Waiting for the response (prefixed with its length), which is then returned.
687
    ///
688
    /// An error happens if the connection closes while the request is in progress, if the remote
689
    /// doesn't support the given protocol, if the request or response doesn't respect the protocol
690
    /// limits, or if the remote takes too much time to answer.
691
    ///
692
    /// The timeout is the time between the moment the substream is opened and the moment the
693
    /// response is sent back. It starts ticking only after the request starts being sent. If the
694
    /// emitter doesn't send the request or if the receiver doesn't answer during this time
695
    /// window, the request is considered failed.
696
    ///
697
    /// # Panic
698
    ///
699
    /// Panics if the [`ConnectionId`] is invalid or is a connection that hasn't finished its
700
    /// handshake or is shutting down.
701
    ///
702
    #[track_caller]
703
0
    pub fn start_request(
704
0
        &mut self,
705
0
        target: ConnectionId,
706
0
        protocol_name: String,
707
0
        request_data: Option<Vec<u8>>,
708
0
        timeout: Duration,
709
0
        max_response_size: usize,
710
0
    ) -> SubstreamId {
711
0
        let connection = match self.connections.get(&target) {
712
0
            Some(c) => c,
713
0
            None => panic!(),
714
        };
715
0
        assert!(matches!(
716
0
            connection.state,
717
            InnerConnectionState::Established
718
        ));
719
720
0
        let substream_id = self.next_substream_id;
721
0
        self.next_substream_id.0 += 1;
722
0
723
0
        let _was_inserted = self.outgoing_requests.insert((target, substream_id));
724
0
        debug_assert!(_was_inserted);
725
726
0
        self.messages_to_connections.push_back((
727
0
            target,
728
0
            CoordinatorToConnectionInner::StartRequest {
729
0
                protocol_name,
730
0
                request_data,
731
0
                timeout,
732
0
                max_response_size,
733
0
                substream_id,
734
0
            },
735
0
        ));
736
0
737
0
        substream_id
738
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE13start_requestB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE13start_requestCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE13start_requestB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE13start_requestCsiUjFBJteJ7x_17smoldot_full_node
739
740
    /// Start opening a notifications substream.
741
    ///
742
    /// It is invalid to open a notifications substream on a connection before a
743
    /// [`Event::HandshakeFinished`] or after a [`Event::StartShutdown`] has been generated, or
744
    /// after [`Network::start_shutdown`] has been called.
745
    ///
746
    /// Returns a newly-allocated identifier for this substream.
747
    ///
748
    /// This function generates a message destined to the connection. Use
749
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
750
    ///
751
    /// # Panic
752
    ///
753
    /// Panics if the [`ConnectionId`] is invalid or is a connection that hasn't finished its
754
    /// handshake or is shutting down.
755
    ///
756
    #[track_caller]
757
0
    pub fn open_out_notifications(
758
0
        &mut self,
759
0
        connection_id: ConnectionId,
760
0
        protocol_name: String,
761
0
        handshake_timeout: Duration,
762
0
        handshake: impl Into<Vec<u8>>,
763
0
        max_handshake_size: usize,
764
0
    ) -> SubstreamId {
765
0
        let connection = match self.connections.get(&connection_id) {
766
0
            Some(c) => c,
767
0
            None => panic!(),
768
        };
769
0
        assert!(matches!(
770
0
            connection.state,
771
            InnerConnectionState::Established
772
        ));
773
774
0
        let substream_id = self.next_substream_id;
775
0
        self.next_substream_id.0 += 1;
776
0
777
0
        let _prev_value = self
778
0
            .outgoing_notification_substreams
779
0
            .insert(substream_id, (connection_id, SubstreamState::Pending));
780
0
        debug_assert!(_prev_value.is_none());
781
0
        let _was_inserted = self
782
0
            .outgoing_notification_substreams_by_connection
783
0
            .insert((connection_id, substream_id));
784
0
        debug_assert!(_was_inserted);
785
786
0
        self.messages_to_connections.push_back((
787
0
            connection_id,
788
0
            CoordinatorToConnectionInner::OpenOutNotifications {
789
0
                protocol_name,
790
0
                handshake: handshake.into(),
791
0
                handshake_timeout,
792
0
                max_handshake_size,
793
0
                substream_id,
794
0
            },
795
0
        ));
796
0
797
0
        substream_id
798
0
    }
Unexecuted instantiation: _RINvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB6_7NetworkppE22open_out_notificationspEBa_
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE22open_out_notificationsINtNtCsdZExvAaxgia_5alloc3vec3VechEECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkppE22open_out_notificationspEBa_
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE22open_out_notificationsINtNtCsdZExvAaxgia_5alloc3vec3VechEECsiUjFBJteJ7x_17smoldot_full_node
799
800
    /// Start closing a previously-open notifications substream, or cancels opening a
801
    /// notifications substream.
802
    ///
803
    /// All the notifications that have been queued remain queued. The substream actually closes
804
    /// only once the queue is empty.
805
    ///
806
    /// Calling this method does *not* emit any event. The [`SubstreamId`] is considered invalid
807
    /// after this function returns.
808
    ///
809
    /// This function generates a message destined to the connection. Use
810
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
811
    ///
812
    /// # Panic
813
    ///
814
    /// Panics if [`SubstreamId`] doesn't correspond to an outbound notifications substream.
815
    ///
816
    #[track_caller]
817
0
    pub fn close_out_notifications(&mut self, substream_id: SubstreamId) {
818
        // Both `Pending` and `Open` states are accepted.
819
0
        let (connection_id, _state) =
820
0
            match self.outgoing_notification_substreams.remove(&substream_id) {
821
0
                Some(s) => s,
822
0
                None => panic!(),
823
            };
824
0
        let _was_in = self
825
0
            .outgoing_notification_substreams_by_connection
826
0
            .remove(&(connection_id, substream_id));
827
0
        debug_assert!(_was_in);
828
829
0
        self.messages_to_connections.push_back((
830
0
            connection_id,
831
0
            CoordinatorToConnectionInner::CloseOutNotifications { substream_id },
832
0
        ));
833
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE23close_out_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE23close_out_notificationsCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE23close_out_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE23close_out_notificationsCsiUjFBJteJ7x_17smoldot_full_node
834
835
    /// Adds a notification to the queue of notifications to send to the given peer.
836
    ///
837
    /// It is invalid to call this on a [`SubstreamId`] before a successful
838
    /// [`Event::NotificationsOutResult`] has been yielded.
839
    ///
840
    /// Each substream maintains a queue of notifications to be sent to the remote. This method
841
    /// attempts to push a notification to this queue.
842
    ///
843
    /// An error is also returned if the queue exceeds a certain size in bytes, for two reasons:
844
    ///
845
    /// - Since the content of the queue is transferred at a limited rate, each notification
846
    /// pushed at the end of the queue will take more time than the previous one to reach the
847
    /// destination. Once the queue reaches a certain size, the time it would take for
848
    /// newly-pushed notifications to reach the destination would start being unreasonably large.
849
    ///
850
    /// - If the remote deliberately applies back-pressure on the substream, it is undesirable to
851
    /// increase the memory usage of the local node.
852
    ///
853
    /// Similarly, the queue being full is a normal situation and notification protocols should
854
    /// be designed in such a way that discarding notifications shouldn't have a too negative
855
    /// impact.
856
    ///
857
    /// Regardless of the success of this function, no guarantee exists about the successful
858
    /// delivery of notifications.
859
    ///
860
    /// This function generates a message destined to the connection. Use
861
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
862
    ///
863
    /// # Panics
864
    ///
865
    /// Panics if [`SubstreamId`] is not a fully open outbound notifications substream.
866
    ///
867
    #[track_caller]
868
0
    pub fn queue_notification(
869
0
        &mut self,
870
0
        substream_id: SubstreamId,
871
0
        notification: impl Into<Vec<u8>>,
872
0
    ) -> Result<(), QueueNotificationError> {
873
0
        let (connection_id, state) = match self.outgoing_notification_substreams.get(&substream_id)
874
        {
875
0
            Some(s) => s,
876
0
            None => panic!(),
877
        };
878
0
        assert!(matches!(state, SubstreamState::Open));
879
880
        //  TODO: add some back-pressure system and return a `QueueNotificationError` if full
881
882
0
        self.messages_to_connections.push_back((
883
0
            *connection_id,
884
0
            CoordinatorToConnectionInner::QueueNotification {
885
0
                substream_id,
886
0
                notification: notification.into(),
887
0
            },
888
0
        ));
889
0
890
0
        Ok(())
891
0
    }
Unexecuted instantiation: _RINvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB6_7NetworkppE18queue_notificationpEBa_
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE18queue_notificationINtNtCsdZExvAaxgia_5alloc3vec3VechEECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkppE18queue_notificationpEBa_
Unexecuted instantiation: _RINvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB6_7NetworkINtNtNtBa_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB6_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE18queue_notificationINtNtCsdZExvAaxgia_5alloc3vec3VechEECsiUjFBJteJ7x_17smoldot_full_node
892
893
    /// Accepts a request for an inbound notifications substream reported by an
894
    /// [`Event::NotificationsInOpen`].
895
    ///
896
    /// If a [`Event::NotificationsInClose`] event is yielded, then this function must not be
897
    /// called and will panic.
898
    ///
899
    /// This function generates a message destined to the connection. Use
900
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
901
    ///
902
    /// # Panic
903
    ///
904
    /// Panics if the [`SubstreamId`] doesn't correspond to an inbound notifications substream.
905
    ///
906
    #[track_caller]
907
0
    pub fn accept_in_notifications(
908
0
        &mut self,
909
0
        substream_id: SubstreamId,
910
0
        handshake: Vec<u8>,
911
0
        max_notification_size: usize,
912
0
    ) {
913
0
        let (connection_id, state, inner_substream_id) =
914
0
            match self.ingoing_notification_substreams.get_mut(&substream_id) {
915
0
                Some(s) => s,
916
0
                None => panic!(),
917
            };
918
0
        assert!(matches!(state, SubstreamState::Pending));
919
920
0
        self.messages_to_connections.push_back((
921
0
            *connection_id,
922
0
            CoordinatorToConnectionInner::AcceptInNotifications {
923
0
                substream_id: *inner_substream_id,
924
0
                handshake,
925
0
                max_notification_size,
926
0
            },
927
0
        ));
928
0
929
0
        *state = SubstreamState::Open;
930
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE23accept_in_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE23accept_in_notificationsCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE23accept_in_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE23accept_in_notificationsCsiUjFBJteJ7x_17smoldot_full_node
931
932
    /// Rejects a request for an inbound notifications substream reported by an
933
    /// [`Event::NotificationsInOpen`].
934
    ///
935
    /// If a [`Event::NotificationsInClose`] event is yielded, then this function must not be
936
    /// called and will panic.
937
    ///
938
    /// The [`SubstreamId`] is considered no longer valid after this function returns.
939
    ///
940
    /// This function generates a message destined to the connection. Use
941
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
942
    ///
943
    /// # Panic
944
    ///
945
    /// Panics if the [`SubstreamId`] doesn't correspond to an inbound notifications substream.
946
    ///
947
    #[track_caller]
948
0
    pub fn reject_in_notifications(&mut self, substream_id: SubstreamId) {
949
0
        if let Some((connection_id, SubstreamState::Pending, inner_substream_id)) =
950
0
            self.ingoing_notification_substreams.remove(&substream_id)
951
        {
952
0
            let _was_in = self
953
0
                .ingoing_notification_substreams_by_connection
954
0
                .remove(&(connection_id, inner_substream_id));
955
0
            debug_assert_eq!(_was_in, Some(substream_id));
956
957
0
            self.messages_to_connections.push_back((
958
0
                connection_id,
959
0
                CoordinatorToConnectionInner::RejectInNotifications {
960
0
                    substream_id: inner_substream_id,
961
0
                },
962
0
            ));
963
        } else {
964
            // Note that, if this is reached, the pending substream is not inserted back
965
            // in the state machine, meaning that `self` is now in an inconsistent state.
966
            // But considering that we panic, this state mismatch isn't actually observable.
967
0
            panic!()
968
        }
969
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE23reject_in_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE23reject_in_notificationsCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE23reject_in_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE23reject_in_notificationsCsiUjFBJteJ7x_17smoldot_full_node
970
971
    /// Start the closing of an inbound notifications substream that was previously accepted with
972
    /// [`Network::accept_in_notifications`]
973
    ///
974
    /// Calling this function will later generate a [`Event::NotificationsInClose`] event once the
975
    /// substream is effectively closed.
976
    /// This function gracefully asks the remote to close the substream. The remote has the
977
    /// duration indicated with `timeout` to effectively close the substream. In the meanwhile,
978
    /// notifications can still be received.
979
    ///
980
    /// This function generates a message destined to the connection. Use
981
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
982
    ///
983
    /// # Panic
984
    ///
985
    /// Panics if the [`SubstreamId`] doesn't correspond to an accepted inbound notifications
986
    /// substream.
987
    ///
988
    #[track_caller]
989
0
    pub fn start_close_in_notifications(&mut self, substream_id: SubstreamId, timeout: Duration) {
990
0
        let (connection_id, state, inner_substream_id) =
991
0
            match self.ingoing_notification_substreams.get_mut(&substream_id) {
992
0
                Some(s) => s,
993
0
                None => panic!(),
994
            };
995
0
        assert!(matches!(state, SubstreamState::Open));
996
997
0
        self.messages_to_connections.push_back((
998
0
            *connection_id,
999
0
            CoordinatorToConnectionInner::CloseInNotifications {
1000
0
                substream_id: *inner_substream_id,
1001
0
                timeout,
1002
0
            },
1003
0
        ));
1004
0
1005
0
        *state = SubstreamState::RequestedClosing;
1006
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE28start_close_in_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE28start_close_in_notificationsCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE28start_close_in_notificationsB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE28start_close_in_notificationsCsiUjFBJteJ7x_17smoldot_full_node
1007
1008
    /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`].
1009
    ///
1010
    /// If the substream was in the meanwhile yielded in an [`Event::RequestInCancel`], then this
1011
    /// function must not be called and will panic.
1012
    ///
1013
    /// The [`SubstreamId`] is considered no longer valid after this function returns.
1014
    ///
1015
    /// This function generates a message destined to the connection. Use
1016
    /// [`Network::pull_message_to_connection`] to process these messages after it has returned.
1017
    ///
1018
    /// # Panic
1019
    ///
1020
    /// Panics if the [`SubstreamId`] doesn't correspond to an active incoming request.
1021
    ///
1022
    #[track_caller]
1023
0
    pub fn respond_in_request(&mut self, substream_id: SubstreamId, response: Result<Vec<u8>, ()>) {
1024
0
        let (connection_id, inner_substream_id) = match self.ingoing_requests.remove(&substream_id)
1025
        {
1026
0
            Some(s) => s,
1027
0
            None => panic!(),
1028
        };
1029
1030
0
        self.ingoing_requests_by_connection
1031
0
            .remove(&(connection_id, substream_id));
1032
0
1033
0
        self.messages_to_connections.push_back((
1034
0
            connection_id,
1035
0
            CoordinatorToConnectionInner::AnswerRequest {
1036
0
                substream_id: inner_substream_id,
1037
0
                response,
1038
0
            },
1039
0
        ));
1040
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE18respond_in_requestB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE18respond_in_requestCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE18respond_in_requestB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE18respond_in_requestCsiUjFBJteJ7x_17smoldot_full_node
1041
1042
    /// Pulls a message that must be sent to a connection.
1043
    ///
1044
    /// The message must be passed to [`SingleStreamConnectionTask::inject_coordinator_message`]
1045
    /// or [`MultiStreamConnectionTask::inject_coordinator_message`] in the appropriate connection.
1046
    ///
1047
    /// This function guarantees that the [`ConnectionId`] always refers to a connection that
1048
    /// is still alive, in the sense that [`SingleStreamConnectionTask::inject_coordinator_message`]
1049
    /// or [`MultiStreamConnectionTask::inject_coordinator_message`] has never returned `None`
1050
    /// and that no [`Event::Shutdown`] has been generated for this connection.
1051
147
    pub fn pull_message_to_connection(
1052
147
        &mut self,
1053
147
    ) -> Option<(ConnectionId, CoordinatorToConnection)> {
1054
147
        let 
message0
= self
1055
147
            .messages_to_connections
1056
147
            .pop_front()
1057
147
            .map(|(id, inner)| 
(id, CoordinatorToConnection { inner })0
)?;
Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE26pull_message_to_connection0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE26pull_message_to_connection0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE26pull_message_to_connection0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE26pull_message_to_connection0CsiUjFBJteJ7x_17smoldot_full_node
1058
1059
0
        if let CoordinatorToConnectionInner::ShutdownFinishedAck = message.1.inner {
1060
0
            self.shutdown_finished_connections.push_back(message.0);
1061
0
        }
1062
1063
0
        Some(message)
1064
147
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE26pull_message_to_connectionB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE26pull_message_to_connectionCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE26pull_message_to_connectionB9_
_RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE26pull_message_to_connectionCsiUjFBJteJ7x_17smoldot_full_node
Line
Count
Source
1051
147
    pub fn pull_message_to_connection(
1052
147
        &mut self,
1053
147
    ) -> Option<(ConnectionId, CoordinatorToConnection)> {
1054
147
        let 
message0
= self
1055
147
            .messages_to_connections
1056
147
            .pop_front()
1057
147
            .map(|(id, inner)| (id, CoordinatorToConnection { inner }))?;
1058
1059
0
        if let CoordinatorToConnectionInner::ShutdownFinishedAck = message.1.inner {
1060
0
            self.shutdown_finished_connections.push_back(message.0);
1061
0
        }
1062
1063
0
        Some(message)
1064
147
    }
1065
1066
    /// Injects into the state machine a message generated by
1067
    /// [`SingleStreamConnectionTask::pull_message_to_coordinator`] or
1068
    /// [`MultiStreamConnectionTask::pull_message_to_coordinator`].
1069
    ///
1070
    /// This message is queued and is later processed in [`Network::next_event`]. This means that
1071
    /// it is [`Network::next_event`] and not [`Network::inject_connection_message`] that updates
1072
    /// the internals of the state machine according to the content of the message. For example,
1073
    /// if a [`SingleStreamConnectionTask`] sends a message to the coordinator indicating that a
1074
    /// notifications substream has been closed, the coordinator will still believe that it is
1075
    /// open until [`Network::next_event`] processes this message and at the same time returns a
1076
    /// corresponding [`Event`]. Processing messages directly in
1077
    /// [`Network::inject_connection_message`] would introduce "race conditions" where the API
1078
    /// user can't be sure in which state a connection or a substream is.
1079
0
    pub fn inject_connection_message(
1080
0
        &mut self,
1081
0
        connection_id: ConnectionId,
1082
0
        message: ConnectionToCoordinator,
1083
0
    ) {
1084
0
        assert!(self.connections.contains_key(&connection_id));
1085
1086
        // TODO: add a limit for a back-pressure-like system?
1087
0
        self.pending_incoming_messages
1088
0
            .push_back((connection_id, message.inner));
1089
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE25inject_connection_messageB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE25inject_connection_messageCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE25inject_connection_messageB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE25inject_connection_messageCsiUjFBJteJ7x_17smoldot_full_node
1090
1091
    /// Returns the next event produced by the service. Returns `None` if no event is available.
1092
    ///
1093
    /// Call this function in a loop after having injected messages using
1094
    /// [`Network::inject_connection_message`].
1095
147
    pub fn next_event(&mut self) -> Option<Event<TConn>> {
1096
        loop {
1097
147
            if let Some(
connection_id0
) = self.shutdown_finished_connections.pop_front() {
1098
0
                let connection = self.connections.remove(&connection_id).unwrap();
1099
0
                let was_established = match &connection.state {
1100
                    InnerConnectionState::ShuttingDown {
1101
0
                        was_established, ..
1102
0
                    } => *was_established,
1103
0
                    _ => unreachable!(),
1104
                };
1105
1106
0
                return Some(Event::Shutdown {
1107
0
                    id: connection_id,
1108
0
                    was_established,
1109
0
                    user_data: connection.user_data,
1110
0
                });
1111
147
            }
1112
1113
            // When a connection starts its shutdown, its id is put in `shutting_down_connection`.
1114
            // When that happens, we go through the local state and clean up all requests and
1115
            // notification substreams that are in progress/open and return the cancellations
1116
            // as events.
1117
            //
1118
            // `shutting_down_connection` is set back to `None` only if it turns out that there
1119
            // is no request or notification substream in progress/open anymore.
1120
147
            if let Some(
shutting_down_connection0
) = self.shutting_down_connection {
1121
                // Outgoing notification substreams to close.
1122
0
                if let Some((_, substream_id)) = self
1123
0
                    .outgoing_notification_substreams_by_connection
1124
0
                    .range(
1125
0
                        (shutting_down_connection, SubstreamId::MIN)
1126
0
                            ..=(shutting_down_connection, SubstreamId::MAX),
1127
0
                    )
1128
0
                    .cloned()
1129
0
                    .next()
1130
                {
1131
0
                    self.outgoing_notification_substreams_by_connection
1132
0
                        .remove(&(shutting_down_connection, substream_id));
1133
0
                    let (_, state) = self
1134
0
                        .outgoing_notification_substreams
1135
0
                        .remove(&substream_id)
1136
0
                        .unwrap();
1137
0
                    return Some(match state {
1138
0
                        SubstreamState::Open => Event::NotificationsOutReset { substream_id },
1139
0
                        SubstreamState::Pending => Event::NotificationsOutResult {
1140
0
                            substream_id,
1141
0
                            result: Err(NotificationsOutErr::ConnectionShutdown),
1142
0
                        },
1143
0
                        SubstreamState::RequestedClosing => unreachable!(), // Never set for outgoing notification substreams.
1144
                    });
1145
0
                }
1146
1147
                // Ingoing notification substreams to close.
1148
0
                if let Some((key, substream_id)) = self
1149
0
                    .ingoing_notification_substreams_by_connection
1150
0
                    .range(
1151
0
                        (shutting_down_connection, established::SubstreamId::MIN)
1152
0
                            ..=(shutting_down_connection, established::SubstreamId::MAX),
1153
0
                    )
1154
0
                    .map(|(k, v)| (*k, *v))
Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_event0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_event0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_event0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_event0CsiUjFBJteJ7x_17smoldot_full_node
1155
0
                    .next()
1156
                {
1157
0
                    let (_, state, _) = self
1158
0
                        .ingoing_notification_substreams
1159
0
                        .remove(&substream_id)
1160
0
                        .unwrap();
1161
0
                    self.ingoing_notification_substreams_by_connection
1162
0
                        .remove(&key)
1163
0
                        .unwrap();
1164
0
1165
0
                    return Some(match state {
1166
                        SubstreamState::Open | SubstreamState::RequestedClosing => {
1167
0
                            Event::NotificationsInClose {
1168
0
                                substream_id,
1169
0
                                outcome: Err(NotificationsInClosedErr::ConnectionShutdown),
1170
0
                            }
1171
                        }
1172
                        SubstreamState::Pending => {
1173
0
                            Event::NotificationsInOpenCancel { substream_id }
1174
                        }
1175
                    });
1176
0
                }
1177
1178
                // Find outgoing requests to cancel.
1179
0
                if let Some((_, substream_id)) = self
1180
0
                    .outgoing_requests
1181
0
                    .range(
1182
0
                        (shutting_down_connection, SubstreamId::MIN)
1183
0
                            ..=(shutting_down_connection, SubstreamId::MAX),
1184
0
                    )
1185
0
                    .next()
1186
                {
1187
0
                    let substream_id = *substream_id;
1188
0
                    self.outgoing_requests
1189
0
                        .remove(&(shutting_down_connection, substream_id));
1190
0
1191
0
                    return Some(Event::Response {
1192
0
                        substream_id,
1193
0
                        response: Err(RequestError::ConnectionShutdown),
1194
0
                    });
1195
0
                }
1196
1197
                // Find ingoing requests to cancel.
1198
0
                if let Some((_, substream_id)) = self
1199
0
                    .ingoing_requests_by_connection
1200
0
                    .range(
1201
0
                        (shutting_down_connection, SubstreamId::MIN)
1202
0
                            ..=(shutting_down_connection, SubstreamId::MAX),
1203
0
                    )
1204
0
                    .next()
1205
                {
1206
0
                    let substream_id = *substream_id;
1207
0
1208
0
                    let _was_in = self.ingoing_requests.remove(&substream_id);
1209
0
                    debug_assert!(_was_in.is_some());
1210
0
                    let _was_in = self
1211
0
                        .ingoing_requests_by_connection
1212
0
                        .remove(&(shutting_down_connection, substream_id));
1213
0
                    debug_assert!(_was_in);
1214
1215
0
                    return Some(Event::RequestInCancel { substream_id });
1216
0
                }
1217
1218
                // Find ingoing negotiated substreams to cancel.
1219
0
                if let Some((key, substream_id)) = self
1220
0
                    .ingoing_negotiated_substreams_by_connection
1221
0
                    .range(
1222
0
                        (shutting_down_connection, established::SubstreamId::MIN)
1223
0
                            ..=(shutting_down_connection, established::SubstreamId::MAX),
1224
0
                    )
1225
0
                    .map(|(k, v)| (*k, *v))
Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events_0CsiUjFBJteJ7x_17smoldot_full_node
1226
0
                    .next()
1227
                {
1228
0
                    let Some((_, _, was_accepted)) =
1229
0
                        self.ingoing_negotiated_substreams.remove(&substream_id)
1230
                    else {
1231
0
                        unreachable!()
1232
                    };
1233
0
                    let _was_in = self
1234
0
                        .ingoing_negotiated_substreams_by_connection
1235
0
                        .remove(&key);
1236
0
                    debug_assert!(_was_in.is_some());
1237
1238
0
                    if was_accepted {
1239
0
                        return Some(Event::InboundAcceptedCancel { substream_id });
1240
                    } else {
1241
0
                        return Some(Event::InboundNegotiatedCancel { substream_id });
1242
                    }
1243
0
                }
1244
0
1245
0
                // If this is reached, this connection has no more request or notifications
1246
0
                // substream that is still in progress or open. The connection is successfully
1247
0
                // shut down.
1248
0
                self.shutting_down_connection = None;
1249
147
            }
1250
1251
            // Now actually process messages.
1252
147
            let (
connection_id, message0
) = self.pending_incoming_messages.pop_front()?;
1253
0
            let connection = &mut self.connections.get_mut(&connection_id).unwrap();
1254
0
1255
0
            break Some(match message {
1256
0
                ConnectionToCoordinatorInner::StartShutdown(reason) => {
1257
                    // The `ConnectionToCoordinator` message contains a shutdown reason if
1258
                    // and only if it sends `StartShutdown` as a response to a shutdown
1259
                    // initiated by the remote. If the shutdown was initiated locally
1260
                    // (`api_initiated` is `true`), then it can contain `None`, but it can also
1261
                    // contain `Some` in case the shutdown was initiated by the remote at the same
1262
                    // time as it was initiated locally.
1263
1264
0
                    let report_event = match &mut connection.state {
1265
                        InnerConnectionState::ShuttingDown {
1266
                            api_initiated: true,
1267
                            ..
1268
0
                        } => false,
1269
                        InnerConnectionState::ShuttingDown {
1270
                            api_initiated: false,
1271
                            ..
1272
0
                        } => unreachable!(),
1273
0
                        st @ InnerConnectionState::Handshaking => {
1274
0
                            *st = InnerConnectionState::ShuttingDown {
1275
0
                                api_initiated: false,
1276
0
                                was_established: false,
1277
0
                            };
1278
0
                            true
1279
                        }
1280
0
                        st @ InnerConnectionState::Established => {
1281
0
                            *st = InnerConnectionState::ShuttingDown {
1282
0
                                api_initiated: false,
1283
0
                                was_established: true,
1284
0
                            };
1285
0
                            true
1286
                        }
1287
                    };
1288
1289
                    // Control flow can't reach here if `shutting_down_connection` is `Some`.
1290
0
                    debug_assert!(self.shutting_down_connection.is_none());
1291
0
                    self.shutting_down_connection = Some(connection_id);
1292
0
1293
0
                    if !report_event {
1294
                        // No `StartShutdown` event is generated if the API user has started
1295
                        // the shutdown themselves. In that case, `StartShutdown` is merely a
1296
                        // confirmation.
1297
0
                        continue;
1298
                    } else {
1299
0
                        Event::StartShutdown {
1300
0
                            id: connection_id,
1301
0
                            reason: reason.unwrap(), // See comment above.
1302
0
                        }
1303
                    }
1304
                }
1305
                ConnectionToCoordinatorInner::ShutdownFinished => {
1306
0
                    self.messages_to_connections.push_back((
1307
0
                        connection_id,
1308
0
                        CoordinatorToConnectionInner::ShutdownFinishedAck,
1309
0
                    ));
1310
0
                    continue;
1311
                }
1312
0
                ConnectionToCoordinatorInner::HandshakeFinished(peer_id) => {
1313
0
                    debug_assert_eq!(
1314
0
                        self.ingoing_notification_substreams_by_connection
1315
0
                            .range(
1316
0
                                (connection_id, established::SubstreamId::MIN)
1317
0
                                    ..=(connection_id, established::SubstreamId::MAX)
1318
0
                            )
1319
0
                            .count(),
1320
                        0
1321
                    );
1322
0
                    debug_assert_eq!(
1323
0
                        self.outgoing_notification_substreams_by_connection
1324
0
                            .range(
1325
0
                                (connection_id, SubstreamId::MIN)
1326
0
                                    ..=(connection_id, SubstreamId::MAX)
1327
0
                            )
1328
0
                            .count(),
1329
                        0
1330
                    );
1331
0
                    debug_assert_eq!(
1332
0
                        self.ingoing_requests_by_connection
1333
0
                            .range(
1334
0
                                (connection_id, SubstreamId::MIN)
1335
0
                                    ..=(connection_id, SubstreamId::MAX)
1336
0
                            )
1337
0
                            .count(),
1338
                        0
1339
                    );
1340
1341
0
                    match &mut connection.state {
1342
                        InnerConnectionState::ShuttingDown {
1343
0
                            was_established,
1344
0
                            api_initiated,
1345
0
                        } => {
1346
0
                            debug_assert!(!*was_established);
1347
0
                            debug_assert!(*api_initiated);
1348
0
                            continue;
1349
                        }
1350
0
                        st @ InnerConnectionState::Handshaking => {
1351
0
                            *st = InnerConnectionState::Established
1352
                        }
1353
0
                        InnerConnectionState::Established => unreachable!(),
1354
                    }
1355
1356
0
                    Event::HandshakeFinished {
1357
0
                        id: connection_id,
1358
0
                        peer_id,
1359
0
                    }
1360
                }
1361
0
                ConnectionToCoordinatorInner::InboundError(error) => {
1362
                    // Ignore events if a shutdown has been initiated by the coordinator.
1363
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1364
0
                        connection.state
1365
                    {
1366
0
                        debug_assert!(api_initiated);
1367
0
                        continue;
1368
0
                    }
1369
0
1370
0
                    Event::InboundError {
1371
0
                        id: connection_id,
1372
0
                        error,
1373
0
                    }
1374
                }
1375
                ConnectionToCoordinatorInner::InboundNegotiated {
1376
0
                    id: connection_substream_id,
1377
0
                    protocol_name,
1378
                } => {
1379
                    // Ignore events if a shutdown has been initiated by the coordinator.
1380
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1381
0
                        connection.state
1382
                    {
1383
0
                        debug_assert!(api_initiated);
1384
0
                        continue;
1385
0
                    }
1386
0
1387
0
                    let substream_id = self.next_substream_id;
1388
0
                    self.next_substream_id.0 += 1;
1389
0
1390
0
                    self.ingoing_negotiated_substreams.insert(
1391
0
                        substream_id,
1392
0
                        (connection_id, connection_substream_id, false),
1393
0
                    );
1394
0
                    self.ingoing_negotiated_substreams_by_connection
1395
0
                        .insert((connection_id, connection_substream_id), substream_id);
1396
0
1397
0
                    Event::InboundNegotiated {
1398
0
                        id: connection_id,
1399
0
                        substream_id,
1400
0
                        protocol_name,
1401
0
                    }
1402
                }
1403
                ConnectionToCoordinatorInner::InboundAcceptedCancel {
1404
0
                    id: connection_substream_id,
1405
                } => {
1406
                    // Ignore events if a shutdown has been initiated by the coordinator.
1407
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1408
0
                        connection.state
1409
                    {
1410
0
                        debug_assert!(api_initiated);
1411
0
                        continue;
1412
0
                    }
1413
0
1414
0
                    let substream_id = self
1415
0
                        .ingoing_negotiated_substreams_by_connection
1416
0
                        .remove(&(connection_id, connection_substream_id))
1417
0
                        .unwrap_or_else(|| unreachable!());
Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events0_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events0_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events0_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events0_0CsiUjFBJteJ7x_17smoldot_full_node
1418
0
                    let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id);
1419
0
                    debug_assert!(_was_in.is_some());
1420
1421
0
                    Event::InboundAcceptedCancel { substream_id }
1422
                }
1423
                ConnectionToCoordinatorInner::RequestIn {
1424
0
                    id: connection_substream_id,
1425
0
                    request,
1426
                } => {
1427
                    // Ignore events if a shutdown has been initiated by the coordinator.
1428
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1429
0
                        connection.state
1430
                    {
1431
0
                        debug_assert!(api_initiated);
1432
0
                        continue;
1433
0
                    }
1434
0
1435
0
                    let substream_id = self
1436
0
                        .ingoing_negotiated_substreams_by_connection
1437
0
                        .remove(&(connection_id, connection_substream_id))
1438
0
                        .unwrap_or_else(|| unreachable!());
Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events1_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events1_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events1_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events1_0CsiUjFBJteJ7x_17smoldot_full_node
1439
0
                    let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id);
1440
0
                    debug_assert!(_was_in.is_some());
1441
1442
0
                    self.ingoing_requests
1443
0
                        .insert(substream_id, (connection_id, connection_substream_id));
1444
0
                    self.ingoing_requests_by_connection
1445
0
                        .insert((connection_id, substream_id));
1446
0
1447
0
                    Event::RequestIn {
1448
0
                        substream_id,
1449
0
                        request_payload: request,
1450
0
                    }
1451
                }
1452
                ConnectionToCoordinatorInner::Response {
1453
0
                    id: substream_id,
1454
0
                    response,
1455
                    ..
1456
                } => {
1457
                    // Ignore events if a shutdown has been initiated by the coordinator.
1458
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1459
0
                        connection.state
1460
                    {
1461
0
                        debug_assert!(api_initiated);
1462
0
                        continue;
1463
0
                    }
1464
0
1465
0
                    let _was_in = self
1466
0
                        .outgoing_requests
1467
0
                        .remove(&(connection_id, substream_id));
1468
0
                    debug_assert!(_was_in);
1469
1470
0
                    Event::Response {
1471
0
                        substream_id,
1472
0
                        response: response.map_err(RequestError::Substream),
1473
0
                    }
1474
                }
1475
                ConnectionToCoordinatorInner::NotificationsInOpen {
1476
0
                    id: inner_substream_id,
1477
0
                    handshake,
1478
                } => {
1479
                    // Ignore events if a shutdown has been initiated by the coordinator.
1480
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1481
0
                        connection.state
1482
                    {
1483
0
                        debug_assert!(api_initiated);
1484
0
                        continue;
1485
0
                    }
1486
0
1487
0
                    let substream_id = self
1488
0
                        .ingoing_negotiated_substreams_by_connection
1489
0
                        .remove(&(connection_id, inner_substream_id))
1490
0
                        .unwrap_or_else(|| unreachable!());
Unexecuted instantiation: _RNCNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events2_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_events2_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkppE10next_events2_0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB7_7NetworkINtNtNtBb_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB7_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_events2_0CsiUjFBJteJ7x_17smoldot_full_node
1491
0
                    let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id);
1492
0
                    debug_assert!(_was_in.is_some());
1493
1494
0
                    self.ingoing_notification_substreams.insert(
1495
0
                        substream_id,
1496
0
                        (connection_id, SubstreamState::Pending, inner_substream_id),
1497
0
                    );
1498
0
                    self.ingoing_notification_substreams_by_connection
1499
0
                        .insert((connection_id, inner_substream_id), substream_id);
1500
0
1501
0
                    Event::NotificationsInOpen {
1502
0
                        substream_id,
1503
0
                        remote_handshake: handshake,
1504
0
                    }
1505
                }
1506
                ConnectionToCoordinatorInner::NotificationsInOpenCancel {
1507
0
                    id: inner_substream_id,
1508
                    ..
1509
                } => {
1510
                    // Ignore events if a shutdown has been initiated by the coordinator.
1511
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1512
0
                        connection.state
1513
                    {
1514
0
                        debug_assert!(api_initiated);
1515
0
                        continue;
1516
0
                    }
1517
1518
                    // The event might concern a substream that we have already accepted or
1519
                    // refused. In that situation, either reinterpret the event as
1520
                    // "NotificationsInClose" or discard it.
1521
0
                    if let Some(substream_id) = self
1522
0
                        .ingoing_notification_substreams_by_connection
1523
0
                        .remove(&(connection_id, inner_substream_id))
1524
                    {
1525
0
                        let (_, state, _) = self
1526
0
                            .ingoing_notification_substreams
1527
0
                            .remove(&substream_id)
1528
0
                            .unwrap();
1529
0
                        match state {
1530
                            SubstreamState::Open | SubstreamState::RequestedClosing => {
1531
0
                                Event::NotificationsInClose {
1532
0
                                    substream_id,
1533
0
                                    outcome: Err(NotificationsInClosedErr::Substream(
1534
0
                                        established::NotificationsInClosedErr::SubstreamReset,
1535
0
                                    )),
1536
0
                                }
1537
                            }
1538
                            SubstreamState::Pending => {
1539
0
                                Event::NotificationsInOpenCancel { substream_id }
1540
                            }
1541
                        }
1542
                    } else {
1543
                        // Substream was refused. As documented, we must confirm the reception of
1544
                        // the event by sending back a rejection.
1545
0
                        self.messages_to_connections.push_back((
1546
0
                            connection_id,
1547
0
                            CoordinatorToConnectionInner::RejectInNotifications {
1548
0
                                substream_id: inner_substream_id,
1549
0
                            },
1550
0
                        ));
1551
0
                        continue;
1552
                    }
1553
                }
1554
                ConnectionToCoordinatorInner::NotificationIn {
1555
0
                    id: inner_substream_id,
1556
0
                    notification,
1557
                } => {
1558
                    // Ignore events if a shutdown has been initiated by the coordinator.
1559
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1560
0
                        connection.state
1561
                    {
1562
0
                        debug_assert!(api_initiated);
1563
0
                        continue;
1564
0
                    }
1565
0
1566
0
                    let substream_id = *self
1567
0
                        .ingoing_notification_substreams_by_connection
1568
0
                        .get(&(connection_id, inner_substream_id))
1569
0
                        .unwrap();
1570
0
1571
0
                    Event::NotificationsIn {
1572
0
                        substream_id,
1573
0
                        notification,
1574
0
                    }
1575
                }
1576
                ConnectionToCoordinatorInner::NotificationsInClose {
1577
0
                    id: inner_substream_id,
1578
0
                    outcome,
1579
                    ..
1580
                } => {
1581
                    // Ignore events if a shutdown has been initiated by the coordinator.
1582
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1583
0
                        connection.state
1584
                    {
1585
0
                        debug_assert!(api_initiated);
1586
0
                        continue;
1587
0
                    }
1588
0
1589
0
                    let substream_id = self
1590
0
                        .ingoing_notification_substreams_by_connection
1591
0
                        .remove(&(connection_id, inner_substream_id))
1592
0
                        .unwrap();
1593
0
                    let (_, state, _) = self
1594
0
                        .ingoing_notification_substreams
1595
0
                        .remove(&substream_id)
1596
0
                        .unwrap();
1597
0
                    debug_assert!(matches!(
1598
0
                        state,
1599
                        SubstreamState::Open | SubstreamState::RequestedClosing
1600
                    ));
1601
1602
0
                    if let SubstreamState::Open = state {
1603
0
                        // As documented, we must confirm the reception of the event by sending
1604
0
                        // back a rejection, provided that no such event has been sent beforehand.
1605
0
                        self.messages_to_connections.push_back((
1606
0
                            connection_id,
1607
0
                            CoordinatorToConnectionInner::CloseInNotifications {
1608
0
                                substream_id: inner_substream_id,
1609
0
                                timeout: Duration::new(0, 0),
1610
0
                            },
1611
0
                        ));
1612
0
                    }
1613
1614
0
                    Event::NotificationsInClose {
1615
0
                        substream_id,
1616
0
                        outcome: outcome.map_err(NotificationsInClosedErr::Substream),
1617
0
                    }
1618
                }
1619
                ConnectionToCoordinatorInner::NotificationsOutResult {
1620
0
                    id: substream_id,
1621
0
                    result,
1622
                } => {
1623
                    // Ignore events if a shutdown has been initiated by the coordinator.
1624
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1625
0
                        connection.state
1626
                    {
1627
0
                        debug_assert!(api_initiated);
1628
0
                        continue;
1629
0
                    }
1630
1631
0
                    let mut entry = match self.outgoing_notification_substreams.entry(substream_id)
1632
                    {
1633
0
                        hashbrown::hash_map::Entry::Occupied(e) => e,
1634
                        hashbrown::hash_map::Entry::Vacant(_) => {
1635
                            // This can be reached if the API user closed the substream while it
1636
                            // was being opened.
1637
0
                            continue;
1638
                        }
1639
                    };
1640
1641
0
                    debug_assert!(matches!(entry.get_mut().1, SubstreamState::Pending));
1642
1643
0
                    if result.is_ok() {
1644
0
                        entry.insert((connection_id, SubstreamState::Open));
1645
0
                    } else {
1646
0
                        entry.remove();
1647
0
1648
0
                        let _was_removed = self
1649
0
                            .outgoing_notification_substreams_by_connection
1650
0
                            .remove(&(connection_id, substream_id));
1651
0
                        debug_assert!(_was_removed);
1652
                    }
1653
1654
0
                    Event::NotificationsOutResult {
1655
0
                        substream_id,
1656
0
                        result,
1657
0
                    }
1658
                }
1659
                ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
1660
0
                    id: substream_id,
1661
                    ..
1662
                } => {
1663
                    // Ignore events if a shutdown has been initiated by the coordinator.
1664
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1665
0
                        connection.state
1666
                    {
1667
0
                        debug_assert!(api_initiated);
1668
0
                        continue;
1669
0
                    }
1670
0
1671
0
                    match self.outgoing_notification_substreams.get(&substream_id) {
1672
0
                        Some((_connection_id, _substream_state)) => {
1673
0
                            debug_assert_eq!(*_connection_id, connection_id);
1674
0
                            debug_assert!(matches!(_substream_state, SubstreamState::Open));
1675
                        }
1676
                        None => {
1677
                            // The substream might already have been destroyed if the user closed
1678
                            // the substream while this message was pending in the queue.
1679
0
                            continue;
1680
                        }
1681
                    }
1682
1683
0
                    Event::NotificationsOutCloseDemanded { substream_id }
1684
                }
1685
0
                ConnectionToCoordinatorInner::NotificationsOutReset { id: substream_id } => {
1686
                    // Ignore events if a shutdown has been initiated by the coordinator.
1687
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1688
0
                        connection.state
1689
                    {
1690
0
                        debug_assert!(api_initiated);
1691
0
                        continue;
1692
0
                    }
1693
0
1694
0
                    match self.outgoing_notification_substreams.remove(&substream_id) {
1695
0
                        Some((_connection_id, _substream_state)) => {
1696
0
                            debug_assert_eq!(_connection_id, connection_id);
1697
0
                            debug_assert!(matches!(_substream_state, SubstreamState::Open));
1698
                        }
1699
                        None => {
1700
                            // The substream might already have been destroyed if the user closed
1701
                            // the substream while this message was pending in the queue.
1702
0
                            continue;
1703
                        }
1704
                    }
1705
1706
0
                    let _was_removed = self
1707
0
                        .outgoing_notification_substreams_by_connection
1708
0
                        .remove(&(connection_id, substream_id));
1709
0
                    debug_assert!(_was_removed);
1710
1711
0
                    Event::NotificationsOutReset { substream_id }
1712
                }
1713
0
                ConnectionToCoordinatorInner::PingOutSuccess { ping_time } => {
1714
                    // Ignore events if a shutdown has been initiated by the coordinator.
1715
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1716
0
                        connection.state
1717
                    {
1718
0
                        debug_assert!(api_initiated);
1719
0
                        continue;
1720
0
                    }
1721
0
1722
0
                    Event::PingOutSuccess {
1723
0
                        id: connection_id,
1724
0
                        ping_time,
1725
0
                    }
1726
                }
1727
                ConnectionToCoordinatorInner::PingOutFailed => {
1728
                    // Ignore events if a shutdown has been initiated by the coordinator.
1729
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1730
0
                        connection.state
1731
                    {
1732
0
                        debug_assert!(api_initiated);
1733
0
                        continue;
1734
0
                    }
1735
0
1736
0
                    Event::PingOutFailed { id: connection_id }
1737
                }
1738
            });
1739
        }
1740
147
    }
Unexecuted instantiation: _RNvMs0_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionINtB5_7NetworkppE10next_eventB9_
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationE10next_eventCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkppE10next_eventB9_
_RNvMs0_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantE10next_eventCsiUjFBJteJ7x_17smoldot_full_node
Line
Count
Source
1095
147
    pub fn next_event(&mut self) -> Option<Event<TConn>> {
1096
        loop {
1097
147
            if let Some(
connection_id0
) = self.shutdown_finished_connections.pop_front() {
1098
0
                let connection = self.connections.remove(&connection_id).unwrap();
1099
0
                let was_established = match &connection.state {
1100
                    InnerConnectionState::ShuttingDown {
1101
0
                        was_established, ..
1102
0
                    } => *was_established,
1103
0
                    _ => unreachable!(),
1104
                };
1105
1106
0
                return Some(Event::Shutdown {
1107
0
                    id: connection_id,
1108
0
                    was_established,
1109
0
                    user_data: connection.user_data,
1110
0
                });
1111
147
            }
1112
1113
            // When a connection starts its shutdown, its id is put in `shutting_down_connection`.
1114
            // When that happens, we go through the local state and clean up all requests and
1115
            // notification substreams that are in progress/open and return the cancellations
1116
            // as events.
1117
            //
1118
            // `shutting_down_connection` is set back to `None` only if it turns out that there
1119
            // is no request or notification substream in progress/open anymore.
1120
147
            if let Some(
shutting_down_connection0
) = self.shutting_down_connection {
1121
                // Outgoing notification substreams to close.
1122
0
                if let Some((_, substream_id)) = self
1123
0
                    .outgoing_notification_substreams_by_connection
1124
0
                    .range(
1125
0
                        (shutting_down_connection, SubstreamId::MIN)
1126
0
                            ..=(shutting_down_connection, SubstreamId::MAX),
1127
0
                    )
1128
0
                    .cloned()
1129
0
                    .next()
1130
                {
1131
0
                    self.outgoing_notification_substreams_by_connection
1132
0
                        .remove(&(shutting_down_connection, substream_id));
1133
0
                    let (_, state) = self
1134
0
                        .outgoing_notification_substreams
1135
0
                        .remove(&substream_id)
1136
0
                        .unwrap();
1137
0
                    return Some(match state {
1138
0
                        SubstreamState::Open => Event::NotificationsOutReset { substream_id },
1139
0
                        SubstreamState::Pending => Event::NotificationsOutResult {
1140
0
                            substream_id,
1141
0
                            result: Err(NotificationsOutErr::ConnectionShutdown),
1142
0
                        },
1143
0
                        SubstreamState::RequestedClosing => unreachable!(), // Never set for outgoing notification substreams.
1144
                    });
1145
0
                }
1146
1147
                // Ingoing notification substreams to close.
1148
0
                if let Some((key, substream_id)) = self
1149
0
                    .ingoing_notification_substreams_by_connection
1150
0
                    .range(
1151
0
                        (shutting_down_connection, established::SubstreamId::MIN)
1152
0
                            ..=(shutting_down_connection, established::SubstreamId::MAX),
1153
0
                    )
1154
0
                    .map(|(k, v)| (*k, *v))
1155
0
                    .next()
1156
                {
1157
0
                    let (_, state, _) = self
1158
0
                        .ingoing_notification_substreams
1159
0
                        .remove(&substream_id)
1160
0
                        .unwrap();
1161
0
                    self.ingoing_notification_substreams_by_connection
1162
0
                        .remove(&key)
1163
0
                        .unwrap();
1164
0
1165
0
                    return Some(match state {
1166
                        SubstreamState::Open | SubstreamState::RequestedClosing => {
1167
0
                            Event::NotificationsInClose {
1168
0
                                substream_id,
1169
0
                                outcome: Err(NotificationsInClosedErr::ConnectionShutdown),
1170
0
                            }
1171
                        }
1172
                        SubstreamState::Pending => {
1173
0
                            Event::NotificationsInOpenCancel { substream_id }
1174
                        }
1175
                    });
1176
0
                }
1177
1178
                // Find outgoing requests to cancel.
1179
0
                if let Some((_, substream_id)) = self
1180
0
                    .outgoing_requests
1181
0
                    .range(
1182
0
                        (shutting_down_connection, SubstreamId::MIN)
1183
0
                            ..=(shutting_down_connection, SubstreamId::MAX),
1184
0
                    )
1185
0
                    .next()
1186
                {
1187
0
                    let substream_id = *substream_id;
1188
0
                    self.outgoing_requests
1189
0
                        .remove(&(shutting_down_connection, substream_id));
1190
0
1191
0
                    return Some(Event::Response {
1192
0
                        substream_id,
1193
0
                        response: Err(RequestError::ConnectionShutdown),
1194
0
                    });
1195
0
                }
1196
1197
                // Find ingoing requests to cancel.
1198
0
                if let Some((_, substream_id)) = self
1199
0
                    .ingoing_requests_by_connection
1200
0
                    .range(
1201
0
                        (shutting_down_connection, SubstreamId::MIN)
1202
0
                            ..=(shutting_down_connection, SubstreamId::MAX),
1203
0
                    )
1204
0
                    .next()
1205
                {
1206
0
                    let substream_id = *substream_id;
1207
0
1208
0
                    let _was_in = self.ingoing_requests.remove(&substream_id);
1209
0
                    debug_assert!(_was_in.is_some());
1210
0
                    let _was_in = self
1211
0
                        .ingoing_requests_by_connection
1212
0
                        .remove(&(shutting_down_connection, substream_id));
1213
0
                    debug_assert!(_was_in);
1214
1215
0
                    return Some(Event::RequestInCancel { substream_id });
1216
0
                }
1217
1218
                // Find ingoing negotiated substreams to cancel.
1219
0
                if let Some((key, substream_id)) = self
1220
0
                    .ingoing_negotiated_substreams_by_connection
1221
0
                    .range(
1222
0
                        (shutting_down_connection, established::SubstreamId::MIN)
1223
0
                            ..=(shutting_down_connection, established::SubstreamId::MAX),
1224
0
                    )
1225
0
                    .map(|(k, v)| (*k, *v))
1226
0
                    .next()
1227
                {
1228
0
                    let Some((_, _, was_accepted)) =
1229
0
                        self.ingoing_negotiated_substreams.remove(&substream_id)
1230
                    else {
1231
0
                        unreachable!()
1232
                    };
1233
0
                    let _was_in = self
1234
0
                        .ingoing_negotiated_substreams_by_connection
1235
0
                        .remove(&key);
1236
0
                    debug_assert!(_was_in.is_some());
1237
1238
0
                    if was_accepted {
1239
0
                        return Some(Event::InboundAcceptedCancel { substream_id });
1240
                    } else {
1241
0
                        return Some(Event::InboundNegotiatedCancel { substream_id });
1242
                    }
1243
0
                }
1244
0
1245
0
                // If this is reached, this connection has no more requests or notification
1246
0
                // substreams that are still in progress or open. The connection is successfully
1247
0
                // shut down.
1248
0
                self.shutting_down_connection = None;
1249
147
            }
1250
1251
            // Now actually process messages.
1252
147
            let (
connection_id, message0
) = self.pending_incoming_messages.pop_front()?;
1253
0
            let connection = &mut self.connections.get_mut(&connection_id).unwrap();
1254
0
1255
0
            break Some(match message {
1256
0
                ConnectionToCoordinatorInner::StartShutdown(reason) => {
1257
                    // The `ConnectionToCoordinator` message contains a shutdown reason if
1258
                    // and only if it sends `StartShutdown` as a response to a shutdown
1259
                    // initiated by the remote. If the shutdown was initiated locally
1260
                    // (`api_initiated` is `true`), then it can contain `None`, but it can also
1261
                    // contain `Some` in case the shutdown was initiated by the remote at the same
1262
                    // time as it was initiated locally.
1263
1264
0
                    let report_event = match &mut connection.state {
1265
                        InnerConnectionState::ShuttingDown {
1266
                            api_initiated: true,
1267
                            ..
1268
0
                        } => false,
1269
                        InnerConnectionState::ShuttingDown {
1270
                            api_initiated: false,
1271
                            ..
1272
0
                        } => unreachable!(),
1273
0
                        st @ InnerConnectionState::Handshaking => {
1274
0
                            *st = InnerConnectionState::ShuttingDown {
1275
0
                                api_initiated: false,
1276
0
                                was_established: false,
1277
0
                            };
1278
0
                            true
1279
                        }
1280
0
                        st @ InnerConnectionState::Established => {
1281
0
                            *st = InnerConnectionState::ShuttingDown {
1282
0
                                api_initiated: false,
1283
0
                                was_established: true,
1284
0
                            };
1285
0
                            true
1286
                        }
1287
                    };
1288
1289
                    // Control flow can't reach here if `shutting_down_connection` is `Some`.
1290
0
                    debug_assert!(self.shutting_down_connection.is_none());
1291
0
                    self.shutting_down_connection = Some(connection_id);
1292
0
1293
0
                    if !report_event {
1294
                        // No `StartShutdown` event is generated if the API user has started
1295
                        // the shutdown themselves. In that case, `StartShutdown` is merely a
1296
                        // confirmation.
1297
0
                        continue;
1298
                    } else {
1299
0
                        Event::StartShutdown {
1300
0
                            id: connection_id,
1301
0
                            reason: reason.unwrap(), // See comment above.
1302
0
                        }
1303
                    }
1304
                }
1305
                ConnectionToCoordinatorInner::ShutdownFinished => {
1306
0
                    self.messages_to_connections.push_back((
1307
0
                        connection_id,
1308
0
                        CoordinatorToConnectionInner::ShutdownFinishedAck,
1309
0
                    ));
1310
0
                    continue;
1311
                }
1312
0
                ConnectionToCoordinatorInner::HandshakeFinished(peer_id) => {
1313
0
                    debug_assert_eq!(
1314
0
                        self.ingoing_notification_substreams_by_connection
1315
0
                            .range(
1316
0
                                (connection_id, established::SubstreamId::MIN)
1317
0
                                    ..=(connection_id, established::SubstreamId::MAX)
1318
0
                            )
1319
0
                            .count(),
1320
                        0
1321
                    );
1322
0
                    debug_assert_eq!(
1323
0
                        self.outgoing_notification_substreams_by_connection
1324
0
                            .range(
1325
0
                                (connection_id, SubstreamId::MIN)
1326
0
                                    ..=(connection_id, SubstreamId::MAX)
1327
0
                            )
1328
0
                            .count(),
1329
                        0
1330
                    );
1331
0
                    debug_assert_eq!(
1332
0
                        self.ingoing_requests_by_connection
1333
0
                            .range(
1334
0
                                (connection_id, SubstreamId::MIN)
1335
0
                                    ..=(connection_id, SubstreamId::MAX)
1336
0
                            )
1337
0
                            .count(),
1338
                        0
1339
                    );
1340
1341
0
                    match &mut connection.state {
1342
                        InnerConnectionState::ShuttingDown {
1343
0
                            was_established,
1344
0
                            api_initiated,
1345
0
                        } => {
1346
0
                            debug_assert!(!*was_established);
1347
0
                            debug_assert!(*api_initiated);
1348
0
                            continue;
1349
                        }
1350
0
                        st @ InnerConnectionState::Handshaking => {
1351
0
                            *st = InnerConnectionState::Established
1352
                        }
1353
0
                        InnerConnectionState::Established => unreachable!(),
1354
                    }
1355
1356
0
                    Event::HandshakeFinished {
1357
0
                        id: connection_id,
1358
0
                        peer_id,
1359
0
                    }
1360
                }
1361
0
                ConnectionToCoordinatorInner::InboundError(error) => {
1362
                    // Ignore events if a shutdown has been initiated by the coordinator.
1363
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1364
0
                        connection.state
1365
                    {
1366
0
                        debug_assert!(api_initiated);
1367
0
                        continue;
1368
0
                    }
1369
0
1370
0
                    Event::InboundError {
1371
0
                        id: connection_id,
1372
0
                        error,
1373
0
                    }
1374
                }
1375
                ConnectionToCoordinatorInner::InboundNegotiated {
1376
0
                    id: connection_substream_id,
1377
0
                    protocol_name,
1378
                } => {
1379
                    // Ignore events if a shutdown has been initiated by the coordinator.
1380
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1381
0
                        connection.state
1382
                    {
1383
0
                        debug_assert!(api_initiated);
1384
0
                        continue;
1385
0
                    }
1386
0
1387
0
                    let substream_id = self.next_substream_id;
1388
0
                    self.next_substream_id.0 += 1;
1389
0
1390
0
                    self.ingoing_negotiated_substreams.insert(
1391
0
                        substream_id,
1392
0
                        (connection_id, connection_substream_id, false),
1393
0
                    );
1394
0
                    self.ingoing_negotiated_substreams_by_connection
1395
0
                        .insert((connection_id, connection_substream_id), substream_id);
1396
0
1397
0
                    Event::InboundNegotiated {
1398
0
                        id: connection_id,
1399
0
                        substream_id,
1400
0
                        protocol_name,
1401
0
                    }
1402
                }
1403
                ConnectionToCoordinatorInner::InboundAcceptedCancel {
1404
0
                    id: connection_substream_id,
1405
                } => {
1406
                    // Ignore events if a shutdown has been initiated by the coordinator.
1407
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1408
0
                        connection.state
1409
                    {
1410
0
                        debug_assert!(api_initiated);
1411
0
                        continue;
1412
0
                    }
1413
0
1414
0
                    let substream_id = self
1415
0
                        .ingoing_negotiated_substreams_by_connection
1416
0
                        .remove(&(connection_id, connection_substream_id))
1417
0
                        .unwrap_or_else(|| unreachable!());
1418
0
                    let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id);
1419
0
                    debug_assert!(_was_in.is_some());
1420
1421
0
                    Event::InboundAcceptedCancel { substream_id }
1422
                }
1423
                ConnectionToCoordinatorInner::RequestIn {
1424
0
                    id: connection_substream_id,
1425
0
                    request,
1426
                } => {
1427
                    // Ignore events if a shutdown has been initiated by the coordinator.
1428
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1429
0
                        connection.state
1430
                    {
1431
0
                        debug_assert!(api_initiated);
1432
0
                        continue;
1433
0
                    }
1434
0
1435
0
                    let substream_id = self
1436
0
                        .ingoing_negotiated_substreams_by_connection
1437
0
                        .remove(&(connection_id, connection_substream_id))
1438
0
                        .unwrap_or_else(|| unreachable!());
1439
0
                    let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id);
1440
0
                    debug_assert!(_was_in.is_some());
1441
1442
0
                    self.ingoing_requests
1443
0
                        .insert(substream_id, (connection_id, connection_substream_id));
1444
0
                    self.ingoing_requests_by_connection
1445
0
                        .insert((connection_id, substream_id));
1446
0
1447
0
                    Event::RequestIn {
1448
0
                        substream_id,
1449
0
                        request_payload: request,
1450
0
                    }
1451
                }
1452
                ConnectionToCoordinatorInner::Response {
1453
0
                    id: substream_id,
1454
0
                    response,
1455
                    ..
1456
                } => {
1457
                    // Ignore events if a shutdown has been initiated by the coordinator.
1458
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1459
0
                        connection.state
1460
                    {
1461
0
                        debug_assert!(api_initiated);
1462
0
                        continue;
1463
0
                    }
1464
0
1465
0
                    let _was_in = self
1466
0
                        .outgoing_requests
1467
0
                        .remove(&(connection_id, substream_id));
1468
0
                    debug_assert!(_was_in);
1469
1470
0
                    Event::Response {
1471
0
                        substream_id,
1472
0
                        response: response.map_err(RequestError::Substream),
1473
0
                    }
1474
                }
1475
                ConnectionToCoordinatorInner::NotificationsInOpen {
1476
0
                    id: inner_substream_id,
1477
0
                    handshake,
1478
                } => {
1479
                    // Ignore events if a shutdown has been initiated by the coordinator.
1480
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1481
0
                        connection.state
1482
                    {
1483
0
                        debug_assert!(api_initiated);
1484
0
                        continue;
1485
0
                    }
1486
0
1487
0
                    let substream_id = self
1488
0
                        .ingoing_negotiated_substreams_by_connection
1489
0
                        .remove(&(connection_id, inner_substream_id))
1490
0
                        .unwrap_or_else(|| unreachable!());
1491
0
                    let _was_in = self.ingoing_negotiated_substreams.remove(&substream_id);
1492
0
                    debug_assert!(_was_in.is_some());
1493
1494
0
                    self.ingoing_notification_substreams.insert(
1495
0
                        substream_id,
1496
0
                        (connection_id, SubstreamState::Pending, inner_substream_id),
1497
0
                    );
1498
0
                    self.ingoing_notification_substreams_by_connection
1499
0
                        .insert((connection_id, inner_substream_id), substream_id);
1500
0
1501
0
                    Event::NotificationsInOpen {
1502
0
                        substream_id,
1503
0
                        remote_handshake: handshake,
1504
0
                    }
1505
                }
1506
                ConnectionToCoordinatorInner::NotificationsInOpenCancel {
1507
0
                    id: inner_substream_id,
1508
                    ..
1509
                } => {
1510
                    // Ignore events if a shutdown has been initiated by the coordinator.
1511
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1512
0
                        connection.state
1513
                    {
1514
0
                        debug_assert!(api_initiated);
1515
0
                        continue;
1516
0
                    }
1517
1518
                    // The event might concern a substream that we have already accepted or
1519
                    // refused. In that situation, either reinterpret the event as
1520
                    // "NotificationsInClose" or discard it.
1521
0
                    if let Some(substream_id) = self
1522
0
                        .ingoing_notification_substreams_by_connection
1523
0
                        .remove(&(connection_id, inner_substream_id))
1524
                    {
1525
0
                        let (_, state, _) = self
1526
0
                            .ingoing_notification_substreams
1527
0
                            .remove(&substream_id)
1528
0
                            .unwrap();
1529
0
                        match state {
1530
                            SubstreamState::Open | SubstreamState::RequestedClosing => {
1531
0
                                Event::NotificationsInClose {
1532
0
                                    substream_id,
1533
0
                                    outcome: Err(NotificationsInClosedErr::Substream(
1534
0
                                        established::NotificationsInClosedErr::SubstreamReset,
1535
0
                                    )),
1536
0
                                }
1537
                            }
1538
                            SubstreamState::Pending => {
1539
0
                                Event::NotificationsInOpenCancel { substream_id }
1540
                            }
1541
                        }
1542
                    } else {
1543
                        // Substream was refused. As documented, we must confirm the reception of
1544
                        // the event by sending back a rejection.
1545
0
                        self.messages_to_connections.push_back((
1546
0
                            connection_id,
1547
0
                            CoordinatorToConnectionInner::RejectInNotifications {
1548
0
                                substream_id: inner_substream_id,
1549
0
                            },
1550
0
                        ));
1551
0
                        continue;
1552
                    }
1553
                }
1554
                ConnectionToCoordinatorInner::NotificationIn {
1555
0
                    id: inner_substream_id,
1556
0
                    notification,
1557
                } => {
1558
                    // Ignore events if a shutdown has been initiated by the coordinator.
1559
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1560
0
                        connection.state
1561
                    {
1562
0
                        debug_assert!(api_initiated);
1563
0
                        continue;
1564
0
                    }
1565
0
1566
0
                    let substream_id = *self
1567
0
                        .ingoing_notification_substreams_by_connection
1568
0
                        .get(&(connection_id, inner_substream_id))
1569
0
                        .unwrap();
1570
0
1571
0
                    Event::NotificationsIn {
1572
0
                        substream_id,
1573
0
                        notification,
1574
0
                    }
1575
                }
1576
                ConnectionToCoordinatorInner::NotificationsInClose {
1577
0
                    id: inner_substream_id,
1578
0
                    outcome,
1579
                    ..
1580
                } => {
1581
                    // Ignore events if a shutdown has been initiated by the coordinator.
1582
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1583
0
                        connection.state
1584
                    {
1585
0
                        debug_assert!(api_initiated);
1586
0
                        continue;
1587
0
                    }
1588
0
1589
0
                    let substream_id = self
1590
0
                        .ingoing_notification_substreams_by_connection
1591
0
                        .remove(&(connection_id, inner_substream_id))
1592
0
                        .unwrap();
1593
0
                    let (_, state, _) = self
1594
0
                        .ingoing_notification_substreams
1595
0
                        .remove(&substream_id)
1596
0
                        .unwrap();
1597
0
                    debug_assert!(matches!(
1598
0
                        state,
1599
                        SubstreamState::Open | SubstreamState::RequestedClosing
1600
                    ));
1601
1602
0
                    if let SubstreamState::Open = state {
1603
0
                        // As documented, we must confirm the reception of the event by sending
1604
0
                        // back a rejection, provided that no such event has been sent beforehand.
1605
0
                        self.messages_to_connections.push_back((
1606
0
                            connection_id,
1607
0
                            CoordinatorToConnectionInner::CloseInNotifications {
1608
0
                                substream_id: inner_substream_id,
1609
0
                                timeout: Duration::new(0, 0),
1610
0
                            },
1611
0
                        ));
1612
0
                    }
1613
1614
0
                    Event::NotificationsInClose {
1615
0
                        substream_id,
1616
0
                        outcome: outcome.map_err(NotificationsInClosedErr::Substream),
1617
0
                    }
1618
                }
1619
                ConnectionToCoordinatorInner::NotificationsOutResult {
1620
0
                    id: substream_id,
1621
0
                    result,
1622
                } => {
1623
                    // Ignore events if a shutdown has been initiated by the coordinator.
1624
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1625
0
                        connection.state
1626
                    {
1627
0
                        debug_assert!(api_initiated);
1628
0
                        continue;
1629
0
                    }
1630
1631
0
                    let mut entry = match self.outgoing_notification_substreams.entry(substream_id)
1632
                    {
1633
0
                        hashbrown::hash_map::Entry::Occupied(e) => e,
1634
                        hashbrown::hash_map::Entry::Vacant(_) => {
1635
                            // This can be reached if the API user closed the substream while it
1636
                            // was being opened.
1637
0
                            continue;
1638
                        }
1639
                    };
1640
1641
0
                    debug_assert!(matches!(entry.get_mut().1, SubstreamState::Pending));
1642
1643
0
                    if result.is_ok() {
1644
0
                        entry.insert((connection_id, SubstreamState::Open));
1645
0
                    } else {
1646
0
                        entry.remove();
1647
0
1648
0
                        let _was_removed = self
1649
0
                            .outgoing_notification_substreams_by_connection
1650
0
                            .remove(&(connection_id, substream_id));
1651
0
                        debug_assert!(_was_removed);
1652
                    }
1653
1654
0
                    Event::NotificationsOutResult {
1655
0
                        substream_id,
1656
0
                        result,
1657
0
                    }
1658
                }
1659
                ConnectionToCoordinatorInner::NotificationsOutCloseDemanded {
1660
0
                    id: substream_id,
1661
                    ..
1662
                } => {
1663
                    // Ignore events if a shutdown has been initiated by the coordinator.
1664
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1665
0
                        connection.state
1666
                    {
1667
0
                        debug_assert!(api_initiated);
1668
0
                        continue;
1669
0
                    }
1670
0
1671
0
                    match self.outgoing_notification_substreams.get(&substream_id) {
1672
0
                        Some((_connection_id, _substream_state)) => {
1673
0
                            debug_assert_eq!(*_connection_id, connection_id);
1674
0
                            debug_assert!(matches!(_substream_state, SubstreamState::Open));
1675
                        }
1676
                        None => {
1677
                            // The substream might already have been destroyed if the user closed
1678
                            // the substream while this message was pending in the queue.
1679
0
                            continue;
1680
                        }
1681
                    }
1682
1683
0
                    Event::NotificationsOutCloseDemanded { substream_id }
1684
                }
1685
0
                ConnectionToCoordinatorInner::NotificationsOutReset { id: substream_id } => {
1686
                    // Ignore events if a shutdown has been initiated by the coordinator.
1687
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1688
0
                        connection.state
1689
                    {
1690
0
                        debug_assert!(api_initiated);
1691
0
                        continue;
1692
0
                    }
1693
0
1694
0
                    match self.outgoing_notification_substreams.remove(&substream_id) {
1695
0
                        Some((_connection_id, _substream_state)) => {
1696
0
                            debug_assert_eq!(_connection_id, connection_id);
1697
0
                            debug_assert!(matches!(_substream_state, SubstreamState::Open));
1698
                        }
1699
                        None => {
1700
                            // The substream might already have been destroyed if the user closed
1701
                            // the substream while this message was pending in the queue.
1702
0
                            continue;
1703
                        }
1704
                    }
1705
1706
0
                    let _was_removed = self
1707
0
                        .outgoing_notification_substreams_by_connection
1708
0
                        .remove(&(connection_id, substream_id));
1709
0
                    debug_assert!(_was_removed);
1710
1711
0
                    Event::NotificationsOutReset { substream_id }
1712
                }
1713
0
                ConnectionToCoordinatorInner::PingOutSuccess { ping_time } => {
1714
                    // Ignore events if a shutdown has been initiated by the coordinator.
1715
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1716
0
                        connection.state
1717
                    {
1718
0
                        debug_assert!(api_initiated);
1719
0
                        continue;
1720
0
                    }
1721
0
1722
0
                    Event::PingOutSuccess {
1723
0
                        id: connection_id,
1724
0
                        ping_time,
1725
0
                    }
1726
                }
1727
                ConnectionToCoordinatorInner::PingOutFailed => {
1728
                    // Ignore events if a shutdown has been initiated by the coordinator.
1729
0
                    if let InnerConnectionState::ShuttingDown { api_initiated, .. } =
1730
0
                        connection.state
1731
                    {
1732
0
                        debug_assert!(api_initiated);
1733
0
                        continue;
1734
0
                    }
1735
0
1736
0
                    Event::PingOutFailed { id: connection_id }
1737
                }
1738
            });
1739
        }
1740
147
    }
1741
}
1742
1743
impl<TConn, TNow> ops::Index<ConnectionId> for Network<TConn, TNow> {
1744
    type Output = TConn;
1745
0
    fn index(&self, id: ConnectionId) -> &TConn {
1746
0
        &self.connections.get(&id).unwrap().user_data
1747
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p10collections1_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_12ConnectionIdE5indexB9_
Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationEINtNtNtB2V_3ops5index5IndexNtB5_12ConnectionIdE5indexCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p10collections1_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_12ConnectionIdE5indexB9_
Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB5_12ConnectionIdE5indexCsiUjFBJteJ7x_17smoldot_full_node
1748
}
1749
1750
impl<TConn, TNow> ops::IndexMut<ConnectionId> for Network<TConn, TNow> {
1751
0
    fn index_mut(&mut self, id: ConnectionId) -> &mut TConn {
1752
0
        &mut self.connections.get_mut(&id).unwrap().user_data
1753
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p10collections2_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutB9_
Unexecuted instantiation: _RNvXs2_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsaYZPK01V26L_4core4time8DurationEINtNtNtB2V_3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p10collections2_0ppEINtB5_7NetworkppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutB9_
Unexecuted instantiation: _RNvXs2_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionINtB5_7NetworkINtNtNtB9_7network7service14ConnectionInfoINtCsldkcb8n7qNv_13async_channel6SenderNtB5_23CoordinatorToConnectionEENtNtCsbpXXxgr6u8g_3std4time7InstantEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB5_12ConnectionIdE9index_mutCsiUjFBJteJ7x_17smoldot_full_node
1754
}
1755
1756
/// See [`Network::connection_state`].
1757
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1758
pub struct ConnectionState {
1759
    /// If `true`, the connection has finished its handshaking phase.
1760
    pub established: bool,
1761
1762
    /// If `true`, the connection is shutting down.
1763
    pub shutting_down: bool,
1764
}
1765
1766
/// Message from a connection task destined to the coordinator.
1767
pub struct ConnectionToCoordinator {
1768
    inner: ConnectionToCoordinatorInner,
1769
}
1770
1771
enum ConnectionToCoordinatorInner {
1772
    HandshakeFinished(PeerId),
1773
1774
    /// See the corresponding event in [`established::Event`].
1775
    InboundError(established::InboundError),
1776
1777
    /// See the corresponding event in [`established::Event`].
1778
    InboundNegotiated {
1779
        id: established::SubstreamId,
1780
        protocol_name: String,
1781
    },
1782
1783
    /// See the corresponding event in [`established::Event`].
1784
    InboundAcceptedCancel {
1785
        id: established::SubstreamId,
1786
    },
1787
1788
    /// See the corresponding event in [`established::Event`].
1789
    RequestIn {
1790
        id: established::SubstreamId,
1791
        request: Vec<u8>,
1792
    },
1793
1794
    /// See the corresponding event in [`established::Event`].
1795
    Response {
1796
        response: Result<Vec<u8>, established::RequestError>,
1797
        id: SubstreamId,
1798
    },
1799
1800
    /// See the corresponding event in [`established::Event`].
1801
    NotificationsInOpen {
1802
        id: established::SubstreamId,
1803
        handshake: Vec<u8>,
1804
    },
1805
    /// See the corresponding event in [`established::Event`].
1806
    ///
1807
    /// The coordinator should be aware that, due to the asynchronous nature of communications, it
1808
    /// might receive this event after having sent a
1809
    /// [`CoordinatorToConnectionInner::AcceptInNotifications`] or
1810
    /// [`CoordinatorToConnectionInner::RejectInNotifications`]. In that situation, the coordinator
1811
    /// should either reinterpret the message as a `NotificationsInClose` (if it had accepted it)
1812
    /// or ignore it (if it had rejected it).
1813
    ///
1814
    /// The connection should be aware that, due to the asynchronous nature of communications, it
1815
    /// might later receive a [`CoordinatorToConnectionInner::AcceptInNotifications`] or
1816
    /// [`CoordinatorToConnectionInner::RejectInNotifications`] concerning this substream. In that
1817
    /// situation, the connection should ignore this message.
1818
    ///
1819
    /// Because substream IDs can be reused, this introduces an ambiguity in the following sequence
1820
    /// of events: send `NotificationsInOpen`, send `NotificationsInOpenCancel`, send
1821
    /// `NotificationsInOpen`, receive `AcceptInNotifications`. Does the `AcceptInNotifications`
1822
    /// refer to the first `NotificationsInOpen` or to the second?
1823
    /// In order to solve this problem, the coordinator must always send back a
1824
    /// [`CoordinatorToConnectionInner::RejectInNotifications`] in order to acknowledge a
1825
    /// `NotificationsInOpenCancel`.
1826
    NotificationsInOpenCancel {
1827
        id: established::SubstreamId,
1828
    },
1829
    /// See the corresponding event in [`established::Event`].
1830
    NotificationIn {
1831
        id: established::SubstreamId,
1832
        notification: Vec<u8>,
1833
    },
1834
    /// See the corresponding event in [`established::Event`].
1835
    ///
1836
    /// In order to avoid race conditions, this must always be acknowledged by sending back a
1837
    /// [`CoordinatorToConnectionInner::CloseInNotifications`] message if no such message was
1838
    /// sent in the past.
1839
    NotificationsInClose {
1840
        id: established::SubstreamId,
1841
        outcome: Result<(), established::NotificationsInClosedErr>,
1842
    },
1843
    /// See the corresponding event in [`established::Event`].
1844
    NotificationsOutResult {
1845
        id: SubstreamId,
1846
        result: Result<Vec<u8>, NotificationsOutErr>,
1847
    },
1848
    /// See the corresponding event in [`established::Event`].
1849
    NotificationsOutCloseDemanded {
1850
        id: SubstreamId,
1851
    },
1852
    /// See the corresponding event in [`established::Event`].
1853
    NotificationsOutReset {
1854
        id: SubstreamId,
1855
    },
1856
    /// See the corresponding event in [`established::Event`].
1857
    PingOutSuccess {
1858
        ping_time: Duration,
1859
    },
1860
    /// See the corresponding event in [`established::Event`].
1861
    PingOutFailed,
1862
1863
    /// Sent either in response to [`CoordinatorToConnectionInner::StartShutdown`] (in which case
1864
    /// the content is `None`) or if the remote has initiated the shutdown (in which case the
1865
    /// content is `Some`). After this, no more [`ConnectionToCoordinatorInner`] will be sent
1866
    /// anymore except for [`ConnectionToCoordinatorInner::ShutdownFinished`].
1867
    StartShutdown(Option<ShutdownCause>),
1868
1869
    /// Shutdown has now finished. Always sent after
1870
    /// [`ConnectionToCoordinatorInner::StartShutdown`]. No message is sent by the connection
1871
    /// task anymore after that.
1872
    ///
1873
    /// Must be confirmed with a [`CoordinatorToConnectionInner::ShutdownFinishedAck`].
1874
    ShutdownFinished,
1875
}
1876
1877
/// Message from the coordinator destined to a connection task.
1878
pub struct CoordinatorToConnection {
1879
    inner: CoordinatorToConnectionInner,
1880
}
1881
1882
enum CoordinatorToConnectionInner {
1883
    /// Connection task must terminate. This is always sent back after a
1884
    /// [`ConnectionToCoordinatorInner::ShutdownFinished`].
1885
    ///
1886
    /// This final message is necessary in order to make sure that the coordinator doesn't
1887
    /// generate messages destined to a connection that isn't alive anymore.
1888
    ShutdownFinishedAck,
1889
1890
    /// Connection must start shutting down if it is not already the case.
1891
    /// Because of concurrency, it is possible for this message to be sent/received *after* a
1892
    /// [`ConnectionToCoordinatorInner::StartShutdown`] has been sent.
1893
    StartShutdown,
1894
1895
    AcceptInbound {
1896
        substream_id: established::SubstreamId,
1897
        /// Configuration of the protocol.
1898
        inbound_ty: InboundTy,
1899
    },
1900
    RejectInbound {
1901
        substream_id: established::SubstreamId,
1902
    },
1903
    SetMaxProtocolNameLen {
1904
        new_max_length: usize,
1905
    },
1906
1907
    StartRequest {
1908
        protocol_name: String,
1909
        request_data: Option<Vec<u8>>,
1910
        timeout: Duration,
1911
        max_response_size: usize,
1912
        /// Id of the substream assigned by the coordinator.
1913
        /// This is **not** the same as the actual substream used in the connection.
1914
        substream_id: SubstreamId,
1915
    },
1916
    OpenOutNotifications {
1917
        /// Id of the substream assigned by the coordinator.
1918
        /// This is **not** the same as the actual substream used in the connection.
1919
        substream_id: SubstreamId,
1920
        protocol_name: String,
1921
        max_handshake_size: usize,
1922
        handshake_timeout: Duration,
1923
        handshake: Vec<u8>,
1924
    },
1925
    CloseOutNotifications {
1926
        /// Id of the substream assigned by the coordinator.
1927
        /// This is **not** the same as the actual substream used in the connection.
1928
        substream_id: SubstreamId,
1929
    },
1930
    QueueNotification {
1931
        /// Id of the substream assigned by the coordinator.
1932
        /// This is **not** the same as the actual substream used in the connection.
1933
        substream_id: SubstreamId,
1934
        notification: Vec<u8>,
1935
    },
1936
    AcceptInNotifications {
1937
        substream_id: established::SubstreamId,
1938
        handshake: Vec<u8>,
1939
        max_notification_size: usize,
1940
    },
1941
    RejectInNotifications {
1942
        substream_id: established::SubstreamId,
1943
    },
1944
    CloseInNotifications {
1945
        substream_id: established::SubstreamId,
1946
        timeout: Duration,
1947
    },
1948
1949
    /// Answer an incoming request.
1950
    ///
1951
    /// Since the API doesn't provide any feedback about whether responses have been successfully
1952
    /// received by the remote, the response should simply be ignored in case the substream is
1953
    /// obsolete. In any case, answering an obsolete request is not an API error because the remote
1954
    /// might have canceled their request while the message containing the response was waiting
1955
    /// in queue.
1956
    AnswerRequest {
1957
        substream_id: established::SubstreamId,
1958
        response: Result<Vec<u8>, ()>,
1959
    },
1960
}
1961
1962
/// Event generated by [`Network::next_event`].
1963
#[derive(Debug)]
1964
pub enum Event<TConn> {
1965
    /// Handshake of the given connection has completed.
1966
    ///
1967
    /// This event can only happen once per connection and only for single-stream connections.
1968
    HandshakeFinished {
1969
        /// Identifier of the connection whose handshake is finished.
1970
        id: ConnectionId,
1971
        /// Identity of the peer on the other side of the connection.
1972
        peer_id: PeerId,
1973
    },
1974
1975
    /// A transport-level connection (e.g. a TCP socket) is starting its shutdown.
1976
    ///
1977
    /// It is no longer possible to start requests, open notification substreams, or open
1978
    /// notifications on this connection, and no new incoming requests or notification substreams
1979
    /// will be reported as events.
1980
    ///
1981
    /// Further events will close all existing substreams (requests and notifications) one by one.
1982
    /// Once all substreams have been closed, an [`Event::Shutdown`] is reported.
1983
    ///
1984
    /// Keep in mind that this event can also happen for connections that haven't finished their
1985
    /// handshake.
1986
    ///
1987
    /// This event is **not** generated when [`Network::start_shutdown`] is called.
1988
    StartShutdown {
1989
        /// Identifier of the connection that is starting its shutdown.
1990
        id: ConnectionId,
1991
        /// Reason why the connection is starting its shutdown. Because this event is not generated
1992
        /// when the shutdown is initiated locally, the reason is always caused by the remote.
1993
        reason: ShutdownCause,
1994
    },
1995
1996
    /// A transport-level connection (e.g. a TCP socket) has been shut down.
1997
    ///
1998
    /// This [`ConnectionId`] is no longer valid, and using it will result in panics.
1999
    Shutdown {
2000
        /// Identifier of the connection that has finished its shutdown.
2001
        id: ConnectionId,
2002
        /// `true` if the connection was in its established phase before the shutdown.
2003
        was_established: bool,
2004
        /// User data that was stored in the state machine for this connection.
2005
        user_data: TConn,
2006
    },
2007
2008
    /// Received an incoming substream, but this substream has produced an error.
2009
    ///
2010
    /// > **Note**: This event exists only for diagnostic purposes. No action is expected in
2011
    /// >           return.
2012
    InboundError {
2013
        /// Identifier of the connection that has received the substream.
2014
        id: ConnectionId,
2015
        /// Error that happened.
2016
        error: InboundError,
2017
    },
2018
2019
    /// An inbound substream has requested to use a protocol. Call [`Network::accept_inbound`] or
2020
    /// [`Network::reject_inbound`] in order to accept or reject this substream.
2021
    InboundNegotiated {
2022
        /// Identifier of the connection that has received the substream.
2023
        id: ConnectionId,
2024
        /// Identifier of the substream. Needs to be provided back when accepting or rejecting
2025
        /// the substream.
2026
        substream_id: SubstreamId,
2027
        /// Name of the protocol requested by the remote.
2028
        protocol_name: String,
2029
    },
2030
2031
    /// An inbound substream that is waiting for a call to [`Network::accept_inbound`] or
2032
    /// [`Network::reject_inbound`] has been abruptly closed.
2033
    InboundNegotiatedCancel {
2034
        /// Identifier of the substream.
2035
        substream_id: SubstreamId,
2036
    },
2037
2038
    /// An inbound substream that was previously accepted using [`Network::accept_inbound`] was
2039
    /// closed by the remote or has generated an error.
2040
    InboundAcceptedCancel {
2041
        /// Identifier of the substream.
2042
        substream_id: SubstreamId,
2043
    },
2044
2045
    /// Outcome of a request started using [`Network::start_request`].
2046
    ///
2047
    /// *All* requests always lead to an outcome, even if the connection has been closed while the
2048
    /// request was in progress.
2049
    Response {
2050
        /// Substream that was returned by [`Network::start_request`].
2051
        substream_id: SubstreamId,
2052
        /// If the request is successful, contains the response sent back by the remote. Otherwise,
2053
        /// contains the reason why the request isn't successful.
2054
        response: Result<Vec<u8>, RequestError>,
2055
    },
2056
2057
    /// Received a request from a request-response protocol.
2058
    RequestIn {
2059
        /// Substream on which the request has been received. Must be passed back when providing
2060
        /// the response.
2061
        substream_id: SubstreamId,
2062
        /// Payload that has been sent by the remote. Its interpretation is beyond the scope of
2063
        /// this module.
2064
        request_payload: Vec<u8>,
2065
    },
2066
2067
    /// Request received earlier has been canceled by the remote.
2068
    ///
2069
    /// The [`SubstreamId`] is now invalid.
2070
    RequestInCancel { substream_id: SubstreamId },
2071
2072
    /// Outcome of trying to open a substream with [`Network::open_out_notifications`].
2073
    ///
2074
    /// If `Ok`, it is now possible to send notifications on this substream.
2075
    /// If `Err`, the substream no longer exists and the [`SubstreamId`] becomes invalid.
2076
    NotificationsOutResult {
2077
        substream_id: SubstreamId,
2078
        /// If `Ok`, contains the handshake sent back by the remote. Its interpretation is out of
2079
        /// scope of this module.
2080
        result: Result<Vec<u8>, NotificationsOutErr>,
2081
    },
2082
2083
    /// Remote has closed an outgoing notifications substream, meaning that it demands the closing
2084
    /// of the substream. Use [`Network::close_out_notifications`] as soon as possible, which is
2085
    /// typically after all outbound notifications that need to be queued have been queued.
2086
    ///
2087
    /// This event is only generated for notification substreams that are fully open.
2088
    NotificationsOutCloseDemanded { substream_id: SubstreamId },
2089
2090
    /// A previously open outbound substream has been closed, by the remote or as a consequence of
2091
    /// the connection shutting down.
2092
    ///
2093
    /// This event is only generated for notification substreams that are fully open.
2094
    ///
2095
    /// The substream no longer exists and the [`SubstreamId`] becomes invalid.
2096
    NotificationsOutReset { substream_id: SubstreamId },
2097
2098
    /// The remote would like to open a notifications substream.
2099
    ///
2100
    /// The substream needs to be accepted or refused using [`Network::accept_in_notifications`]
2101
    /// or [`Network::reject_in_notifications`].
2102
    NotificationsInOpen {
2103
        /// Newly-generated identifier for the substream on which the request has been received.
2104
        /// Must be passed back when accepting or refusing the substream.
2105
        substream_id: SubstreamId,
2106
        /// Handshake that has been sent by the remote. Its interpretation is beyond the scope of
2107
        /// this module.
2108
        remote_handshake: Vec<u8>,
2109
    },
2110
2111
    /// The remote has canceled the opening of an incoming notifications substream.
2112
    ///
2113
    /// This can only happen before the notification substream has been accepted or refused.
2114
    NotificationsInOpenCancel {
2115
        /// Substream that has been closed. Guaranteed to match a substream that was earlier
2116
        /// reported with an [`Event::NotificationsInOpen`].
2117
        substream_id: SubstreamId,
2118
    },
2119
2120
    /// Received a notification on a notifications substream of a connection.
2121
    NotificationsIn {
2122
        /// Substream on which the notification has been received. Guaranteed to be a substream
2123
        /// that has been accepted with [`Network::accept_in_notifications`].
2124
        substream_id: SubstreamId,
2125
        /// Notification that the remote has sent. The meaning of this data is out of scope of
2126
        /// this module.
2127
        notification: Vec<u8>,
2128
    },
2129
2130
    /// The remote has closed an incoming notifications substream.
2131
    ///
2132
    /// This can only happen after the notification substream has been accepted.
2133
    NotificationsInClose {
2134
        /// Substream that has been closed. Guaranteed to match a substream that was earlier
2135
        /// reported with an [`Event::NotificationsInOpen`].
2136
        substream_id: SubstreamId,
2137
        /// Reason why the substream has been closed.
2138
        outcome: Result<(), NotificationsInClosedErr>,
2139
    },
2140
2141
    /// An outgoing ping has succeeded. This event is generated automatically over time for each
2142
    /// connection in the collection.
2143
    PingOutSuccess {
2144
        id: ConnectionId,
2145
        /// Time between sending the ping and receiving the pong.
2146
        ping_time: Duration,
2147
    },
2148
    /// An outgoing ping has failed. This event is generated automatically over time for each
2149
    /// connection in the collection.
2150
    PingOutFailed { id: ConnectionId },
2151
}
2152
2153
/// Reason why a connection is shutting down. See [`Event::StartShutdown`].
2154
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsD_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_13ShutdownCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsD_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_13ShutdownCauseNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2155
pub enum ShutdownCause {
2156
    /// Shutdown was demanded by the remote and performed cleanly.
2157
    CleanShutdown,
2158
    /// Remote has abruptly reset the connection.
2159
    RemoteReset,
2160
    /// Error in the connection protocol of a fully established connection.
2161
    ProtocolError(established::Error),
2162
    /// Error in the protocol of the handshake.
2163
    HandshakeError(HandshakeError),
2164
    /// Handshake phase took too long.
2165
    HandshakeTimeout,
2166
}
2167
2168
0
#[derive(Debug, derive_more::Display, Clone)]
Unexecuted instantiation: _RNvXsF_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsF_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2169
pub enum RequestError {
2170
    /// Request has been canceled because the connection as a whole is being shut down.
2171
    ConnectionShutdown,
2172
2173
    /// Error happened in the context of the substream.
2174
    #[display(fmt = "{_0}")]
2175
    Substream(established::RequestError),
2176
}
2177
2178
impl RequestError {
2179
    /// Returns `true` if the error is caused by a faulty behavior by the remote. Returns `false`
2180
    /// if the error can happen in normal situations.
2181
0
    pub fn is_protocol_error(&self) -> bool {
2182
0
        match self {
2183
0
            RequestError::ConnectionShutdown => false,
2184
0
            RequestError::Substream(err) => err.is_protocol_error(),
2185
        }
2186
0
    }
Unexecuted instantiation: _RNvMs3_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_12RequestError17is_protocol_error
Unexecuted instantiation: _RNvMs3_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_12RequestError17is_protocol_error
2187
}
2188
2189
0
#[derive(Debug, derive_more::Display, Clone)]
Unexecuted instantiation: _RNvXsI_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsI_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2190
pub enum NotificationsOutErr {
2191
    /// Opening has been interrupted because the connection as a whole is being shut down.
2192
    ConnectionShutdown,
2193
2194
    /// Error happened in the context of the substream.
2195
    #[display(fmt = "{_0}")]
2196
    Substream(established::NotificationsOutErr),
2197
}
2198
2199
0
#[derive(Debug, derive_more::Display, Clone)]
Unexecuted instantiation: _RNvXsL_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsL_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2200
pub enum NotificationsInClosedErr {
2201
    /// Substream has been closed because the connection as a whole is being shut down.
2202
    ConnectionShutdown,
2203
2204
    /// Error happened in the context of the substream.
2205
    #[display(fmt = "{_0}")]
2206
    Substream(established::NotificationsInClosedErr),
2207
}
2208
2209
/// Error potentially returned by [`Network::queue_notification`].
2210
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsO_NtNtCsN16ciHI6Qf_7smoldot6libp2p10collectionNtB5_22QueueNotificationErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsO_NtNtCseuYC0Zibziv_7smoldot6libp2p10collectionNtB5_22QueueNotificationErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
2211
pub enum QueueNotificationError {
2212
    /// Queue of notifications with that peer is full.
2213
    QueueFull,
2214
}