Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/single_stream.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! State machine handling a single TCP or WebSocket libp2p connection.
19
//!
20
//! # About resources allocation and back-pressure
21
//!
22
//! In order to avoid DoS attacks, it is important, in networking code, to make sure that the
23
//! amount of memory allocated directly or indirectly by a connection stays bounded.
24
//!
25
//! The situations in the [`SingleStream`] that lead to an increase in memory consumption are:
26
//!
27
//! 1- On incoming or outgoing substreams.
28
//! 2- When sending a request or receiving a response in a request-response protocol.
29
//! 3- When sending a notification.
30
//! 4- When receiving a request and sending back a response.
31
//! 5- When receiving a notification.
32
//! // TODO: 6- on Yamux ping frames
33
//!
34
//! In order to solve 1-, there exists a maximum number of simultaneous substreams allowed by the
35
//! protocol, thereby guaranteeing that the memory consumption doesn't exceed a certain bound.
36
//! Since receiving a request and a response is a one-time process that occupies an entire
37
//! substream, allocations referenced by points 2- and 4- are also bounded thanks to this limit.
38
//! Request-response protocols enforce a limit to the size of the request and response, again
39
//! guaranteeing a bound on the memory consumption.
40
//!
41
//! In order to solve 3-, always use [`SingleStream::notification_substream_queued_bytes`] in order
42
//! to check the current amount of buffered data before calling
43
//! [`SingleStream::write_notification_unbounded`]. See the documentation of
44
//! [`SingleStream::write_notification_unbounded`] for more details.
45
//!
46
//! In order to solve 5-, // TODO: .
47
//!
48
49
// TODO: expand docs ^
50
51
// TODO: consider implementing on top of multi_stream
52
53
use super::{
54
    super::{super::read_write::ReadWrite, noise, yamux},
55
    substream::{self, RespondInRequestError},
56
    Config, Event, SubstreamId, SubstreamIdInner,
57
};
58
59
use alloc::{boxed::Box, string::String, vec::Vec};
60
use core::{
61
    fmt,
62
    num::{NonZeroU32, NonZeroUsize},
63
    ops::{Add, Index, IndexMut, Sub},
64
    time::Duration,
65
};
66
use rand_chacha::rand_core::{RngCore as _, SeedableRng as _};
67
68
pub use substream::InboundTy;
69
70
/// State machine of a fully-established connection.
71
pub struct SingleStream<TNow, TSubUd> {
72
    /// Encryption layer applied directly on top of the incoming data and outgoing data.
73
    encryption: noise::Noise,
74
75
    /// Extra fields. Segregated in order to solve borrowing questions.
76
    inner: Box<Inner<TNow, TSubUd>>,
77
}
78
79
/// Extra fields. Segregated in order to solve borrowing questions.
80
struct Inner<TNow, TSubUd> {
81
    /// State of the various substreams of the connection.
82
    /// Consists in a collection of substreams, each of which holding a [`substream::Substream`]
83
    /// object, or `None` if the substream has been reset.
84
    yamux: yamux::Yamux<TNow, Option<(substream::Substream<TNow>, Option<TSubUd>)>>,
85
86
    /// Substream in [`Inner::yamux`] used for outgoing pings.
87
    ///
88
    /// Because of the API of [`substream::Substream`] concerning pings, there is no need to
89
    /// handle situations where the substream fails to negotiate, as this is handled by making
90
    /// outgoing pings error. This substream is therefore constant.
91
    ///
92
    /// It is possible, however, that the remote resets the ping substream. In other words, this
93
    /// substream might not be found in [`Inner::yamux`]. When that happens, all outgoing pings
94
    /// are immediately considered as failed.
95
    outgoing_pings: yamux::SubstreamId,
96
    /// When to start the next ping attempt.
97
    next_ping: TNow,
98
    /// Source of randomness to generate ping payloads.
99
    ///
100
    /// Note that we use ChaCha20 because the rest of the code base also uses ChaCha20. This avoids
101
    /// unnecessary code being included in the binary and reduces the binary size.
102
    ping_payload_randomness: rand_chacha::ChaCha20Rng,
103
104
    /// See [`Config::max_inbound_substreams`].
105
    max_inbound_substreams: usize,
106
    /// See [`Config::max_protocol_name_len`].
107
    max_protocol_name_len: usize,
108
    /// See [`Config::ping_interval`].
109
    ping_interval: Duration,
110
    /// See [`Config::ping_timeout`].
111
    ping_timeout: Duration,
112
}
113
114
impl<TNow, TSubUd> SingleStream<TNow, TSubUd>
115
where
116
    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
117
{
118
    /// Reads data coming from the socket, updates the internal state machine, and writes data
119
    /// destined to the socket through the [`ReadWrite`].
120
    ///
121
    /// In order to avoid unnecessary memory allocations, only one [`Event`] is returned at a time.
122
    /// Consequently, this method returns as soon as an event is available, even if the buffers
123
    /// haven't finished being read. Call this method in a loop until the number of bytes read and
124
    /// written are both 0, and the returned [`Event`] is `None`.
125
    ///
126
    /// If an error is returned, the socket should be entirely shut down.
127
    // TODO: consider exposing an API more similar to the one of substream::Substream::read_write?
128
389
    pub fn read_write(
129
389
        mut self,
130
389
        read_write: &'_ mut ReadWrite<TNow>,
131
389
    ) -> Result<(SingleStream<TNow, TSubUd>, Option<Event<TSubUd>>), Error> {
132
389
        // Start any outgoing ping if necessary.
133
389
        if read_write.now >= self.inner.next_ping {
134
2
            self.inner.next_ping = read_write.now.clone() + self.inner.ping_interval;
135
2
136
2
            // It might be that the remote has reset the ping substream, in which case the out ping
137
2
            // substream no longer exists and we immediately consider the ping as failed.
138
2
            if self.inner.yamux.has_substream(self.inner.outgoing_pings) {
139
2
                let mut payload = [0u8; 32];
140
2
                self.inner.ping_payload_randomness.fill_bytes(&mut payload);
141
2
                self.inner.yamux[self.inner.outgoing_pings]
142
2
                    .as_mut()
143
2
                    .unwrap()
144
2
                    .0
145
2
                    .queue_ping(&payload, read_write.now.clone(), self.inner.ping_timeout);
146
2
                self.inner
147
2
                    .yamux
148
2
                    .mark_substream_write_ready(self.inner.outgoing_pings);
149
2
            } else {
150
0
                return Ok((self, Some(Event::PingOutFailed)));
151
            }
152
387
        }
153
389
        read_write.wake_up_after(&self.inner.next_ping);
154
389
155
389
        // If we have both sent and received a GoAway frame, that means that no new substream
156
389
        // can be opened. If in addition to this there is no substream in the connection,
157
389
        // then we can safely close it as a normal termination.
158
389
        // Note that, because we have no guarantee that the remote has received our GoAway
159
389
        // frame yet, it is possible to receive requests for new substreams even after having
160
389
        // sent the GoAway. Because we close the writing side, it is not possible to indicate
161
389
        // to the remote that these new substreams are denied. However, this is not a problem
162
389
        // as the remote interprets our GoAway frame as an automatic refusal of all its pending
163
389
        // substream requests.
164
389
        // TODO: review w.r.t. https://github.com/smol-dot/smoldot/issues/1121
165
389
        if (self.inner.yamux.len()
166
389
            == if self.inner.yamux.has_substream(self.inner.outgoing_pings) {
167
389
                1
168
            } else {
169
0
                0
170
            })
171
49
            && self.inner.yamux.goaway_sent()
172
0
            && self.inner.yamux.received_goaway().is_some()
173
0
        {
174
0
            read_write.close_write();
175
389
        }
176
177
        // Note that we treat the reading side being closed the same way as no data being
178
        // received. The fact that the remote has closed their writing side is no different
179
        // than them leaving their writing side open but no longer send any data at all.
180
        // The remote is free to close their writing side at any point if it judges that it
181
        // will no longer need to send anymore data.
182
        // Note, however, that in principle the remote should have sent a GoAway frame prior
183
        // to closing their writing side. But this is not something we check or really care
184
        // about.
185
186
        // Pass the `read_write` through the Noise state machine.
187
389
        let mut decrypted_read_write = self
188
389
            .encryption
189
389
            .read_write(read_write)
190
389
            .map_err(Error::Noise)
?0
;
191
192
        // Pass the Noise decrypted stream through the Yamux state machine.
193
389
        let yamux_rw_outcome = self
194
389
            .inner
195
389
            .yamux
196
389
            .read_write(&mut decrypted_read_write)
197
389
            .map_err(Error::Yamux)
?0
;
198
199
389
        match yamux_rw_outcome {
200
40
            yamux::ReadWriteOutcome::Idle { yamux } => {
201
40
                self.inner.yamux = yamux;
202
40
203
40
                // Nothing happened, and thus there is nothing more to do.
204
40
                drop(decrypted_read_write);
205
40
                return Ok((self, None));
206
            }
207
20
            yamux::ReadWriteOutcome::IncomingSubstream { mut yamux } => {
208
20
                debug_assert!(!yamux.goaway_queued_or_sent());
209
210
                // Receive a request from the remote for a new incoming substream.
211
                // These requests are automatically accepted unless the total limit to the
212
                // number of substreams has been reached.
213
                // Note that `num_inbound()` counts substreams that have been closed but not
214
                // yet removed from the state machine. This can affect the actual limit in a
215
                // subtle way. At the time of writing of this comment the limit should be
216
                // properly enforced, however it is not considered problematic if it weren't.
217
20
                if yamux.num_inbound() >= self.inner.max_inbound_substreams {
218
0
                    // Can only error if there's no incoming substream, which we know for sure
219
0
                    // is the case here.
220
0
                    yamux
221
0
                        .reject_pending_substream()
222
0
                        .unwrap_or_else(|_| panic!());
Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE10read_write0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1H_6option6OptionNtNtBa_10collection11SubstreamIdEE10read_write0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamppE10read_write0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtBa_10collection11SubstreamIdEE10read_write0CsiUjFBJteJ7x_17smoldot_full_node
223
20
                } else {
224
20
                    // Can only error if there's no incoming substream, which we know for sure
225
20
                    // is the case here.
226
20
                    yamux
227
20
                        .accept_pending_substream(Some((
228
20
                            substream::Substream::ingoing(self.inner.max_protocol_name_len),
229
20
                            None,
230
20
                        )))
231
20
                        .unwrap_or_else(|_| 
panic!()0
);
Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE10read_writes_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1H_6option6OptionNtNtBa_10collection11SubstreamIdEE10read_writes_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamppE10read_writes_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtBa_10collection11SubstreamIdEE10read_writes_0CsiUjFBJteJ7x_17smoldot_full_node
232
20
                }
233
234
20
                self.inner.yamux = yamux;
235
20
236
20
                drop(decrypted_read_write);
237
20
                return Ok((self, None));
238
            }
239
            yamux::ReadWriteOutcome::ProcessSubstream {
240
329
                mut substream_read_write,
241
329
            } => {
242
329
                // The Yamux state machine needs to process a substream.
243
329
244
329
                // Temporarily extract the substream's fields to put them back later.
245
329
                let (state_machine, mut substream_user_data) =
246
329
                    substream_read_write.user_data_mut().take().unwrap();
247
329
                let (state_machine_update, event) =
248
329
                    state_machine.read_write(substream_read_write.read_write());
249
329
250
329
                let event_to_yield = event.map(|ev| {
251
34
                    Self::pass_through_substream_event(
252
34
                        substream_read_write.substream_id(),
253
34
                        &mut substream_user_data,
254
34
                        ev,
255
34
                    )
256
329
                });
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE10read_writes0_0Bc_
Line
Count
Source
250
34
                let event_to_yield = event.map(|ev| {
251
34
                    Self::pass_through_substream_event(
252
34
                        substream_read_write.substream_id(),
253
34
                        &mut substream_user_data,
254
34
                        ev,
255
34
                    )
256
34
                });
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1H_6option6OptionNtNtBa_10collection11SubstreamIdEE10read_writes0_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamppE10read_writes0_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtBa_10collection11SubstreamIdEE10read_writes0_0CsiUjFBJteJ7x_17smoldot_full_node
257
329
258
329
                match state_machine_update {
259
324
                    Some(s) => {
260
324
                        *substream_read_write.user_data_mut() = Some((s, substream_user_data));
261
324
                        self.inner.yamux = substream_read_write.finish();
262
324
                    }
263
5
                    None => {
264
5
                        self.inner.yamux = substream_read_write.reset();
265
5
                    }
266
                }
267
268
329
                if let Some(
event_to_yield34
) = event_to_yield {
269
34
                    drop(decrypted_read_write);
270
34
                    return Ok((self, Some(event_to_yield)));
271
295
                }
272
            }
273
0
            yamux::ReadWriteOutcome::StreamReset { yamux, .. } => {
274
0
                self.inner.yamux = yamux;
275
0
                decrypted_read_write.wake_up_asap();
276
0
            }
277
0
            yamux::ReadWriteOutcome::GoAway { yamux, .. } => {
278
0
                self.inner.yamux = yamux;
279
0
                drop(decrypted_read_write);
280
0
                return Ok((self, Some(Event::NewOutboundSubstreamsForbidden)));
281
            }
282
            yamux::ReadWriteOutcome::PingResponse { .. } => {
283
                // Can only happen if we send out Yamux pings, which we never do.
284
0
                unreachable!()
285
            }
286
        }
287
288
295
        drop(decrypted_read_write);
289
295
290
295
        // Substreams that have been closed or reset aren't immediately removed the yamux state
291
295
        // machine. They must be removed manually, which is what is done here.
292
295
        // TODO: could be optimized by doing it only through a Yamux event? this is the case for StreamReset but not for graceful streams closures
293
295
        let dead_substream_ids = self
294
295
            .inner
295
295
            .yamux
296
295
            .dead_substreams()
297
295
            .map(|(id, death_ty, _)| 
(id, death_ty)2
)
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE10read_writes1_0Bc_
Line
Count
Source
297
2
            .map(|(id, death_ty, _)| (id, death_ty))
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1H_6option6OptionNtNtBa_10collection11SubstreamIdEE10read_writes1_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamppE10read_writes1_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtBa_10collection11SubstreamIdEE10read_writes1_0CsiUjFBJteJ7x_17smoldot_full_node
298
295
            .collect::<Vec<_>>();
299
297
        for (
dead_substream_id, death_ty2
) in dead_substream_ids {
300
2
            match death_ty {
301
                yamux::DeadSubstreamTy::Reset => {
302
                    // If the substream was reset by the remote, then the substream state
303
                    // machine will still be `Some`.
304
0
                    if let Some((state_machine, mut user_data)) =
305
2
                        self.inner.yamux.remove_dead_substream(dead_substream_id)
306
                    {
307
                        // TODO: consider changing this `state_machine.reset()` function to be a state transition of the substream state machine (that doesn't take ownership), to simplify the implementation of both the substream state machine and this code
308
0
                        if let Some(event) = state_machine.reset() {
309
0
                            return Ok((
310
0
                                self,
311
0
                                Some(Self::pass_through_substream_event(
312
0
                                    dead_substream_id,
313
0
                                    &mut user_data,
314
0
                                    event,
315
0
                                )),
316
0
                            ));
317
0
                        }
318
2
                    };
319
320
                    // Removing a dead substream might lead to Yamux being able to process more
321
                    // incoming data. As such, we loop again.
322
2
                    read_write.wake_up_asap();
323
                }
324
0
                yamux::DeadSubstreamTy::ClosedGracefully => {
325
0
                    self.inner.yamux.remove_dead_substream(dead_substream_id);
326
0
                }
327
            }
328
        }
329
330
295
        Ok((self, None))
331
389
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE10read_writeBa_
Line
Count
Source
128
389
    pub fn read_write(
129
389
        mut self,
130
389
        read_write: &'_ mut ReadWrite<TNow>,
131
389
    ) -> Result<(SingleStream<TNow, TSubUd>, Option<Event<TSubUd>>), Error> {
132
389
        // Start any outgoing ping if necessary.
133
389
        if read_write.now >= self.inner.next_ping {
134
2
            self.inner.next_ping = read_write.now.clone() + self.inner.ping_interval;
135
2
136
2
            // It might be that the remote has reset the ping substream, in which case the out ping
137
2
            // substream no longer exists and we immediately consider the ping as failed.
138
2
            if self.inner.yamux.has_substream(self.inner.outgoing_pings) {
139
2
                let mut payload = [0u8; 32];
140
2
                self.inner.ping_payload_randomness.fill_bytes(&mut payload);
141
2
                self.inner.yamux[self.inner.outgoing_pings]
142
2
                    .as_mut()
143
2
                    .unwrap()
144
2
                    .0
145
2
                    .queue_ping(&payload, read_write.now.clone(), self.inner.ping_timeout);
146
2
                self.inner
147
2
                    .yamux
148
2
                    .mark_substream_write_ready(self.inner.outgoing_pings);
149
2
            } else {
150
0
                return Ok((self, Some(Event::PingOutFailed)));
151
            }
152
387
        }
153
389
        read_write.wake_up_after(&self.inner.next_ping);
154
389
155
389
        // If we have both sent and received a GoAway frame, that means that no new substream
156
389
        // can be opened. If in addition to this there is no substream in the connection,
157
389
        // then we can safely close it as a normal termination.
158
389
        // Note that, because we have no guarantee that the remote has received our GoAway
159
389
        // frame yet, it is possible to receive requests for new substreams even after having
160
389
        // sent the GoAway. Because we close the writing side, it is not possible to indicate
161
389
        // to the remote that these new substreams are denied. However, this is not a problem
162
389
        // as the remote interprets our GoAway frame as an automatic refusal of all its pending
163
389
        // substream requests.
164
389
        // TODO: review w.r.t. https://github.com/smol-dot/smoldot/issues/1121
165
389
        if (self.inner.yamux.len()
166
389
            == if self.inner.yamux.has_substream(self.inner.outgoing_pings) {
167
389
                1
168
            } else {
169
0
                0
170
            })
171
49
            && self.inner.yamux.goaway_sent()
172
0
            && self.inner.yamux.received_goaway().is_some()
173
0
        {
174
0
            read_write.close_write();
175
389
        }
176
177
        // Note that we treat the reading side being closed the same way as no data being
178
        // received. The fact that the remote has closed their writing side is no different
179
        // than them leaving their writing side open but no longer send any data at all.
180
        // The remote is free to close their writing side at any point if it judges that it
181
        // will no longer need to send anymore data.
182
        // Note, however, that in principle the remote should have sent a GoAway frame prior
183
        // to closing their writing side. But this is not something we check or really care
184
        // about.
185
186
        // Pass the `read_write` through the Noise state machine.
187
389
        let mut decrypted_read_write = self
188
389
            .encryption
189
389
            .read_write(read_write)
190
389
            .map_err(Error::Noise)
?0
;
191
192
        // Pass the Noise decrypted stream through the Yamux state machine.
193
389
        let yamux_rw_outcome = self
194
389
            .inner
195
389
            .yamux
196
389
            .read_write(&mut decrypted_read_write)
197
389
            .map_err(Error::Yamux)
?0
;
198
199
389
        match yamux_rw_outcome {
200
40
            yamux::ReadWriteOutcome::Idle { yamux } => {
201
40
                self.inner.yamux = yamux;
202
40
203
40
                // Nothing happened, and thus there is nothing more to do.
204
40
                drop(decrypted_read_write);
205
40
                return Ok((self, None));
206
            }
207
20
            yamux::ReadWriteOutcome::IncomingSubstream { mut yamux } => {
208
20
                debug_assert!(!yamux.goaway_queued_or_sent());
209
210
                // Receive a request from the remote for a new incoming substream.
211
                // These requests are automatically accepted unless the total limit to the
212
                // number of substreams has been reached.
213
                // Note that `num_inbound()` counts substreams that have been closed but not
214
                // yet removed from the state machine. This can affect the actual limit in a
215
                // subtle way. At the time of writing of this comment the limit should be
216
                // properly enforced, however it is not considered problematic if it weren't.
217
20
                if yamux.num_inbound() >= self.inner.max_inbound_substreams {
218
0
                    // Can only error if there's no incoming substream, which we know for sure
219
0
                    // is the case here.
220
0
                    yamux
221
0
                        .reject_pending_substream()
222
0
                        .unwrap_or_else(|_| panic!());
223
20
                } else {
224
20
                    // Can only error if there's no incoming substream, which we know for sure
225
20
                    // is the case here.
226
20
                    yamux
227
20
                        .accept_pending_substream(Some((
228
20
                            substream::Substream::ingoing(self.inner.max_protocol_name_len),
229
20
                            None,
230
20
                        )))
231
20
                        .unwrap_or_else(|_| panic!());
232
20
                }
233
234
20
                self.inner.yamux = yamux;
235
20
236
20
                drop(decrypted_read_write);
237
20
                return Ok((self, None));
238
            }
239
            yamux::ReadWriteOutcome::ProcessSubstream {
240
329
                mut substream_read_write,
241
329
            } => {
242
329
                // The Yamux state machine needs to process a substream.
243
329
244
329
                // Temporarily extract the substream's fields to put them back later.
245
329
                let (state_machine, mut substream_user_data) =
246
329
                    substream_read_write.user_data_mut().take().unwrap();
247
329
                let (state_machine_update, event) =
248
329
                    state_machine.read_write(substream_read_write.read_write());
249
329
250
329
                let event_to_yield = event.map(|ev| {
251
                    Self::pass_through_substream_event(
252
                        substream_read_write.substream_id(),
253
                        &mut substream_user_data,
254
                        ev,
255
                    )
256
329
                });
257
329
258
329
                match state_machine_update {
259
324
                    Some(s) => {
260
324
                        *substream_read_write.user_data_mut() = Some((s, substream_user_data));
261
324
                        self.inner.yamux = substream_read_write.finish();
262
324
                    }
263
5
                    None => {
264
5
                        self.inner.yamux = substream_read_write.reset();
265
5
                    }
266
                }
267
268
329
                if let Some(
event_to_yield34
) = event_to_yield {
269
34
                    drop(decrypted_read_write);
270
34
                    return Ok((self, Some(event_to_yield)));
271
295
                }
272
            }
273
0
            yamux::ReadWriteOutcome::StreamReset { yamux, .. } => {
274
0
                self.inner.yamux = yamux;
275
0
                decrypted_read_write.wake_up_asap();
276
0
            }
277
0
            yamux::ReadWriteOutcome::GoAway { yamux, .. } => {
278
0
                self.inner.yamux = yamux;
279
0
                drop(decrypted_read_write);
280
0
                return Ok((self, Some(Event::NewOutboundSubstreamsForbidden)));
281
            }
282
            yamux::ReadWriteOutcome::PingResponse { .. } => {
283
                // Can only happen if we send out Yamux pings, which we never do.
284
0
                unreachable!()
285
            }
286
        }
287
288
295
        drop(decrypted_read_write);
289
295
290
295
        // Substreams that have been closed or reset aren't immediately removed the yamux state
291
295
        // machine. They must be removed manually, which is what is done here.
292
295
        // TODO: could be optimized by doing it only through a Yamux event? this is the case for StreamReset but not for graceful streams closures
293
295
        let dead_substream_ids = self
294
295
            .inner
295
295
            .yamux
296
295
            .dead_substreams()
297
295
            .map(|(id, death_ty, _)| (id, death_ty))
298
295
            .collect::<Vec<_>>();
299
297
        for (
dead_substream_id, death_ty2
) in dead_substream_ids {
300
2
            match death_ty {
301
                yamux::DeadSubstreamTy::Reset => {
302
                    // If the substream was reset by the remote, then the substream state
303
                    // machine will still be `Some`.
304
0
                    if let Some((state_machine, mut user_data)) =
305
2
                        self.inner.yamux.remove_dead_substream(dead_substream_id)
306
                    {
307
                        // TODO: consider changing this `state_machine.reset()` function to be a state transition of the substream state machine (that doesn't take ownership), to simplify the implementation of both the substream state machine and this code
308
0
                        if let Some(event) = state_machine.reset() {
309
0
                            return Ok((
310
0
                                self,
311
0
                                Some(Self::pass_through_substream_event(
312
0
                                    dead_substream_id,
313
0
                                    &mut user_data,
314
0
                                    event,
315
0
                                )),
316
0
                            ));
317
0
                        }
318
2
                    };
319
320
                    // Removing a dead substream might lead to Yamux being able to process more
321
                    // incoming data. As such, we loop again.
322
2
                    read_write.wake_up_asap();
323
                }
324
0
                yamux::DeadSubstreamTy::ClosedGracefully => {
325
0
                    self.inner.yamux.remove_dead_substream(dead_substream_id);
326
0
                }
327
            }
328
        }
329
330
295
        Ok((self, None))
331
389
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE10read_writeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE10read_writeBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE10read_writeCsiUjFBJteJ7x_17smoldot_full_node
332
333
    /// Turns an event from the [`substream`] module into an [`Event`].
334
34
    fn pass_through_substream_event(
335
34
        substream_id: yamux::SubstreamId,
336
34
        substream_user_data: &mut Option<TSubUd>,
337
34
        event: substream::Event,
338
34
    ) -> Event<TSubUd> {
339
34
        match event {
340
            substream::Event::InboundError {
341
0
                error,
342
0
                was_accepted: false,
343
0
            } => Event::InboundError(error),
344
            substream::Event::InboundError {
345
                was_accepted: true, ..
346
0
            } => Event::InboundAcceptedCancel {
347
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
348
0
                user_data: substream_user_data.take().unwrap(),
349
0
                // TODO: notify of the error?
350
0
            },
351
20
            substream::Event::InboundNegotiated(protocol_name) => Event::InboundNegotiated {
352
20
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
353
20
                protocol_name,
354
20
            },
355
0
            substream::Event::InboundNegotiatedCancel => Event::InboundNegotiatedCancel {
356
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
357
0
            },
358
2
            substream::Event::RequestIn { request } => Event::RequestIn {
359
2
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
360
2
                request,
361
2
            },
362
3
            substream::Event::Response { response } => Event::Response {
363
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
364
3
                response,
365
3
                user_data: substream_user_data.take().unwrap(),
366
3
            },
367
3
            substream::Event::NotificationsInOpen { handshake } => Event::NotificationsInOpen {
368
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
369
3
                handshake,
370
3
            },
371
0
            substream::Event::NotificationsInOpenCancel => Event::NotificationsInOpenCancel {
372
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
373
0
            },
374
3
            substream::Event::NotificationIn { notification } => Event::NotificationIn {
375
3
                notification,
376
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
377
3
            },
378
0
            substream::Event::NotificationsInClose { outcome } => Event::NotificationsInClose {
379
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
380
0
                outcome,
381
0
                user_data: substream_user_data.take().unwrap(),
382
0
            },
383
3
            substream::Event::NotificationsOutResult { result } => Event::NotificationsOutResult {
384
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
385
3
                result: match result {
386
1
                    Ok(r) => Ok(r),
387
2
                    Err(err) => Err((err, substream_user_data.take().unwrap())),
388
                },
389
            },
390
            substream::Event::NotificationsOutCloseDemanded => {
391
0
                Event::NotificationsOutCloseDemanded {
392
0
                    id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
393
0
                }
394
            }
395
0
            substream::Event::NotificationsOutReset => Event::NotificationsOutReset {
396
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
397
0
                user_data: substream_user_data.take().unwrap(),
398
0
            },
399
0
            substream::Event::PingOutSuccess { ping_time } => Event::PingOutSuccess { ping_time },
400
            substream::Event::PingOutError { .. } => {
401
                // Because ping events are automatically generated by the external API without any
402
                // guarantee, it is safe to merge multiple failed pings into one.
403
0
                Event::PingOutFailed
404
            }
405
        }
406
34
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE28pass_through_substream_eventBa_
Line
Count
Source
334
34
    fn pass_through_substream_event(
335
34
        substream_id: yamux::SubstreamId,
336
34
        substream_user_data: &mut Option<TSubUd>,
337
34
        event: substream::Event,
338
34
    ) -> Event<TSubUd> {
339
34
        match event {
340
            substream::Event::InboundError {
341
0
                error,
342
0
                was_accepted: false,
343
0
            } => Event::InboundError(error),
344
            substream::Event::InboundError {
345
                was_accepted: true, ..
346
0
            } => Event::InboundAcceptedCancel {
347
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
348
0
                user_data: substream_user_data.take().unwrap(),
349
0
                // TODO: notify of the error?
350
0
            },
351
20
            substream::Event::InboundNegotiated(protocol_name) => Event::InboundNegotiated {
352
20
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
353
20
                protocol_name,
354
20
            },
355
0
            substream::Event::InboundNegotiatedCancel => Event::InboundNegotiatedCancel {
356
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
357
0
            },
358
2
            substream::Event::RequestIn { request } => Event::RequestIn {
359
2
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
360
2
                request,
361
2
            },
362
3
            substream::Event::Response { response } => Event::Response {
363
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
364
3
                response,
365
3
                user_data: substream_user_data.take().unwrap(),
366
3
            },
367
3
            substream::Event::NotificationsInOpen { handshake } => Event::NotificationsInOpen {
368
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
369
3
                handshake,
370
3
            },
371
0
            substream::Event::NotificationsInOpenCancel => Event::NotificationsInOpenCancel {
372
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
373
0
            },
374
3
            substream::Event::NotificationIn { notification } => Event::NotificationIn {
375
3
                notification,
376
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
377
3
            },
378
0
            substream::Event::NotificationsInClose { outcome } => Event::NotificationsInClose {
379
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
380
0
                outcome,
381
0
                user_data: substream_user_data.take().unwrap(),
382
0
            },
383
3
            substream::Event::NotificationsOutResult { result } => Event::NotificationsOutResult {
384
3
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
385
3
                result: match result {
386
1
                    Ok(r) => Ok(r),
387
2
                    Err(err) => Err((err, substream_user_data.take().unwrap())),
388
                },
389
            },
390
            substream::Event::NotificationsOutCloseDemanded => {
391
0
                Event::NotificationsOutCloseDemanded {
392
0
                    id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
393
0
                }
394
            }
395
0
            substream::Event::NotificationsOutReset => Event::NotificationsOutReset {
396
0
                id: SubstreamId(SubstreamIdInner::SingleStream(substream_id)),
397
0
                user_data: substream_user_data.take().unwrap(),
398
0
            },
399
0
            substream::Event::PingOutSuccess { ping_time } => Event::PingOutSuccess { ping_time },
400
            substream::Event::PingOutError { .. } => {
401
                // Because ping events are automatically generated by the external API without any
402
                // guarantee, it is safe to merge multiple failed pings into one.
403
0
                Event::PingOutFailed
404
            }
405
        }
406
34
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE28pass_through_substream_eventCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE28pass_through_substream_eventBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE28pass_through_substream_eventCsiUjFBJteJ7x_17smoldot_full_node
407
408
    /// Close the incoming substreams, automatically denying any new substream request from the
409
    /// remote.
410
    ///
411
    /// Note that this does not prevent incoming-substreams-related events
412
    /// (such as [`Event::RequestIn`]) from being generated, as it is possible that the remote has
413
    /// already opened a substream but has no sent all the necessary handshake messages yet.
414
    ///
415
    /// # Panic
416
    ///
417
    /// Panic if this function has been called before. It is illegal to call
418
    /// [`SingleStream::deny_new_incoming_substreams`] more than one on the same connections.
419
    ///
420
0
    pub fn deny_new_incoming_substreams(&mut self) {
421
0
        // TODO: arbitrary yamux error code
422
0
        self.inner
423
0
            .yamux
424
0
            .send_goaway(yamux::GoAwayErrorCode::NormalTermination)
425
0
            .unwrap()
426
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE28deny_new_incoming_substreamsBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE28deny_new_incoming_substreamsBa_
427
428
    /// Modifies the value that was initially passed through [`Config::max_protocol_name_len`].
429
    ///
430
    /// The new value only applies to substreams opened after this function has been called.
431
0
    pub fn set_max_protocol_name_len(&mut self, new_value: usize) {
432
0
        self.inner.max_protocol_name_len = new_value;
433
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE25set_max_protocol_name_lenBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE25set_max_protocol_name_lenCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE25set_max_protocol_name_lenBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE25set_max_protocol_name_lenCsiUjFBJteJ7x_17smoldot_full_node
434
435
    /// Sends a request to the remote.
436
    ///
437
    /// This method only inserts the request into the connection object. Use
438
    /// [`SingleStream::read_write`] in order to actually send out the request.
439
    ///
440
    /// Assuming that the remote is using the same implementation, an [`Event::RequestIn`] will
441
    /// be generated on its side.
442
    ///
443
    /// If `request` is `None`, then no request is sent to the remote at all. If `request` is
444
    /// `Some`, then a (potentially-empty) request is sent. If `Some(&[])` is provided, a
445
    /// length-prefix containing a 0 is sent to the remote.
446
    ///
447
    /// After the remote has sent back a response, an [`Event::Response`] event will be generated
448
    /// locally. The `user_data` parameter will be passed back.
449
    ///
450
    /// The timeout is the time between the moment the substream is opened and the moment the
451
    /// response is sent back. If the emitter doesn't send the request or if the receiver doesn't
452
    /// answer during this time window, the request is considered failed.
453
    ///
454
    /// # Panic
455
    ///
456
    /// Panics if a [`Event::NewOutboundSubstreamsForbidden`] event has been generated in the past.
457
    ///
458
3
    pub fn add_request(
459
3
        &mut self,
460
3
        protocol_name: String,
461
3
        request: Option<Vec<u8>>,
462
3
        timeout: TNow,
463
3
        max_response_size: usize,
464
3
        user_data: TSubUd,
465
3
    ) -> SubstreamId {
466
3
        let substream_id = self
467
3
            .inner
468
3
            .yamux
469
3
            .open_substream(Some((
470
3
                substream::Substream::request_out(
471
3
                    protocol_name,
472
3
                    timeout,
473
3
                    request,
474
3
                    max_response_size,
475
3
                ),
476
3
                Some(user_data),
477
3
            )))
478
3
            .unwrap(); // TODO: consider not panicking
479
3
480
3
        // TODO: we add some bytes due to the length prefix, this is a bit hacky as we should ask this information from the substream
481
3
        self.inner.yamux.add_remote_window_saturating(
482
3
            substream_id,
483
3
            u64::try_from(max_response_size)
484
3
                .unwrap_or(u64::MAX)
485
3
                .saturating_add(64)
486
3
                .saturating_sub(yamux::NEW_SUBSTREAMS_FRAME_SIZE),
487
3
        );
488
3
489
3
        SubstreamId(SubstreamIdInner::SingleStream(substream_id))
490
3
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE11add_requestBa_
Line
Count
Source
458
3
    pub fn add_request(
459
3
        &mut self,
460
3
        protocol_name: String,
461
3
        request: Option<Vec<u8>>,
462
3
        timeout: TNow,
463
3
        max_response_size: usize,
464
3
        user_data: TSubUd,
465
3
    ) -> SubstreamId {
466
3
        let substream_id = self
467
3
            .inner
468
3
            .yamux
469
3
            .open_substream(Some((
470
3
                substream::Substream::request_out(
471
3
                    protocol_name,
472
3
                    timeout,
473
3
                    request,
474
3
                    max_response_size,
475
3
                ),
476
3
                Some(user_data),
477
3
            )))
478
3
            .unwrap(); // TODO: consider not panicking
479
3
480
3
        // TODO: we add some bytes due to the length prefix, this is a bit hacky as we should ask this information from the substream
481
3
        self.inner.yamux.add_remote_window_saturating(
482
3
            substream_id,
483
3
            u64::try_from(max_response_size)
484
3
                .unwrap_or(u64::MAX)
485
3
                .saturating_add(64)
486
3
                .saturating_sub(yamux::NEW_SUBSTREAMS_FRAME_SIZE),
487
3
        );
488
3
489
3
        SubstreamId(SubstreamIdInner::SingleStream(substream_id))
490
3
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE11add_requestCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE11add_requestBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE11add_requestCsiUjFBJteJ7x_17smoldot_full_node
491
492
    /// Opens a outgoing substream with the given protocol, destined for a stream of
493
    /// notifications.
494
    ///
495
    /// The remote must first accept (or reject) the substream before notifications can be sent
496
    /// on it.
497
    ///
498
    /// This method only inserts the opening handshake into the connection object. Use
499
    /// [`SingleStream::read_write`] in order to actually send out the request.
500
    ///
501
    /// Assuming that the remote is using the same implementation, an
502
    /// [`Event::NotificationsInOpen`] will be generated on its side.
503
    ///
504
    /// # Panic
505
    ///
506
    /// Panics if a [`Event::NewOutboundSubstreamsForbidden`] event has been generated in the past.
507
    ///
508
3
    pub fn open_notifications_substream(
509
3
        &mut self,
510
3
        protocol_name: String,
511
3
        handshake: Vec<u8>,
512
3
        max_handshake_size: usize,
513
3
        timeout: TNow,
514
3
        user_data: TSubUd,
515
3
    ) -> SubstreamId {
516
3
        let substream = self
517
3
            .inner
518
3
            .yamux
519
3
            .open_substream(Some((
520
3
                substream::Substream::notifications_out(
521
3
                    timeout,
522
3
                    protocol_name,
523
3
                    handshake,
524
3
                    max_handshake_size,
525
3
                ),
526
3
                Some(user_data),
527
3
            )))
528
3
            .unwrap(); // TODO: consider not panicking
529
3
530
3
        SubstreamId(SubstreamIdInner::SingleStream(substream))
531
3
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE28open_notifications_substreamBa_
Line
Count
Source
508
3
    pub fn open_notifications_substream(
509
3
        &mut self,
510
3
        protocol_name: String,
511
3
        handshake: Vec<u8>,
512
3
        max_handshake_size: usize,
513
3
        timeout: TNow,
514
3
        user_data: TSubUd,
515
3
    ) -> SubstreamId {
516
3
        let substream = self
517
3
            .inner
518
3
            .yamux
519
3
            .open_substream(Some((
520
3
                substream::Substream::notifications_out(
521
3
                    timeout,
522
3
                    protocol_name,
523
3
                    handshake,
524
3
                    max_handshake_size,
525
3
                ),
526
3
                Some(user_data),
527
3
            )))
528
3
            .unwrap(); // TODO: consider not panicking
529
3
530
3
        SubstreamId(SubstreamIdInner::SingleStream(substream))
531
3
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE28open_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE28open_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE28open_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
532
533
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol
534
    /// name and indicate the type of the protocol.
535
    ///
536
    /// # Panic
537
    ///
538
    /// Panics if the substream is not in the correct state.
539
    ///
540
19
    pub fn accept_inbound(&mut self, substream_id: SubstreamId, ty: InboundTy, user_data: TSubUd) {
541
19
        let substream_id = match substream_id.0 {
542
19
            SubstreamIdInner::SingleStream(id) => id,
543
0
            _ => panic!(),
544
        };
545
546
19
        let (substream, ud) = self.inner.yamux[substream_id].as_mut().unwrap();
547
19
        substream.accept_inbound(ty);
548
19
        debug_assert!(ud.is_none());
549
19
        *ud = Some(user_data);
550
19
        self.inner.yamux.mark_substream_write_ready(substream_id);
551
19
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE14accept_inboundBa_
Line
Count
Source
540
19
    pub fn accept_inbound(&mut self, substream_id: SubstreamId, ty: InboundTy, user_data: TSubUd) {
541
19
        let substream_id = match substream_id.0 {
542
19
            SubstreamIdInner::SingleStream(id) => id,
543
0
            _ => panic!(),
544
        };
545
546
19
        let (substream, ud) = self.inner.yamux[substream_id].as_mut().unwrap();
547
19
        substream.accept_inbound(ty);
548
19
        debug_assert!(ud.is_none());
549
19
        *ud = Some(user_data);
550
19
        self.inner.yamux.mark_substream_write_ready(substream_id);
551
19
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE14accept_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE14accept_inboundBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE14accept_inboundCsiUjFBJteJ7x_17smoldot_full_node
552
553
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the
554
    /// protocol name as not supported.
555
    ///
556
    /// # Panic
557
    ///
558
    /// Panics if the substream is not in the correct state.
559
    ///
560
1
    pub fn reject_inbound(&mut self, substream_id: SubstreamId) {
561
1
        let substream_id = match substream_id.0 {
562
1
            SubstreamIdInner::SingleStream(id) => id,
563
0
            _ => panic!(),
564
        };
565
566
1
        let (substream, ud) = self.inner.yamux[substream_id].as_mut().unwrap();
567
1
        substream.reject_inbound();
568
1
        debug_assert!(ud.is_none());
569
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
570
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE14reject_inboundBa_
Line
Count
Source
560
1
    pub fn reject_inbound(&mut self, substream_id: SubstreamId) {
561
1
        let substream_id = match substream_id.0 {
562
1
            SubstreamIdInner::SingleStream(id) => id,
563
0
            _ => panic!(),
564
        };
565
566
1
        let (substream, ud) = self.inner.yamux[substream_id].as_mut().unwrap();
567
1
        substream.reject_inbound();
568
1
        debug_assert!(ud.is_none());
569
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
570
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE14reject_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE14reject_inboundBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE14reject_inboundCsiUjFBJteJ7x_17smoldot_full_node
571
572
    /// Accepts an inbound notifications protocol. Must be called in response to a
573
    /// [`Event::NotificationsInOpen`].
574
    ///
575
    /// # Panic
576
    ///
577
    /// Panics if the substream id is not valid or the substream is of the wrong type.
578
    ///
579
1
    pub fn accept_in_notifications_substream(
580
1
        &mut self,
581
1
        substream_id: SubstreamId,
582
1
        handshake: Vec<u8>,
583
1
        max_notification_size: usize,
584
1
    ) {
585
1
        let substream_id = match substream_id.0 {
586
1
            SubstreamIdInner::SingleStream(id) => id,
587
0
            _ => panic!(),
588
        };
589
590
1
        self.inner.yamux[substream_id]
591
1
            .as_mut()
592
1
            .unwrap()
593
1
            .0
594
1
            .accept_in_notifications_substream(handshake, max_notification_size);
595
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
596
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE33accept_in_notifications_substreamBa_
Line
Count
Source
579
1
    pub fn accept_in_notifications_substream(
580
1
        &mut self,
581
1
        substream_id: SubstreamId,
582
1
        handshake: Vec<u8>,
583
1
        max_notification_size: usize,
584
1
    ) {
585
1
        let substream_id = match substream_id.0 {
586
1
            SubstreamIdInner::SingleStream(id) => id,
587
0
            _ => panic!(),
588
        };
589
590
1
        self.inner.yamux[substream_id]
591
1
            .as_mut()
592
1
            .unwrap()
593
1
            .0
594
1
            .accept_in_notifications_substream(handshake, max_notification_size);
595
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
596
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE33accept_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE33accept_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE33accept_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
597
598
    /// Rejects an inbound notifications protocol. Must be called in response to a
599
    /// [`Event::NotificationsInOpen`].
600
    ///
601
    /// # Panic
602
    ///
603
    /// Panics if the substream id is not valid or the substream is of the wrong type.
604
    ///
605
1
    pub fn reject_in_notifications_substream(&mut self, substream_id: SubstreamId) {
606
1
        let substream_id = match substream_id.0 {
607
1
            SubstreamIdInner::SingleStream(id) => id,
608
0
            _ => panic!(),
609
        };
610
611
1
        self.inner.yamux[substream_id]
612
1
            .as_mut()
613
1
            .unwrap()
614
1
            .0
615
1
            .reject_in_notifications_substream();
616
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
617
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE33reject_in_notifications_substreamBa_
Line
Count
Source
605
1
    pub fn reject_in_notifications_substream(&mut self, substream_id: SubstreamId) {
606
1
        let substream_id = match substream_id.0 {
607
1
            SubstreamIdInner::SingleStream(id) => id,
608
0
            _ => panic!(),
609
        };
610
611
1
        self.inner.yamux[substream_id]
612
1
            .as_mut()
613
1
            .unwrap()
614
1
            .0
615
1
            .reject_in_notifications_substream();
616
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
617
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE33reject_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE33reject_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE33reject_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
618
619
    /// Queues a notification to be written out on the given substream.
620
    ///
621
    /// # About back-pressure
622
    ///
623
    /// This method unconditionally queues up data. You must be aware that the remote, however,
624
    /// can decide to delay indefinitely the sending of that data, which can potentially lead to
625
    /// an unbounded increase in memory.
626
    ///
627
    /// As such, you are encouraged to call this method only if the amount of queued data (as
628
    /// determined by calling [`SingleStream::notification_substream_queued_bytes`]) is below a
629
    /// certain threshold. If above, the notification should be silently discarded.
630
    ///
631
    /// # Panic
632
    ///
633
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
634
    /// notifications substream isn't in the appropriate state.
635
    ///
636
3
    pub fn write_notification_unbounded(
637
3
        &mut self,
638
3
        substream_id: SubstreamId,
639
3
        notification: Vec<u8>,
640
3
    ) {
641
3
        let substream_id = match substream_id.0 {
642
3
            SubstreamIdInner::SingleStream(id) => id,
643
0
            _ => panic!(),
644
        };
645
646
3
        self.inner.yamux[substream_id]
647
3
            .as_mut()
648
3
            .unwrap()
649
3
            .0
650
3
            .write_notification_unbounded(notification);
651
3
        self.inner.yamux.mark_substream_write_ready(substream_id);
652
3
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE28write_notification_unboundedBa_
Line
Count
Source
636
3
    pub fn write_notification_unbounded(
637
3
        &mut self,
638
3
        substream_id: SubstreamId,
639
3
        notification: Vec<u8>,
640
3
    ) {
641
3
        let substream_id = match substream_id.0 {
642
3
            SubstreamIdInner::SingleStream(id) => id,
643
0
            _ => panic!(),
644
        };
645
646
3
        self.inner.yamux[substream_id]
647
3
            .as_mut()
648
3
            .unwrap()
649
3
            .0
650
3
            .write_notification_unbounded(notification);
651
3
        self.inner.yamux.mark_substream_write_ready(substream_id);
652
3
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE28write_notification_unboundedCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE28write_notification_unboundedBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE28write_notification_unboundedCsiUjFBJteJ7x_17smoldot_full_node
653
654
    /// Returns the number of bytes waiting to be sent out on that substream.
655
    ///
656
    /// See the documentation of [`SingleStream::write_notification_unbounded`] for context.
657
    ///
658
    /// # Panic
659
    ///
660
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
661
    /// notifications substream isn't in the appropriate state.
662
    ///
663
0
    pub fn notification_substream_queued_bytes(&self, substream_id: SubstreamId) -> usize {
664
0
        let substream_id = match substream_id.0 {
665
0
            SubstreamIdInner::SingleStream(id) => id,
666
0
            _ => panic!(),
667
        };
668
669
        // Note that this doesn't take into account data that the Yamux or Noise state machines
670
        // have extracted from the substream but hasn't sent out yet, because the objective of this
671
        // function is to provide a hint about when to stop sending more data, and the size of the
672
        // data that Noise and Yamux have extracted is always bounded anyway. It's not worth the
673
        // effort of reporting a 100% accurate information when a 100% accurate information isn't
674
        // needed.
675
0
        self.inner.yamux[substream_id]
676
0
            .as_ref()
677
0
            .unwrap()
678
0
            .0
679
0
            .notification_substream_queued_bytes()
680
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE35notification_substream_queued_bytesBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE35notification_substream_queued_bytesBa_
681
682
    /// Closes a notifications substream opened after a successful
683
    /// [`Event::NotificationsOutResult`].
684
    ///
685
    /// This can be done even when in the negotiation phase, in other words before the remote has
686
    /// accepted/refused the substream.
687
    ///
688
    /// # Panic
689
    ///
690
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
691
    /// notifications substream isn't in the appropriate state.
692
    ///
693
0
    pub fn close_out_notifications_substream(&mut self, substream_id: SubstreamId) {
694
0
        let substream_id = match substream_id.0 {
695
0
            SubstreamIdInner::SingleStream(id) => id,
696
0
            _ => panic!(),
697
        };
698
699
0
        if !self.inner.yamux.has_substream(substream_id) {
700
0
            panic!()
701
0
        }
702
0
703
0
        self.inner.yamux[substream_id]
704
0
            .as_mut()
705
0
            .unwrap()
706
0
            .0
707
0
            .close_out_notifications_substream();
708
0
        self.inner.yamux.mark_substream_write_ready(substream_id);
709
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE33close_out_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE33close_out_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE33close_out_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE33close_out_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
710
711
    /// Closes a notifications substream that was accepted using
712
    /// [`SingleStream::accept_in_notifications_substream`].
713
    ///
714
    /// # Panic
715
    ///
716
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
717
    /// notifications substream isn't in the appropriate state.
718
    ///
719
0
    pub fn close_in_notifications_substream(&mut self, substream_id: SubstreamId, timeout: TNow) {
720
0
        let substream_id = match substream_id.0 {
721
0
            SubstreamIdInner::SingleStream(id) => id,
722
0
            _ => panic!(),
723
        };
724
725
0
        if !self.inner.yamux.has_substream(substream_id) {
726
0
            panic!()
727
0
        }
728
0
729
0
        self.inner.yamux[substream_id]
730
0
            .as_mut()
731
0
            .unwrap()
732
0
            .0
733
0
            .close_in_notifications_substream(timeout);
734
0
        self.inner.yamux.mark_substream_write_ready(substream_id);
735
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE32close_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE32close_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE32close_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE32close_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
736
737
    /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`].
738
    ///
739
    /// Passing an `Err` corresponds, on the other side, to a
740
    /// [`substream::RequestError::SubstreamClosed`].
741
    ///
742
    /// Returns an error if the [`SubstreamId`] is invalid.
743
1
    pub fn respond_in_request(
744
1
        &mut self,
745
1
        substream_id: SubstreamId,
746
1
        response: Result<Vec<u8>, ()>,
747
1
    ) -> Result<(), RespondInRequestError> {
748
1
        let substream_id = match substream_id.0 {
749
1
            SubstreamIdInner::SingleStream(id) => id,
750
0
            _ => return Err(RespondInRequestError::SubstreamClosed),
751
        };
752
753
1
        if !self.inner.yamux.has_substream(substream_id) {
754
0
            return Err(RespondInRequestError::SubstreamClosed);
755
1
        }
756
1
757
1
        self.inner.yamux[substream_id]
758
1
            .as_mut()
759
1
            .unwrap()
760
1
            .0
761
1
            .respond_in_request(response)
?0
;
762
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
763
1
        Ok(())
764
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationuE18respond_in_requestBa_
Line
Count
Source
743
1
    pub fn respond_in_request(
744
1
        &mut self,
745
1
        substream_id: SubstreamId,
746
1
        response: Result<Vec<u8>, ()>,
747
1
    ) -> Result<(), RespondInRequestError> {
748
1
        let substream_id = match substream_id.0 {
749
1
            SubstreamIdInner::SingleStream(id) => id,
750
0
            _ => return Err(RespondInRequestError::SubstreamClosed),
751
        };
752
753
1
        if !self.inner.yamux.has_substream(substream_id) {
754
0
            return Err(RespondInRequestError::SubstreamClosed);
755
1
        }
756
1
757
1
        self.inner.yamux[substream_id]
758
1
            .as_mut()
759
1
            .unwrap()
760
1
            .0
761
1
            .respond_in_request(response)
?0
;
762
1
        self.inner.yamux.mark_substream_write_ready(substream_id);
763
1
        Ok(())
764
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1F_6option6OptionNtNtB8_10collection11SubstreamIdEE18respond_in_requestCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamppE18respond_in_requestBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB2_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtB8_10collection11SubstreamIdEE18respond_in_requestCsiUjFBJteJ7x_17smoldot_full_node
765
}
766
767
impl<TNow, TSubUd> Index<SubstreamId> for SingleStream<TNow, TSubUd> {
768
    type Output = TSubUd;
769
770
0
    fn index(&self, substream_id: SubstreamId) -> &Self::Output {
771
0
        let substream_id = match substream_id.0 {
772
0
            SubstreamIdInner::SingleStream(id) => id,
773
0
            _ => panic!(),
774
        };
775
776
0
        self.inner.yamux[substream_id]
777
0
            .as_ref()
778
0
            .unwrap()
779
0
            .1
780
0
            .as_ref()
781
0
            .unwrap()
782
0
    }
Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streams_0ppEINtB5_12SingleStreamppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB7_11SubstreamIdE5indexBd_
Unexecuted instantiation: _RNvXs_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsaYZPK01V26L_4core4time8DurationINtNtB1H_6option6OptionNtNtBa_10collection11SubstreamIdEEINtNtNtB1H_3ops5index5IndexNtB6_11SubstreamIdE5indexCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streams_0ppEINtB5_12SingleStreamppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB7_11SubstreamIdE5indexBd_
Unexecuted instantiation: _RNvXs_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamINtB4_12SingleStreamNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtBa_10collection11SubstreamIdEEINtNtNtB2h_3ops5index5IndexNtB6_11SubstreamIdE5indexCsiUjFBJteJ7x_17smoldot_full_node
783
}
784
785
impl<TNow, TSubUd> IndexMut<SubstreamId> for SingleStream<TNow, TSubUd> {
786
0
    fn index_mut(&mut self, substream_id: SubstreamId) -> &mut Self::Output {
787
0
        let substream_id = match substream_id.0 {
788
0
            SubstreamIdInner::SingleStream(id) => id,
789
0
            _ => panic!(),
790
        };
791
792
0
        self.inner.yamux[substream_id]
793
0
            .as_mut()
794
0
            .unwrap()
795
0
            .1
796
0
            .as_mut()
797
0
            .unwrap()
798
0
    }
Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streams0_0ppEINtB5_12SingleStreamppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mutBd_
Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streams0_0ppEINtB5_12SingleStreamppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mutBd_
799
}
800
801
impl<TNow, TSubUd> fmt::Debug for SingleStream<TNow, TSubUd>
802
where
803
    TSubUd: fmt::Debug,
804
{
805
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
806
0
        f.debug_map()
807
0
            .entries(self.inner.yamux.user_datas())
808
0
            .finish()
809
0
    }
Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streams1_0ppEINtB5_12SingleStreamppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_
Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streams1_0ppEINtB5_12SingleStreamppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_
810
}
811
812
/// Error during a connection. The connection should be shut down.
813
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXs5_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs5_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
814
pub enum Error {
815
    /// Error in the noise cipher. Data has most likely been corrupted.
816
    #[display(fmt = "Noise error: {_0}")]
817
    Noise(noise::CipherError),
818
    /// Error while encoding noise data.
819
    #[display(fmt = "{_0}")]
820
    NoiseEncrypt(noise::EncryptError),
821
    /// Error in the Yamux multiplexing protocol.
822
    #[display(fmt = "Yamux error: {_0}")]
823
    Yamux(yamux::Error),
824
}
825
826
/// Successfully negotiated connection. Ready to be turned into a [`SingleStream`].
827
pub struct ConnectionPrototype {
828
    encryption: noise::Noise,
829
}
830
831
impl ConnectionPrototype {
832
    /// Builds a new [`ConnectionPrototype`] of a connection using the Noise and Yamux protocols.
833
16
    pub(crate) fn from_noise_yamux(encryption: noise::Noise) -> Self {
834
16
        ConnectionPrototype { encryption }
835
16
    }
_RNvMs2_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamNtB5_19ConnectionPrototype16from_noise_yamux
Line
Count
Source
833
16
    pub(crate) fn from_noise_yamux(encryption: noise::Noise) -> Self {
834
16
        ConnectionPrototype { encryption }
835
16
    }
Unexecuted instantiation: _RNvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB5_19ConnectionPrototype16from_noise_yamux
836
837
    /// Extracts the Noise state machine from this prototype.
838
0
    pub fn into_noise_state_machine(self) -> noise::Noise {
839
0
        self.encryption
840
0
    }
Unexecuted instantiation: _RNvMs2_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamNtB5_19ConnectionPrototype24into_noise_state_machine
Unexecuted instantiation: _RNvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB5_19ConnectionPrototype24into_noise_state_machine
841
842
    /// Turns this prototype into an actual connection.
843
14
    pub fn into_connection<TNow, TSubUd>(self, config: Config<TNow>) -> SingleStream<TNow, TSubUd>
844
14
    where
845
14
        TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
846
14
    {
847
14
        let mut randomness = rand_chacha::ChaCha20Rng::from_seed(config.randomness_seed);
848
14
849
14
        let mut yamux = yamux::Yamux::new(yamux::Config {
850
14
            is_initiator: self.encryption.is_initiator(),
851
14
            capacity: config.substreams_capacity,
852
14
            randomness_seed: {
853
14
                let mut seed = [0; 32];
854
14
                randomness.fill_bytes(&mut seed);
855
14
                seed
856
14
            },
857
14
            max_out_data_frame_size: NonZeroU32::new(8192).unwrap(), // TODO: make configurable?
858
14
            max_simultaneous_queued_pongs: NonZeroUsize::new(4).unwrap(),
859
14
            max_simultaneous_rst_substreams: NonZeroUsize::new(1024).unwrap(),
860
14
        });
861
14
862
14
        let outgoing_pings = yamux
863
14
            .open_substream(Some((
864
14
                substream::Substream::ping_out(config.ping_protocol.clone()),
865
14
                None,
866
14
            )))
867
14
            // Can only panic if a `GoAway` has been received, or if there are too many substreams
868
14
            // already open, which we know for sure can't happen here
869
14
            .unwrap_or_else(|_| 
panic!()0
);
Unexecuted instantiation: _RNCINvMs2_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamNtB8_19ConnectionPrototype15into_connectionNtNtCsaYZPK01V26L_4core4time8DurationuE0Bg_
Unexecuted instantiation: _RNCINvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB8_19ConnectionPrototype15into_connectionNtNtCsaYZPK01V26L_4core4time8DurationINtNtB28_6option6OptionNtNtBe_10collection11SubstreamIdEE0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCINvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB8_19ConnectionPrototype15into_connectionppE0Bg_
Unexecuted instantiation: _RNCINvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB8_19ConnectionPrototype15into_connectionNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtBe_10collection11SubstreamIdEE0CsiUjFBJteJ7x_17smoldot_full_node
870
14
871
14
        SingleStream {
872
14
            encryption: self.encryption,
873
14
            inner: Box::new(Inner {
874
14
                yamux,
875
14
                outgoing_pings,
876
14
                next_ping: config.first_out_ping,
877
14
                ping_payload_randomness: randomness,
878
14
                max_inbound_substreams: config.max_inbound_substreams,
879
14
                max_protocol_name_len: config.max_protocol_name_len,
880
14
                ping_interval: config.ping_interval,
881
14
                ping_timeout: config.ping_timeout,
882
14
            }),
883
14
        }
884
14
    }
_RINvMs2_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamNtB6_19ConnectionPrototype15into_connectionNtNtCsaYZPK01V26L_4core4time8DurationuEBe_
Line
Count
Source
843
14
    pub fn into_connection<TNow, TSubUd>(self, config: Config<TNow>) -> SingleStream<TNow, TSubUd>
844
14
    where
845
14
        TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
846
14
    {
847
14
        let mut randomness = rand_chacha::ChaCha20Rng::from_seed(config.randomness_seed);
848
14
849
14
        let mut yamux = yamux::Yamux::new(yamux::Config {
850
14
            is_initiator: self.encryption.is_initiator(),
851
14
            capacity: config.substreams_capacity,
852
14
            randomness_seed: {
853
14
                let mut seed = [0; 32];
854
14
                randomness.fill_bytes(&mut seed);
855
14
                seed
856
14
            },
857
14
            max_out_data_frame_size: NonZeroU32::new(8192).unwrap(), // TODO: make configurable?
858
14
            max_simultaneous_queued_pongs: NonZeroUsize::new(4).unwrap(),
859
14
            max_simultaneous_rst_substreams: NonZeroUsize::new(1024).unwrap(),
860
14
        });
861
14
862
14
        let outgoing_pings = yamux
863
14
            .open_substream(Some((
864
14
                substream::Substream::ping_out(config.ping_protocol.clone()),
865
14
                None,
866
14
            )))
867
14
            // Can only panic if a `GoAway` has been received, or if there are too many substreams
868
14
            // already open, which we know for sure can't happen here
869
14
            .unwrap_or_else(|_| panic!());
870
14
871
14
        SingleStream {
872
14
            encryption: self.encryption,
873
14
            inner: Box::new(Inner {
874
14
                yamux,
875
14
                outgoing_pings,
876
14
                next_ping: config.first_out_ping,
877
14
                ping_payload_randomness: randomness,
878
14
                max_inbound_substreams: config.max_inbound_substreams,
879
14
                max_protocol_name_len: config.max_protocol_name_len,
880
14
                ping_interval: config.ping_interval,
881
14
                ping_timeout: config.ping_timeout,
882
14
            }),
883
14
        }
884
14
    }
Unexecuted instantiation: _RINvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB6_19ConnectionPrototype15into_connectionNtNtCsaYZPK01V26L_4core4time8DurationINtNtB26_6option6OptionNtNtBc_10collection11SubstreamIdEECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB6_19ConnectionPrototype15into_connectionppEBe_
Unexecuted instantiation: _RINvMs2_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB6_19ConnectionPrototype15into_connectionNtNtCsbpXXxgr6u8g_3std4time7InstantINtNtCsaYZPK01V26L_4core6option6OptionNtNtBc_10collection11SubstreamIdEECsiUjFBJteJ7x_17smoldot_full_node
885
}
886
887
impl fmt::Debug for ConnectionPrototype {
888
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
889
0
        f.debug_struct("ConnectionPrototype").finish()
890
0
    }
Unexecuted instantiation: _RNvXs3_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established13single_streamNtB5_19ConnectionPrototypeNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
Unexecuted instantiation: _RNvXs3_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established13single_streamNtB5_19ConnectionPrototypeNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt
891
}