Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/substream.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! Individual substream within an established connection.
19
//!
20
//! This module contains the [`Substream`] struct, a state machine containing the state of a
21
//! single substream. When the remote sends data on that substream, or when the remote is ready to
22
//! accept more data on that substream, the state machine can be updated by calling
23
//! [`Substream::read_write`]. This optionally produces an event that indicates what happened on
24
//! the substream as a result of the call.
25
26
use crate::libp2p::{connection::multistream_select, read_write};
27
use crate::util::leb128;
28
29
use alloc::{borrow::ToOwned as _, collections::VecDeque, string::String, vec::Vec};
30
use core::{
31
    fmt, mem,
32
    num::NonZeroUsize,
33
    ops::{Add, Sub},
34
    time::Duration,
35
};
36
37
/// State machine containing the state of a single substream of an established connection.
38
pub struct Substream<TNow> {
39
    inner: SubstreamInner<TNow>,
40
}
41
42
enum SubstreamInner<TNow> {
43
    /// Protocol negotiation in progress in an incoming substream.
44
    InboundNegotiating(multistream_select::InProgress<String>, bool),
45
    /// Protocol negotiation in an incoming substream is in progress, and an
46
    /// [`Event::InboundNegotiated`] has been emitted. Now waiting for the API user to indicate
47
    /// whether the protocol is supported and if so the type of substream.
48
    InboundNegotiatingApiWait(multistream_select::ListenerAcceptOrDeny<String>),
49
    /// Protocol negotiation in an incoming substream is in progress, and the API user has
50
    /// indicated that the given protocol is supported. Finishing the handshake before switching
51
    /// to a different state.
52
    InboundNegotiatingAccept(multistream_select::InProgress<String>, InboundTy),
53
    /// Incoming substream has failed to negotiate a protocol. Waiting for a close from the remote.
54
    /// In order to save a round-trip time, the remote might assume that the protocol negotiation
55
    /// has succeeded. As such, it might send additional data on this substream that should be
56
    /// ignored.
57
    InboundFailed,
58
59
    /// Failure to negotiate an outbound notifications substream.
60
    NotificationsOutNegotiationFailed,
61
    /// A notifications protocol is being negotiated or has been negotiated on a substream. Either
62
    /// a successful handshake or an abrupt closing is now expected.
63
    NotificationsOutHandshakeRecv {
64
        /// When the opening will time out in the absence of response.
65
        timeout: TNow,
66
        /// State of the protocol negotiation. `None` if the handshake has already finished.
67
        negotiation: Option<multistream_select::InProgress<String>>,
68
        /// Size of the remote handshake, if known. If `Some`, we have already extracted the length
69
        /// from the incoming buffer.
70
        handshake_in_size: Option<usize>,
71
        /// Maximum allowed size of the remote handshake.
72
        handshake_in_max_size: usize,
73
        /// Handshake payload to write out.
74
        handshake_out: VecDeque<u8>,
75
    },
76
    /// A notifications protocol has been negotiated, and the remote accepted it. Can now send
77
    /// notifications.
78
    NotificationsOut {
79
        /// Notifications to write out.
80
        notifications: VecDeque<u8>,
81
        /// If `true`, we have reported a [`Event::NotificationsOutCloseDemanded`] event in the
82
        /// past and shouldn't report one again.
83
        close_demanded_by_remote: bool,
84
    },
85
    /// A notifications protocol has been closed. Waiting for the remote to close it as well.
86
    NotificationsOutClosed,
87
88
    /// A notifications protocol has been negotiated on an incoming substream. A handshake from
89
    /// the remote is expected.
90
    NotificationsInHandshake {
91
        /// Size of the handshake, if known. If `Some`, we have already extracted the length
92
        /// from the incoming buffer.
93
        handshake_size: Option<usize>,
94
        /// Maximum allowed size of the handshake.
95
        handshake_max_size: usize,
96
    },
97
    /// A handshake on a notifications protocol has been received. Now waiting for an action from
98
    /// the API user.
99
    NotificationsInWait,
100
    /// API user has refused an incoming substream. Waiting for a close from the remote.
101
    /// In order to save a round-trip time, the remote might assume that the protocol negotiation
102
    /// has succeeded. As such, it might send additional data on this substream that should be
103
    /// ignored.
104
    NotificationsInRefused,
105
    /// A notifications protocol has been negotiated on a substream. Remote can now send
106
    /// notifications.
107
    NotificationsIn {
108
        /// If `Some`, the local node wants to shut down the substream. If the given timeout is
109
        /// reached, the closing is forced.
110
        close_desired_timeout: Option<TNow>,
111
        /// Size of the next notification, if known. If `Some`, we have already extracted the
112
        /// length from the incoming buffer.
113
        next_notification_size: Option<usize>,
114
        /// Handshake payload to write out.
115
        handshake: VecDeque<u8>,
116
        /// Maximum size, in bytes, allowed for each notification.
117
        max_notification_size: usize,
118
    },
119
    /// An inbound notifications protocol was open, but then the remote closed its writing side.
120
    NotificationsInClosed,
121
122
    /// Outgoing request.
123
    RequestOut {
124
        /// When the request will time out in the absence of response.
125
        timeout: TNow,
126
        /// State of the protocol negotiation. `None` if the negotiation has finished.
127
        negotiation: Option<multistream_select::InProgress<String>>,
128
        /// Request payload to write out.
129
        request: VecDeque<u8>,
130
        /// Size of the response, if known. If `Some`, we have already extracted the length
131
        /// from the incoming buffer.
132
        response_size: Option<usize>,
133
        /// Maximum allowed size of the response.
134
        response_max_size: usize,
135
    },
136
137
    /// A request-response protocol has been negotiated on an inbound substream. A request is now
138
    /// expected.
139
    RequestInRecv {
140
        /// Size of the request, if known. If `Some`, we have already extracted the length
141
        /// from the incoming buffer.
142
        request_size: Option<usize>,
143
        /// Maximum allowed size of the request.
144
        request_max_size: usize,
145
    },
146
    /// Similar to [`SubstreamInner::RequestInRecv`], but doesn't expect any request body.
147
    /// Immediately reports an event and switches to [`SubstreamInner::RequestInApiWait`].
148
    RequestInRecvEmpty,
149
    /// A request has been sent by the remote. API user must now send back the response.
150
    RequestInApiWait,
151
    /// A request has been sent by the remote. Sending back the response.
152
    RequestInRespond {
153
        /// Response being sent back.
154
        response: VecDeque<u8>,
155
    },
156
157
    /// Inbound ping substream. Waiting for the ping payload to be received.
158
    PingIn { payload_out: VecDeque<u8> },
159
160
    /// Failed to negotiate a protocol for an outgoing ping substream.
161
    PingOutFailed {
162
        /// FIFO queue of pings that will immediately fail.
163
        queued_pings: smallvec::SmallVec<[Option<(TNow, Duration)>; 1]>,
164
    },
165
    /// Outbound ping substream.
166
    PingOut {
167
        /// State of the protocol negotiation. `None` if the handshake is already finished.
168
        negotiation: Option<multistream_select::InProgress<String>>,
169
        /// Payload of the queued pings that remains to write out.
170
        outgoing_payload: VecDeque<u8>,
171
        /// Data waiting to be received from the remote. Any mismatch will cause an error.
172
        /// Contains even the data that is still queued in `outgoing_payload`.
173
        expected_payload: VecDeque<Vec<u8>>,
174
        /// FIFO queue of pings waiting to be answered. For each ping, when the ping was queued
175
        /// and after how long it will time out, or `None` if the timeout has already occurred.
176
        queued_pings: smallvec::SmallVec<[Option<(TNow, Duration)>; 1]>,
177
    },
178
}
179
180
impl<TNow> Substream<TNow>
181
where
182
    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
183
{
184
    /// Initializes an new `ingoing` substream.
185
    ///
186
    /// After the remote has requested a protocol, an [`Event::InboundNegotiated`] event will be
187
    /// generated, after which [`Substream::accept_inbound`] or [`Substream::reject_inbound`] must
188
    /// be called in order to indicate whether the protocol is accepted, and if so the nature of
189
    /// the negotiated protocol.
190
    /// A [`Event::InboundError`] can also be generated, either before or after the
191
    /// [`Event::InboundNegotiated`], but always before any [`Event::NotificationsInOpen`].
192
    ///
193
    /// If [`InboundTy::Notifications`] is passed, then a [`Event::NotificationsInOpen`] will be
194
    /// generated (unless an error happens, in which case [`Event::InboundError`]).
195
    /// In response, the API user must call either [`Substream::accept_in_notifications_substream`]
196
    /// or [`Substream::reject_in_notifications_substream`]. Before one of these two methods is
197
    /// called, it is possible for an [`Event::NotificationsInOpenCancel`] to be generated, in
198
    /// which case the inbound request is canceled and the substream closed.
199
    /// After [`Substream::accept_in_notifications_substream`] is called, zero or more
200
    /// [`Event::NotificationIn`] will be generated, until a [`Event::NotificationsInClose`] which
201
    /// indicates the end of the substream.
202
    ///
203
    /// If [`InboundTy::Request`] is passed, then a [`Event::RequestIn`] will be generated, after
204
    /// which the API user must call [`Substream::respond_in_request`]. An [`Event::InboundError`]
205
    /// can happen at any point.
206
    ///
207
    /// This flow is also true if you call [`Substream::reset`] at any point.
208
20
    pub fn ingoing(max_protocol_name_len: usize) -> Self {
209
20
        let negotiation =
210
20
            multistream_select::InProgress::new(multistream_select::Config::Listener {
211
20
                max_protocol_name_len,
212
20
            });
213
20
214
20
        Substream {
215
20
            inner: SubstreamInner::InboundNegotiating(negotiation, false),
216
20
        }
217
20
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE7ingoingBa_
Line
Count
Source
208
20
    pub fn ingoing(max_protocol_name_len: usize) -> Self {
209
20
        let negotiation =
210
20
            multistream_select::InProgress::new(multistream_select::Config::Listener {
211
20
                max_protocol_name_len,
212
20
            });
213
20
214
20
        Substream {
215
20
            inner: SubstreamInner::InboundNegotiating(negotiation, false),
216
20
        }
217
20
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE7ingoingCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE7ingoingBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE7ingoingCsiUjFBJteJ7x_17smoldot_full_node
218
219
    /// Initializes an outgoing notifications substream.
220
    ///
221
    /// After the remote has sent back a handshake or after an error occurred, an
222
    /// [`Event::NotificationsOutResult`] event will be generated locally.
223
    ///
224
    /// If this event contains an `Ok`, then [`Substream::write_notification_unbounded`],
225
    /// [`Substream::notification_substream_queued_bytes`] and
226
    /// [`Substream::close_out_notifications_substream`] can be used, and
227
    /// [`Event::NotificationsOutCloseDemanded`] and [`Event::NotificationsOutReset`] can be
228
    /// generated.
229
3
    pub fn notifications_out(
230
3
        timeout: TNow,
231
3
        requested_protocol: String,
232
3
        handshake: Vec<u8>,
233
3
        max_handshake_size: usize,
234
3
    ) -> Self {
235
3
        // TODO: check `handshake < max_handshake_size`?
236
3
237
3
        let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer {
238
3
            requested_protocol,
239
3
        });
240
3
241
3
        let handshake_out = {
242
3
            let handshake_len = handshake.len();
243
3
            leb128::encode_usize(handshake_len)
244
3
                .chain(handshake)
245
3
                .collect::<VecDeque<_>>()
246
3
        };
247
3
248
3
        Substream {
249
3
            inner: SubstreamInner::NotificationsOutHandshakeRecv {
250
3
                timeout,
251
3
                negotiation: Some(negotiation),
252
3
                handshake_in_size: None,
253
3
                handshake_in_max_size: max_handshake_size,
254
3
                handshake_out,
255
3
            },
256
3
        }
257
3
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE17notifications_outBa_
Line
Count
Source
229
3
    pub fn notifications_out(
230
3
        timeout: TNow,
231
3
        requested_protocol: String,
232
3
        handshake: Vec<u8>,
233
3
        max_handshake_size: usize,
234
3
    ) -> Self {
235
3
        // TODO: check `handshake < max_handshake_size`?
236
3
237
3
        let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer {
238
3
            requested_protocol,
239
3
        });
240
3
241
3
        let handshake_out = {
242
3
            let handshake_len = handshake.len();
243
3
            leb128::encode_usize(handshake_len)
244
3
                .chain(handshake)
245
3
                .collect::<VecDeque<_>>()
246
3
        };
247
3
248
3
        Substream {
249
3
            inner: SubstreamInner::NotificationsOutHandshakeRecv {
250
3
                timeout,
251
3
                negotiation: Some(negotiation),
252
3
                handshake_in_size: None,
253
3
                handshake_in_max_size: max_handshake_size,
254
3
                handshake_out,
255
3
            },
256
3
        }
257
3
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE17notifications_outCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE17notifications_outBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE17notifications_outCsiUjFBJteJ7x_17smoldot_full_node
258
259
    /// Initializes an outgoing request substream.
260
    ///
261
    /// After the remote has sent back a response or after an error occurred, an [`Event::Response`]
262
    /// event will be generated locally. The `user_data` parameter will be passed back.
263
    ///
264
    /// If the `request` is `None`, then nothing at all will be written out, not even a length
265
    /// prefix. If the `request` is `Some`, then a length prefix will be written out. Consequently,
266
    /// `Some(&[])` writes a single `0` for the request.
267
3
    pub fn request_out(
268
3
        requested_protocol: String,
269
3
        timeout: TNow,
270
3
        request: Option<Vec<u8>>,
271
3
        max_response_size: usize,
272
3
    ) -> Self {
273
3
        let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer {
274
3
            requested_protocol,
275
3
        });
276
277
3
        let request_payload = if let Some(request) = request {
278
3
            let request_len = request.len();
279
3
            leb128::encode_usize(request_len)
280
3
                .chain(request)
281
3
                .collect::<VecDeque<_>>()
282
        } else {
283
0
            VecDeque::new()
284
        };
285
286
3
        Substream {
287
3
            inner: SubstreamInner::RequestOut {
288
3
                timeout,
289
3
                negotiation: Some(negotiation),
290
3
                request: request_payload,
291
3
                response_size: None,
292
3
                response_max_size: max_response_size,
293
3
            },
294
3
        }
295
3
296
3
        // TODO: somehow do substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size
297
3
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11request_outBa_
Line
Count
Source
267
3
    pub fn request_out(
268
3
        requested_protocol: String,
269
3
        timeout: TNow,
270
3
        request: Option<Vec<u8>>,
271
3
        max_response_size: usize,
272
3
    ) -> Self {
273
3
        let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer {
274
3
            requested_protocol,
275
3
        });
276
277
3
        let request_payload = if let Some(request) = request {
278
3
            let request_len = request.len();
279
3
            leb128::encode_usize(request_len)
280
3
                .chain(request)
281
3
                .collect::<VecDeque<_>>()
282
        } else {
283
0
            VecDeque::new()
284
        };
285
286
3
        Substream {
287
3
            inner: SubstreamInner::RequestOut {
288
3
                timeout,
289
3
                negotiation: Some(negotiation),
290
3
                request: request_payload,
291
3
                response_size: None,
292
3
                response_max_size: max_response_size,
293
3
            },
294
3
        }
295
3
296
3
        // TODO: somehow do substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size
297
3
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11request_outCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE11request_outBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11request_outCsiUjFBJteJ7x_17smoldot_full_node
298
299
    /// Initializes an outgoing ping substream.
300
    ///
301
    /// Call [`Substream::queue_ping`] in order to queue an outgoing ping on this substream. This
302
    /// can be done at any time, even immediately after this function has returned.
303
    ///
304
    /// The substream will attempt to negotiate the ping protocol. No event is reported if the
305
    /// protocol fails to negotiate. Instead, outgoing pings will be transparently failing.
306
    ///
307
    /// > Note: At the time of the writing of this comment, no API method exists to close an
308
    /// >       outgoing ping substream.
309
14
    pub fn ping_out(ping_protocol_name: String) -> Self {
310
14
        let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer {
311
14
            requested_protocol: ping_protocol_name,
312
14
        });
313
14
314
14
        Substream {
315
14
            inner: SubstreamInner::PingOut {
316
14
                negotiation: Some(negotiation),
317
14
                outgoing_payload: VecDeque::with_capacity(32),
318
14
                expected_payload: VecDeque::with_capacity(32),
319
14
                queued_pings: smallvec::SmallVec::new(),
320
14
            },
321
14
        }
322
14
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE8ping_outBa_
Line
Count
Source
309
14
    pub fn ping_out(ping_protocol_name: String) -> Self {
310
14
        let negotiation = multistream_select::InProgress::new(multistream_select::Config::Dialer {
311
14
            requested_protocol: ping_protocol_name,
312
14
        });
313
14
314
14
        Substream {
315
14
            inner: SubstreamInner::PingOut {
316
14
                negotiation: Some(negotiation),
317
14
                outgoing_payload: VecDeque::with_capacity(32),
318
14
                expected_payload: VecDeque::with_capacity(32),
319
14
                queued_pings: smallvec::SmallVec::new(),
320
14
            },
321
14
        }
322
14
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE8ping_outCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE8ping_outBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE8ping_outCsiUjFBJteJ7x_17smoldot_full_node
323
324
    /// Reads data coming from the socket, updates the internal state machine, and writes data
325
    /// destined to the socket through the [`read_write::ReadWrite`].
326
    ///
327
    /// If both the reading side and the writing side are closed and no other event can happen, or
328
    /// if at any point a protocol error happens, then `None` is returned. In that case, the
329
    /// substream must be reset if it is not closed.
330
329
    pub fn read_write(
331
329
        self,
332
329
        read_write: &'_ mut read_write::ReadWrite<TNow>,
333
329
    ) -> (Option<Self>, Option<Event>) {
334
329
        let (me, event) = self.read_write2(read_write);
335
329
        (me.map(|inner| 
Substream { inner }324
), event)
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_write0Bc_
Line
Count
Source
335
324
        (me.map(|inner| Substream { inner }), event)
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_write0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE10read_write0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE10read_write0CsiUjFBJteJ7x_17smoldot_full_node
336
329
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_writeBa_
Line
Count
Source
330
329
    pub fn read_write(
331
329
        self,
332
329
        read_write: &'_ mut read_write::ReadWrite<TNow>,
333
329
    ) -> (Option<Self>, Option<Event>) {
334
329
        let (me, event) = self.read_write2(read_write);
335
329
        (me.map(|inner| Substream { inner }), event)
336
329
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10read_writeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE10read_writeBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE10read_writeCsiUjFBJteJ7x_17smoldot_full_node
337
338
329
    fn read_write2(
339
329
        self,
340
329
        read_write: &'_ mut read_write::ReadWrite<TNow>,
341
329
    ) -> (Option<SubstreamInner<TNow>>, Option<Event>) {
342
329
        match self.inner {
343
102
            SubstreamInner::InboundNegotiating(nego, was_rejected_already) => {
344
102
                match nego.read_write(read_write) {
345
81
                    Ok(multistream_select::Negotiation::InProgress(nego)) => (
346
81
                        Some(SubstreamInner::InboundNegotiating(
347
81
                            nego,
348
81
                            was_rejected_already,
349
81
                        )),
350
81
                        None,
351
81
                    ),
352
20
                    Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(accept_deny)) => {
353
20
                        // TODO: maybe avoid cloning the protocol name?
354
20
                        let protocol = accept_deny.requested_protocol().to_owned();
355
20
                        (
356
20
                            Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)),
357
20
                            Some(Event::InboundNegotiated(protocol)),
358
20
                        )
359
                    }
360
                    Ok(multistream_select::Negotiation::Success) => {
361
                        // Unreachable, as we expect a `ListenerAcceptOrDeny`.
362
0
                        unreachable!()
363
                    }
364
                    Ok(multistream_select::Negotiation::NotAvailable) => {
365
                        // Unreachable in listener mode.
366
0
                        unreachable!()
367
                    }
368
1
                    Err(_) if was_rejected_already => {
369
1
                        // If the negotiation was already rejected once, it is likely that the
370
1
                        // multistream-select protocol error is due to the fact that the remote
371
1
                        // assumes that the multistream-select negotiation always succeeds. As such,
372
1
                        // we treat this situation as "negotiation has failed gracefully".
373
1
                        (Some(SubstreamInner::InboundFailed), None)
374
                    }
375
0
                    Err(err) => (
376
0
                        None,
377
0
                        Some(Event::InboundError {
378
0
                            error: InboundError::NegotiationError(err),
379
0
                            was_accepted: false,
380
0
                        }),
381
0
                    ),
382
                }
383
            }
384
0
            SubstreamInner::InboundNegotiatingApiWait(accept_deny) => (
385
0
                Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)),
386
0
                None,
387
0
            ),
388
18
            SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty) => {
389
18
                match nego.read_write(read_write) {
390
0
                    Ok(multistream_select::Negotiation::InProgress(nego)) => (
391
0
                        Some(SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty)),
392
0
                        None,
393
0
                    ),
394
                    Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
395
                        // Can't be reached again, as we have already accepted the protocol.
396
0
                        unreachable!()
397
                    }
398
18
                    Ok(multistream_select::Negotiation::Success) => match inbound_ty {
399
13
                        InboundTy::Ping => (
400
13
                            Some(SubstreamInner::PingIn {
401
13
                                payload_out: VecDeque::with_capacity(32),
402
13
                            }),
403
13
                            None,
404
13
                        ),
405
3
                        InboundTy::Notifications { max_handshake_size } => (
406
3
                            Some(SubstreamInner::NotificationsInHandshake {
407
3
                                handshake_size: None,
408
3
                                handshake_max_size: max_handshake_size,
409
3
                            }),
410
3
                            None,
411
3
                        ),
412
2
                        InboundTy::Request { request_max_size } => {
413
2
                            if let Some(request_max_size) = request_max_size {
414
2
                                (
415
2
                                    Some(SubstreamInner::RequestInRecv {
416
2
                                        request_max_size,
417
2
                                        request_size: None,
418
2
                                    }),
419
2
                                    None,
420
2
                                )
421
                            } else {
422
0
                                (Some(SubstreamInner::RequestInRecvEmpty), None)
423
                            }
424
                        }
425
                    },
426
                    Ok(multistream_select::Negotiation::NotAvailable) => {
427
                        // Unreachable in listener mode.
428
0
                        unreachable!()
429
                    }
430
0
                    Err(err) => (
431
0
                        None,
432
0
                        Some(Event::InboundError {
433
0
                            error: InboundError::NegotiationError(err),
434
0
                            was_accepted: true,
435
0
                        }),
436
0
                    ),
437
                }
438
            }
439
            SubstreamInner::InboundFailed => {
440
                // Substream is an inbound substream that has failed to negotiate a
441
                // protocol. The substream is expected to close soon, but the remote might
442
                // have been eagerly sending data (assuming that the negotiation would
443
                // succeed), which should be silently discarded.
444
1
                read_write.discard_all_incoming();
445
1
                read_write.close_write();
446
1
                if read_write.is_dead() {
447
1
                    (None, None)
448
                } else {
449
0
                    (Some(SubstreamInner::InboundFailed), None)
450
                }
451
            }
452
453
            SubstreamInner::NotificationsOutNegotiationFailed => {
454
                // Substream has failed to negotiate a protocol. The substream is expected to
455
                // close soon.
456
0
                read_write.discard_all_incoming();
457
0
                read_write.close_write();
458
0
                (
459
0
                    if read_write.is_dead() {
460
0
                        None
461
                    } else {
462
0
                        Some(SubstreamInner::NotificationsOutNegotiationFailed)
463
                    },
464
0
                    None,
465
                )
466
            }
467
            SubstreamInner::NotificationsOutHandshakeRecv {
468
30
                timeout,
469
30
                mut negotiation,
470
30
                mut handshake_in_size,
471
30
                handshake_in_max_size,
472
30
                mut handshake_out,
473
30
            } => {
474
30
                if timeout < read_write.now {
475
1
                    read_write.wake_up_asap();
476
1
                    return (
477
1
                        Some(SubstreamInner::NotificationsOutNegotiationFailed),
478
1
                        Some(Event::NotificationsOutResult {
479
1
                            result: Err(NotificationsOutErr::Timeout),
480
1
                        }),
481
1
                    );
482
29
                }
483
484
29
                if let Some(
extracted_negotiation25
) = negotiation.take() {
485
25
                    match extracted_negotiation.read_write(read_write) {
486
23
                        Ok(multistream_select::Negotiation::InProgress(nego)) => {
487
23
                            negotiation = Some(nego)
488
                        }
489
                        Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
490
                            // Never happens when dialing.
491
0
                            unreachable!()
492
                        }
493
2
                        Ok(multistream_select::Negotiation::Success) => {}
494
                        Ok(multistream_select::Negotiation::NotAvailable) => {
495
0
                            read_write.wake_up_asap();
496
0
                            return (
497
0
                                Some(SubstreamInner::NotificationsOutNegotiationFailed),
498
0
                                Some(Event::NotificationsOutResult {
499
0
                                    result: Err(NotificationsOutErr::ProtocolNotAvailable),
500
0
                                }),
501
0
                            );
502
                        }
503
0
                        Err(err) => {
504
0
                            return (
505
0
                                None,
506
0
                                Some(Event::NotificationsOutResult {
507
0
                                    result: Err(NotificationsOutErr::NegotiationError(err)),
508
0
                                }),
509
0
                            )
510
                        }
511
                    }
512
4
                }
513
514
29
                if negotiation
515
29
                    .as_ref()
516
29
                    .map_or(true, |n| 
n.can_write_protocol_data()23
)
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write20Bc_
Line
Count
Source
516
23
                    .map_or(true, |n| n.can_write_protocol_data())
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write20CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write20Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write20CsiUjFBJteJ7x_17smoldot_full_node
517
17
                {
518
17
                    read_write.write_from_vec_deque(&mut handshake_out);
519
17
                }
12
520
521
29
                if negotiation.is_none() {
522
6
                    if read_write.expected_incoming_bytes.is_none() {
523
1
                        read_write.wake_up_asap();
524
1
                        return (
525
1
                            Some(SubstreamInner::NotificationsOutNegotiationFailed),
526
1
                            Some(Event::NotificationsOutResult {
527
1
                                result: Err(NotificationsOutErr::RefusedHandshake),
528
1
                            }),
529
1
                        );
530
5
                    }
531
5
532
5
                    // Don't actually process incoming data before handshake is sent out, in order
533
5
                    // to not accidentally perform a state transition.
534
5
                    if !handshake_out.is_empty() {
535
0
                        return (
536
0
                            Some(SubstreamInner::NotificationsOutHandshakeRecv {
537
0
                                timeout,
538
0
                                negotiation,
539
0
                                handshake_in_size,
540
0
                                handshake_in_max_size,
541
0
                                handshake_out,
542
0
                            }),
543
0
                            None,
544
0
                        );
545
5
                    }
546
547
5
                    if let Some(
handshake_in_size1
) = handshake_in_size {
548
1
                        match read_write.incoming_bytes_take(handshake_in_size) {
549
1
                            Ok(Some(remote_handshake)) => {
550
1
                                read_write.wake_up_asap();
551
1
                                return (
552
1
                                    Some(SubstreamInner::NotificationsOut {
553
1
                                        notifications: VecDeque::new(),
554
1
                                        close_demanded_by_remote: false,
555
1
                                    }),
556
1
                                    Some(Event::NotificationsOutResult {
557
1
                                        result: Ok(remote_handshake),
558
1
                                    }),
559
1
                                );
560
                            }
561
0
                            Ok(None) => {}
562
                            Err(read_write::IncomingBytesTakeError::ReadClosed) => {
563
0
                                read_write.wake_up_asap();
564
0
                                return (
565
0
                                    Some(SubstreamInner::NotificationsOutNegotiationFailed),
566
0
                                    Some(Event::NotificationsOutResult {
567
0
                                        result: Err(NotificationsOutErr::RefusedHandshake),
568
0
                                    }),
569
0
                                );
570
                            }
571
                        }
572
                    } else {
573
4
                        match read_write.incoming_bytes_take_leb128(handshake_in_max_size) {
574
1
                            Ok(Some(s)) => handshake_in_size = Some(s),
575
3
                            Ok(None) => {}
576
0
                            Err(error) => return (
577
0
                                None,
578
0
                                Some(Event::Response {
579
0
                                    response: Err(match error {
580
                                        read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => {
581
0
                                            RequestError::ResponseInvalidLeb128
582
                                        }
583
                                        read_write::IncomingBytesTakeLeb128Error::ReadClosed => {
584
0
                                            RequestError::SubstreamClosed
585
                                        }
586
                                        read_write::IncomingBytesTakeLeb128Error::TooLarge => {
587
0
                                            RequestError::ResponseTooLarge
588
                                        }
589
                                    }),
590
                                }),
591
                            ),
592
                        }
593
                    }
594
23
                }
595
596
27
                read_write.wake_up_after(&timeout);
597
27
598
27
                (
599
27
                    Some(SubstreamInner::NotificationsOutHandshakeRecv {
600
27
                        timeout,
601
27
                        negotiation,
602
27
                        handshake_in_size,
603
27
                        handshake_in_max_size,
604
27
                        handshake_out,
605
27
                    }),
606
27
                    None,
607
27
                )
608
            }
609
            SubstreamInner::NotificationsOut {
610
3
                mut notifications,
611
3
                close_demanded_by_remote,
612
3
            } => {
613
3
                // Receiving data on an outgoing substream is forbidden by the protocol.
614
3
                read_write.discard_all_incoming();
615
3
                read_write.write_from_vec_deque(&mut notifications);
616
3
617
3
                // If this debug assertion fails, it means that `expected_incoming_bytes` was `None` in
618
3
                // the past then became `Some` again.
619
3
                debug_assert!(
620
3
                    !close_demanded_by_remote || 
read_write.expected_incoming_bytes.is_none()0
621
                );
622
623
3
                if !close_demanded_by_remote && read_write.expected_incoming_bytes.is_none() {
624
0
                    read_write.wake_up_asap();
625
0
                    return (
626
0
                        Some(SubstreamInner::NotificationsOut {
627
0
                            notifications,
628
0
                            close_demanded_by_remote: true,
629
0
                        }),
630
0
                        Some(Event::NotificationsOutCloseDemanded),
631
0
                    );
632
3
                }
633
3
634
3
                (
635
3
                    Some(SubstreamInner::NotificationsOut {
636
3
                        notifications,
637
3
                        close_demanded_by_remote,
638
3
                    }),
639
3
                    None,
640
3
                )
641
            }
642
            SubstreamInner::NotificationsOutClosed => {
643
0
                read_write.discard_all_incoming();
644
0
                read_write.close_write();
645
0
                (
646
0
                    if read_write.is_dead() {
647
0
                        None
648
                    } else {
649
0
                        Some(SubstreamInner::NotificationsOutClosed)
650
                    },
651
0
                    None,
652
                )
653
            }
654
655
            SubstreamInner::RequestOut {
656
27
                timeout,
657
27
                mut negotiation,
658
27
                mut request,
659
27
                mut response_size,
660
27
                response_max_size,
661
27
            } => {
662
27
                // Note that this might trigger timeouts for requests whose response is available
663
27
                // in `incoming_buffer`. This is intentional, as from the perspective of
664
27
                // `read_write` the response arrived after the timeout. It is the responsibility
665
27
                // of the user to call `read_write` in an appropriate way for this to not happen.
666
27
                if timeout < read_write.now {
667
1
                    read_write.close_write();
668
1
                    return (
669
1
                        None,
670
1
                        Some(Event::Response {
671
1
                            response: Err(RequestError::Timeout),
672
1
                        }),
673
1
                    );
674
26
                }
675
676
26
                if let Some(
extracted_nego25
) = negotiation.take() {
677
25
                    match extracted_nego.read_write(read_write) {
678
23
                        Ok(multistream_select::Negotiation::InProgress(nego)) => {
679
23
                            negotiation = Some(nego)
680
                        }
681
                        Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
682
                            // Never happens when dialing.
683
0
                            unreachable!()
684
                        }
685
1
                        Ok(multistream_select::Negotiation::Success) => {}
686
                        Ok(multistream_select::Negotiation::NotAvailable) => {
687
1
                            return (
688
1
                                None,
689
1
                                Some(Event::Response {
690
1
                                    response: Err(RequestError::ProtocolNotAvailable),
691
1
                                }),
692
1
                            )
693
                        }
694
0
                        Err(err) => {
695
0
                            return (
696
0
                                None,
697
0
                                Some(Event::Response {
698
0
                                    response: Err(RequestError::NegotiationError(err)),
699
0
                                }),
700
0
                            )
701
                        }
702
                    }
703
1
                }
704
705
25
                if negotiation
706
25
                    .as_ref()
707
25
                    .map_or(true, |n| 
n.can_write_protocol_data()23
)
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s_0Bc_
Line
Count
Source
707
23
                    .map_or(true, |n| n.can_write_protocol_data())
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s_0CsiUjFBJteJ7x_17smoldot_full_node
708
                {
709
13
                    if request.is_empty() {
710
10
                        read_write.close_write();
711
10
                    } else {
712
3
                        read_write.write_from_vec_deque(&mut request);
713
3
                    }
714
12
                }
715
716
25
                if negotiation.is_none() {
717
2
                    if let Some(
response_size0
) = response_size {
718
0
                        match read_write.incoming_bytes_take(response_size) {
719
0
                            Ok(Some(response)) => {
720
0
                                return (
721
0
                                    None,
722
0
                                    Some(Event::Response {
723
0
                                        response: Ok(response),
724
0
                                    }),
725
0
                                );
726
                            }
727
0
                            Ok(None) => {}
728
                            Err(read_write::IncomingBytesTakeError::ReadClosed) => {
729
0
                                return (
730
0
                                    None,
731
0
                                    Some(Event::Response {
732
0
                                        response: Err(RequestError::SubstreamClosed),
733
0
                                    }),
734
0
                                )
735
                            }
736
                        }
737
                    } else {
738
2
                        match read_write.incoming_bytes_take_leb128(response_max_size) {
739
0
                            Ok(Some(s)) => response_size = Some(s),
740
1
                            Ok(None) => {}
741
1
                            Err(error) => return (
742
1
                                None,
743
1
                                Some(Event::Response {
744
1
                                    response: Err(match error {
745
                                        read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => {
746
0
                                            RequestError::ResponseInvalidLeb128
747
                                        }
748
                                        read_write::IncomingBytesTakeLeb128Error::ReadClosed => {
749
1
                                            RequestError::SubstreamClosed
750
                                        }
751
                                        read_write::IncomingBytesTakeLeb128Error::TooLarge => {
752
0
                                            RequestError::ResponseTooLarge
753
                                        }
754
                                    }),
755
                                }),
756
                            ),
757
                        }
758
                    }
759
23
                }
760
761
24
                read_write.wake_up_after(&timeout);
762
24
763
24
                (
764
24
                    Some(SubstreamInner::RequestOut {
765
24
                        timeout,
766
24
                        negotiation,
767
24
                        request,
768
24
                        response_size,
769
24
                        response_max_size,
770
24
                    }),
771
24
                    None,
772
24
                )
773
            }
774
775
            SubstreamInner::RequestInRecv {
776
4
                mut request_size,
777
4
                request_max_size,
778
            } => {
779
4
                if let Some(
request_size2
) = request_size {
780
2
                    match read_write.incoming_bytes_take(request_size) {
781
2
                        Ok(Some(request)) => {
782
2
                            return (
783
2
                                Some(SubstreamInner::RequestInApiWait),
784
2
                                Some(Event::RequestIn { request }),
785
2
                            );
786
                        }
787
0
                        Ok(None) => {}
788
                        Err(read_write::IncomingBytesTakeError::ReadClosed) => {
789
0
                            return (
790
0
                                None,
791
0
                                Some(Event::InboundError {
792
0
                                    error: InboundError::SubstreamClosed,
793
0
                                    was_accepted: true,
794
0
                                }),
795
0
                            )
796
                        }
797
                    }
798
                } else {
799
2
                    match read_write.incoming_bytes_take_leb128(request_max_size) {
800
2
                        Ok(Some(s)) => request_size = Some(s),
801
0
                        Ok(None) => {}
802
0
                        Err(error) => {
803
0
                            return (
804
0
                                None,
805
0
                                Some(Event::InboundError {
806
0
                                    error: InboundError::RequestInLebError(error),
807
0
                                    was_accepted: true,
808
0
                                }),
809
0
                            )
810
                        }
811
                    }
812
                }
813
814
2
                (
815
2
                    Some(SubstreamInner::RequestInRecv {
816
2
                        request_max_size,
817
2
                        request_size,
818
2
                    }),
819
2
                    None,
820
2
                )
821
            }
822
0
            SubstreamInner::RequestInRecvEmpty => (
823
0
                Some(SubstreamInner::RequestInApiWait),
824
0
                Some(Event::RequestIn {
825
0
                    request: Vec::new(),
826
0
                }),
827
0
            ),
828
0
            SubstreamInner::RequestInApiWait => (Some(SubstreamInner::RequestInApiWait), None),
829
1
            SubstreamInner::RequestInRespond { mut response } => {
830
1
                if response.is_empty() {
831
1
                    read_write.close_write();
832
1
                    (None, None)
833
                } else {
834
0
                    read_write.write_from_vec_deque(&mut response);
835
0
                    (Some(SubstreamInner::RequestInRespond { response }), None)
836
                }
837
            }
838
839
            SubstreamInner::NotificationsInHandshake {
840
6
                handshake_max_size,
841
6
                mut handshake_size,
842
            } => {
843
6
                if let Some(
handshake_size3
) = handshake_size {
844
3
                    match read_write.incoming_bytes_take(handshake_size) {
845
3
                        Ok(Some(handshake)) => {
846
3
                            return (
847
3
                                Some(SubstreamInner::NotificationsInWait),
848
3
                                Some(Event::NotificationsInOpen { handshake }),
849
3
                            );
850
                        }
851
0
                        Ok(None) => {}
852
                        Err(read_write::IncomingBytesTakeError::ReadClosed) => {
853
0
                            return (
854
0
                                None,
855
0
                                Some(Event::InboundError {
856
0
                                    error: InboundError::SubstreamClosed,
857
0
                                    was_accepted: true,
858
0
                                }),
859
0
                            )
860
                        }
861
                    }
862
                } else {
863
3
                    match read_write.incoming_bytes_take_leb128(handshake_max_size) {
864
3
                        Ok(Some(s)) => handshake_size = Some(s),
865
0
                        Ok(None) => {}
866
0
                        Err(error) => {
867
0
                            return (
868
0
                                None,
869
0
                                Some(Event::InboundError {
870
0
                                    error: InboundError::NotificationsInError { error },
871
0
                                    was_accepted: true,
872
0
                                }),
873
0
                            )
874
                        }
875
                    }
876
                }
877
878
3
                (
879
3
                    Some(SubstreamInner::NotificationsInHandshake {
880
3
                        handshake_max_size,
881
3
                        handshake_size,
882
3
                    }),
883
3
                    None,
884
3
                )
885
            }
886
            SubstreamInner::NotificationsInWait => {
887
                // Incoming data isn't processed, potentially back-pressuring it.
888
0
                if read_write.expected_incoming_bytes.is_some() {
889
0
                    (Some(SubstreamInner::NotificationsInWait), None)
890
                } else {
891
0
                    read_write.wake_up_asap();
892
0
                    (
893
0
                        Some(SubstreamInner::NotificationsInRefused),
894
0
                        Some(Event::NotificationsInOpenCancel),
895
0
                    )
896
                }
897
            }
898
            SubstreamInner::NotificationsInRefused => {
899
2
                read_write.discard_all_incoming();
900
2
                read_write.close_write();
901
2
                (
902
2
                    if read_write.is_dead() {
903
0
                        None
904
                    } else {
905
2
                        Some(SubstreamInner::NotificationsInRefused)
906
                    },
907
2
                    None,
908
                )
909
            }
910
            SubstreamInner::NotificationsIn {
911
9
                close_desired_timeout,
912
9
                mut next_notification_size,
913
9
                mut handshake,
914
9
                max_notification_size,
915
9
            } => {
916
9
                read_write.write_from_vec_deque(&mut handshake);
917
9
918
9
                if close_desired_timeout
919
9
                    .as_ref()
920
9
                    .map_or(false, |timeout| 
*timeout >= read_write.now0
)
Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s0_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s0_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s0_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s0_0CsiUjFBJteJ7x_17smoldot_full_node
921
                {
922
0
                    read_write.wake_up_asap();
923
0
                    return (
924
0
                        Some(SubstreamInner::NotificationsInClosed),
925
0
                        Some(Event::NotificationsInClose {
926
0
                            outcome: Err(NotificationsInClosedErr::CloseDesiredTimeout),
927
0
                        }),
928
0
                    );
929
9
                }
930
9
931
9
                if close_desired_timeout.is_some() && 
handshake.is_empty()0
{
932
0
                    read_write.close_write();
933
9
                }
934
935
9
                let mut notification = None;
936
937
9
                if let Some(
sz3
) = next_notification_size {
938
3
                    match read_write.incoming_bytes_take(sz) {
939
3
                        Ok(Some(notif)) => {
940
3
                            read_write.wake_up_asap();
941
3
                            notification = Some(notif);
942
3
                            next_notification_size = None;
943
3
                        }
944
0
                        Ok(None) => {}
945
                        Err(read_write::IncomingBytesTakeError::ReadClosed) => {
946
0
                            read_write.wake_up_asap();
947
0
                            return (
948
0
                                Some(SubstreamInner::NotificationsInClosed),
949
0
                                Some(Event::NotificationsInClose {
950
0
                                    outcome: Err(NotificationsInClosedErr::SubstreamClosed),
951
0
                                }),
952
0
                            );
953
                        }
954
                    }
955
                } else {
956
6
                    match read_write.incoming_bytes_take_leb128(max_notification_size) {
957
3
                        Ok(Some(s)) => next_notification_size = Some(s),
958
3
                        Ok(None) => {}
959
0
                        Err(error) => {
960
0
                            read_write.wake_up_asap();
961
0
                            return (
962
0
                                Some(SubstreamInner::NotificationsInClosed),
963
0
                                Some(Event::NotificationsInClose {
964
0
                                    outcome: Err(NotificationsInClosedErr::ProtocolError(error)),
965
0
                                }),
966
0
                            );
967
                        }
968
                    }
969
                }
970
971
9
                (
972
9
                    Some(SubstreamInner::NotificationsIn {
973
9
                        close_desired_timeout,
974
9
                        next_notification_size,
975
9
                        handshake,
976
9
                        max_notification_size,
977
9
                    }),
978
9
                    notification.map(|n| 
Event::NotificationIn { notification: n }3
),
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s1_0Bc_
Line
Count
Source
978
3
                    notification.map(|n| Event::NotificationIn { notification: n }),
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s1_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s1_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s1_0CsiUjFBJteJ7x_17smoldot_full_node
979
9
                )
980
            }
981
            SubstreamInner::NotificationsInClosed => {
982
0
                read_write.discard_all_incoming();
983
0
                read_write.close_write();
984
0
                (
985
0
                    if read_write.is_dead() {
986
0
                        None
987
                    } else {
988
0
                        Some(SubstreamInner::NotificationsInClosed)
989
                    },
990
0
                    None,
991
                )
992
            }
993
994
24
            SubstreamInner::PingIn { mut payload_out } => {
995
24
                // Inbound ping substream.
996
24
                // The ping protocol consists in sending 32 bytes of data, which the remote has
997
24
                // to send back.
998
24
                read_write.write_from_vec_deque(&mut payload_out);
999
24
                if payload_out.is_empty() {
1000
24
                    if let Ok(Some(
ping0
)) = read_write.incoming_bytes_take(32) {
1001
0
                        payload_out.extend(ping);
1002
24
                    }
1003
0
                }
1004
1005
24
                (Some(SubstreamInner::PingIn { payload_out }), None)
1006
            }
1007
1008
0
            SubstreamInner::PingOutFailed { mut queued_pings } => {
1009
0
                read_write.close_write();
1010
0
                if !queued_pings.is_empty() {
1011
0
                    queued_pings.remove(0);
1012
0
                    read_write.wake_up_asap();
1013
0
                    (
1014
0
                        Some(SubstreamInner::PingOutFailed { queued_pings }),
1015
0
                        Some(Event::PingOutError {
1016
0
                            num_pings: NonZeroUsize::new(1).unwrap(),
1017
0
                        }),
1018
0
                    )
1019
                } else {
1020
0
                    (Some(SubstreamInner::PingOutFailed { queued_pings }), None)
1021
                }
1022
            }
1023
            SubstreamInner::PingOut {
1024
102
                mut negotiation,
1025
102
                mut queued_pings,
1026
102
                mut outgoing_payload,
1027
102
                mut expected_payload,
1028
            } => {
1029
102
                if let Some(
extracted_negotiation90
) = negotiation.take() {
1030
90
                    match extracted_negotiation.read_write(read_write) {
1031
78
                        Ok(multistream_select::Negotiation::InProgress(nego)) => {
1032
78
                            negotiation = Some(nego)
1033
                        }
1034
                        Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
1035
                            // Never happens when dialing.
1036
0
                            unreachable!()
1037
                        }
1038
12
                        Ok(multistream_select::Negotiation::Success) => {}
1039
                        Ok(multistream_select::Negotiation::NotAvailable) => {
1040
0
                            read_write.wake_up_asap();
1041
0
                            return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
1042
                        }
1043
                        Err(_) => {
1044
0
                            read_write.wake_up_asap();
1045
0
                            return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
1046
                        }
1047
                    }
1048
12
                }
1049
1050
102
                if negotiation
1051
102
                    .as_ref()
1052
102
                    .map_or(true, |n| 
n.can_write_protocol_data()78
)
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s2_0Bc_
Line
Count
Source
1052
78
                    .map_or(true, |n| n.can_write_protocol_data())
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s2_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s2_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s2_0CsiUjFBJteJ7x_17smoldot_full_node
1053
48
                {
1054
48
                    read_write.write_from_vec_deque(&mut outgoing_payload);
1055
54
                }
1056
1057
                // We check the timeouts before checking the incoming data, as otherwise pings
1058
                // might succeed after their timeout.
1059
102
                for 
timeout6
in queued_pings.iter_mut() {
1060
6
                    if timeout.as_ref().map_or(false, |(when_started, timeout)| {
1061
6
                        (read_write.now.clone() - when_started.clone()) >= *timeout
1062
6
                    }) {
_RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s3_0Bc_
Line
Count
Source
1060
6
                    if timeout.as_ref().map_or(false, |(when_started, timeout)| {
1061
6
                        (read_write.now.clone() - when_started.clone()) >= *timeout
1062
6
                    }) {
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s3_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s3_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s3_0CsiUjFBJteJ7x_17smoldot_full_node
1063
0
                        *timeout = None;
1064
0
                        read_write.wake_up_asap();
1065
0
                        return (
1066
0
                            Some(SubstreamInner::PingOut {
1067
0
                                negotiation,
1068
0
                                expected_payload,
1069
0
                                outgoing_payload,
1070
0
                                queued_pings,
1071
0
                            }),
1072
0
                            Some(Event::PingOutError {
1073
0
                                num_pings: NonZeroUsize::new(1).unwrap(),
1074
0
                            }),
1075
0
                        );
1076
6
                    }
1077
1078
6
                    if let Some((when_started, timeout)) = timeout {
1079
6
                        read_write.wake_up_after(&(when_started.clone() + *timeout));
1080
6
                    }
0
1081
                }
1082
1083
102
                if negotiation.is_none() {
1084
24
                    if let Ok(Some(
pong0
)) = read_write.incoming_bytes_take(32) {
1085
0
                        if expected_payload
1086
0
                            .pop_front()
1087
0
                            .map_or(true, |expected| pong != *expected)
Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s4_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2s4_0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE11read_write2s4_0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2s4_0CsiUjFBJteJ7x_17smoldot_full_node
1088
                        {
1089
0
                            read_write.wake_up_asap();
1090
0
                            return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
1091
0
                        }
1092
0
                        if let Some((when_started, _)) = queued_pings.remove(0) {
1093
0
                            return (
1094
0
                                Some(SubstreamInner::PingOut {
1095
0
                                    negotiation,
1096
0
                                    expected_payload,
1097
0
                                    outgoing_payload,
1098
0
                                    queued_pings,
1099
0
                                }),
1100
0
                                Some(Event::PingOutSuccess {
1101
0
                                    ping_time: read_write.now.clone() - when_started,
1102
0
                                }),
1103
0
                            );
1104
0
                        }
1105
24
                    }
1106
78
                }
1107
1108
102
                (
1109
102
                    Some(SubstreamInner::PingOut {
1110
102
                        negotiation,
1111
102
                        expected_payload,
1112
102
                        outgoing_payload,
1113
102
                        queued_pings,
1114
102
                    }),
1115
102
                    None,
1116
102
                )
1117
            }
1118
        }
1119
329
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2Ba_
Line
Count
Source
338
329
    fn read_write2(
339
329
        self,
340
329
        read_write: &'_ mut read_write::ReadWrite<TNow>,
341
329
    ) -> (Option<SubstreamInner<TNow>>, Option<Event>) {
342
329
        match self.inner {
343
102
            SubstreamInner::InboundNegotiating(nego, was_rejected_already) => {
344
102
                match nego.read_write(read_write) {
345
81
                    Ok(multistream_select::Negotiation::InProgress(nego)) => (
346
81
                        Some(SubstreamInner::InboundNegotiating(
347
81
                            nego,
348
81
                            was_rejected_already,
349
81
                        )),
350
81
                        None,
351
81
                    ),
352
20
                    Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(accept_deny)) => {
353
20
                        // TODO: maybe avoid cloning the protocol name?
354
20
                        let protocol = accept_deny.requested_protocol().to_owned();
355
20
                        (
356
20
                            Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)),
357
20
                            Some(Event::InboundNegotiated(protocol)),
358
20
                        )
359
                    }
360
                    Ok(multistream_select::Negotiation::Success) => {
361
                        // Unreachable, as we expect a `ListenerAcceptOrDeny`.
362
0
                        unreachable!()
363
                    }
364
                    Ok(multistream_select::Negotiation::NotAvailable) => {
365
                        // Unreachable in listener mode.
366
0
                        unreachable!()
367
                    }
368
1
                    Err(_) if was_rejected_already => {
369
1
                        // If the negotiation was already rejected once, it is likely that the
370
1
                        // multistream-select protocol error is due to the fact that the remote
371
1
                        // assumes that the multistream-select negotiation always succeeds. As such,
372
1
                        // we treat this situation as "negotiation has failed gracefully".
373
1
                        (Some(SubstreamInner::InboundFailed), None)
374
                    }
375
0
                    Err(err) => (
376
0
                        None,
377
0
                        Some(Event::InboundError {
378
0
                            error: InboundError::NegotiationError(err),
379
0
                            was_accepted: false,
380
0
                        }),
381
0
                    ),
382
                }
383
            }
384
0
            SubstreamInner::InboundNegotiatingApiWait(accept_deny) => (
385
0
                Some(SubstreamInner::InboundNegotiatingApiWait(accept_deny)),
386
0
                None,
387
0
            ),
388
18
            SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty) => {
389
18
                match nego.read_write(read_write) {
390
0
                    Ok(multistream_select::Negotiation::InProgress(nego)) => (
391
0
                        Some(SubstreamInner::InboundNegotiatingAccept(nego, inbound_ty)),
392
0
                        None,
393
0
                    ),
394
                    Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
395
                        // Can't be reached again, as we have already accepted the protocol.
396
0
                        unreachable!()
397
                    }
398
18
                    Ok(multistream_select::Negotiation::Success) => match inbound_ty {
399
13
                        InboundTy::Ping => (
400
13
                            Some(SubstreamInner::PingIn {
401
13
                                payload_out: VecDeque::with_capacity(32),
402
13
                            }),
403
13
                            None,
404
13
                        ),
405
3
                        InboundTy::Notifications { max_handshake_size } => (
406
3
                            Some(SubstreamInner::NotificationsInHandshake {
407
3
                                handshake_size: None,
408
3
                                handshake_max_size: max_handshake_size,
409
3
                            }),
410
3
                            None,
411
3
                        ),
412
2
                        InboundTy::Request { request_max_size } => {
413
2
                            if let Some(request_max_size) = request_max_size {
414
2
                                (
415
2
                                    Some(SubstreamInner::RequestInRecv {
416
2
                                        request_max_size,
417
2
                                        request_size: None,
418
2
                                    }),
419
2
                                    None,
420
2
                                )
421
                            } else {
422
0
                                (Some(SubstreamInner::RequestInRecvEmpty), None)
423
                            }
424
                        }
425
                    },
426
                    Ok(multistream_select::Negotiation::NotAvailable) => {
427
                        // Unreachable in listener mode.
428
0
                        unreachable!()
429
                    }
430
0
                    Err(err) => (
431
0
                        None,
432
0
                        Some(Event::InboundError {
433
0
                            error: InboundError::NegotiationError(err),
434
0
                            was_accepted: true,
435
0
                        }),
436
0
                    ),
437
                }
438
            }
439
            SubstreamInner::InboundFailed => {
440
                // Substream is an inbound substream that has failed to negotiate a
441
                // protocol. The substream is expected to close soon, but the remote might
442
                // have been eagerly sending data (assuming that the negotiation would
443
                // succeed), which should be silently discarded.
444
1
                read_write.discard_all_incoming();
445
1
                read_write.close_write();
446
1
                if read_write.is_dead() {
447
1
                    (None, None)
448
                } else {
449
0
                    (Some(SubstreamInner::InboundFailed), None)
450
                }
451
            }
452
453
            SubstreamInner::NotificationsOutNegotiationFailed => {
454
                // Substream has failed to negotiate a protocol. The substream is expected to
455
                // close soon.
456
0
                read_write.discard_all_incoming();
457
0
                read_write.close_write();
458
0
                (
459
0
                    if read_write.is_dead() {
460
0
                        None
461
                    } else {
462
0
                        Some(SubstreamInner::NotificationsOutNegotiationFailed)
463
                    },
464
0
                    None,
465
                )
466
            }
467
            SubstreamInner::NotificationsOutHandshakeRecv {
468
30
                timeout,
469
30
                mut negotiation,
470
30
                mut handshake_in_size,
471
30
                handshake_in_max_size,
472
30
                mut handshake_out,
473
30
            } => {
474
30
                if timeout < read_write.now {
475
1
                    read_write.wake_up_asap();
476
1
                    return (
477
1
                        Some(SubstreamInner::NotificationsOutNegotiationFailed),
478
1
                        Some(Event::NotificationsOutResult {
479
1
                            result: Err(NotificationsOutErr::Timeout),
480
1
                        }),
481
1
                    );
482
29
                }
483
484
29
                if let Some(
extracted_negotiation25
) = negotiation.take() {
485
25
                    match extracted_negotiation.read_write(read_write) {
486
23
                        Ok(multistream_select::Negotiation::InProgress(nego)) => {
487
23
                            negotiation = Some(nego)
488
                        }
489
                        Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
490
                            // Never happens when dialing.
491
0
                            unreachable!()
492
                        }
493
2
                        Ok(multistream_select::Negotiation::Success) => {}
494
                        Ok(multistream_select::Negotiation::NotAvailable) => {
495
0
                            read_write.wake_up_asap();
496
0
                            return (
497
0
                                Some(SubstreamInner::NotificationsOutNegotiationFailed),
498
0
                                Some(Event::NotificationsOutResult {
499
0
                                    result: Err(NotificationsOutErr::ProtocolNotAvailable),
500
0
                                }),
501
0
                            );
502
                        }
503
0
                        Err(err) => {
504
0
                            return (
505
0
                                None,
506
0
                                Some(Event::NotificationsOutResult {
507
0
                                    result: Err(NotificationsOutErr::NegotiationError(err)),
508
0
                                }),
509
0
                            )
510
                        }
511
                    }
512
4
                }
513
514
29
                if negotiation
515
29
                    .as_ref()
516
29
                    .map_or(true, |n| n.can_write_protocol_data())
517
17
                {
518
17
                    read_write.write_from_vec_deque(&mut handshake_out);
519
17
                }
12
520
521
29
                if negotiation.is_none() {
522
6
                    if read_write.expected_incoming_bytes.is_none() {
523
1
                        read_write.wake_up_asap();
524
1
                        return (
525
1
                            Some(SubstreamInner::NotificationsOutNegotiationFailed),
526
1
                            Some(Event::NotificationsOutResult {
527
1
                                result: Err(NotificationsOutErr::RefusedHandshake),
528
1
                            }),
529
1
                        );
530
5
                    }
531
5
532
5
                    // Don't actually process incoming data before handshake is sent out, in order
533
5
                    // to not accidentally perform a state transition.
534
5
                    if !handshake_out.is_empty() {
535
0
                        return (
536
0
                            Some(SubstreamInner::NotificationsOutHandshakeRecv {
537
0
                                timeout,
538
0
                                negotiation,
539
0
                                handshake_in_size,
540
0
                                handshake_in_max_size,
541
0
                                handshake_out,
542
0
                            }),
543
0
                            None,
544
0
                        );
545
5
                    }
546
547
5
                    if let Some(
handshake_in_size1
) = handshake_in_size {
548
1
                        match read_write.incoming_bytes_take(handshake_in_size) {
549
1
                            Ok(Some(remote_handshake)) => {
550
1
                                read_write.wake_up_asap();
551
1
                                return (
552
1
                                    Some(SubstreamInner::NotificationsOut {
553
1
                                        notifications: VecDeque::new(),
554
1
                                        close_demanded_by_remote: false,
555
1
                                    }),
556
1
                                    Some(Event::NotificationsOutResult {
557
1
                                        result: Ok(remote_handshake),
558
1
                                    }),
559
1
                                );
560
                            }
561
0
                            Ok(None) => {}
562
                            Err(read_write::IncomingBytesTakeError::ReadClosed) => {
563
0
                                read_write.wake_up_asap();
564
0
                                return (
565
0
                                    Some(SubstreamInner::NotificationsOutNegotiationFailed),
566
0
                                    Some(Event::NotificationsOutResult {
567
0
                                        result: Err(NotificationsOutErr::RefusedHandshake),
568
0
                                    }),
569
0
                                );
570
                            }
571
                        }
572
                    } else {
573
4
                        match read_write.incoming_bytes_take_leb128(handshake_in_max_size) {
574
1
                            Ok(Some(s)) => handshake_in_size = Some(s),
575
3
                            Ok(None) => {}
576
0
                            Err(error) => return (
577
0
                                None,
578
0
                                Some(Event::Response {
579
0
                                    response: Err(match error {
580
                                        read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => {
581
0
                                            RequestError::ResponseInvalidLeb128
582
                                        }
583
                                        read_write::IncomingBytesTakeLeb128Error::ReadClosed => {
584
0
                                            RequestError::SubstreamClosed
585
                                        }
586
                                        read_write::IncomingBytesTakeLeb128Error::TooLarge => {
587
0
                                            RequestError::ResponseTooLarge
588
                                        }
589
                                    }),
590
                                }),
591
                            ),
592
                        }
593
                    }
594
23
                }
595
596
27
                read_write.wake_up_after(&timeout);
597
27
598
27
                (
599
27
                    Some(SubstreamInner::NotificationsOutHandshakeRecv {
600
27
                        timeout,
601
27
                        negotiation,
602
27
                        handshake_in_size,
603
27
                        handshake_in_max_size,
604
27
                        handshake_out,
605
27
                    }),
606
27
                    None,
607
27
                )
608
            }
609
            SubstreamInner::NotificationsOut {
610
3
                mut notifications,
611
3
                close_demanded_by_remote,
612
3
            } => {
613
3
                // Receiving data on an outgoing substream is forbidden by the protocol.
614
3
                read_write.discard_all_incoming();
615
3
                read_write.write_from_vec_deque(&mut notifications);
616
3
617
3
                // If this debug assertion fails, it means that `expected_incoming_bytes` was `None` in
618
3
                // the past then became `Some` again.
619
3
                debug_assert!(
620
3
                    !close_demanded_by_remote || 
read_write.expected_incoming_bytes.is_none()0
621
                );
622
623
3
                if !close_demanded_by_remote && read_write.expected_incoming_bytes.is_none() {
624
0
                    read_write.wake_up_asap();
625
0
                    return (
626
0
                        Some(SubstreamInner::NotificationsOut {
627
0
                            notifications,
628
0
                            close_demanded_by_remote: true,
629
0
                        }),
630
0
                        Some(Event::NotificationsOutCloseDemanded),
631
0
                    );
632
3
                }
633
3
634
3
                (
635
3
                    Some(SubstreamInner::NotificationsOut {
636
3
                        notifications,
637
3
                        close_demanded_by_remote,
638
3
                    }),
639
3
                    None,
640
3
                )
641
            }
642
            SubstreamInner::NotificationsOutClosed => {
643
0
                read_write.discard_all_incoming();
644
0
                read_write.close_write();
645
0
                (
646
0
                    if read_write.is_dead() {
647
0
                        None
648
                    } else {
649
0
                        Some(SubstreamInner::NotificationsOutClosed)
650
                    },
651
0
                    None,
652
                )
653
            }
654
655
            SubstreamInner::RequestOut {
656
27
                timeout,
657
27
                mut negotiation,
658
27
                mut request,
659
27
                mut response_size,
660
27
                response_max_size,
661
27
            } => {
662
27
                // Note that this might trigger timeouts for requests whose response is available
663
27
                // in `incoming_buffer`. This is intentional, as from the perspective of
664
27
                // `read_write` the response arrived after the timeout. It is the responsibility
665
27
                // of the user to call `read_write` in an appropriate way for this to not happen.
666
27
                if timeout < read_write.now {
667
1
                    read_write.close_write();
668
1
                    return (
669
1
                        None,
670
1
                        Some(Event::Response {
671
1
                            response: Err(RequestError::Timeout),
672
1
                        }),
673
1
                    );
674
26
                }
675
676
26
                if let Some(
extracted_nego25
) = negotiation.take() {
677
25
                    match extracted_nego.read_write(read_write) {
678
23
                        Ok(multistream_select::Negotiation::InProgress(nego)) => {
679
23
                            negotiation = Some(nego)
680
                        }
681
                        Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
682
                            // Never happens when dialing.
683
0
                            unreachable!()
684
                        }
685
1
                        Ok(multistream_select::Negotiation::Success) => {}
686
                        Ok(multistream_select::Negotiation::NotAvailable) => {
687
1
                            return (
688
1
                                None,
689
1
                                Some(Event::Response {
690
1
                                    response: Err(RequestError::ProtocolNotAvailable),
691
1
                                }),
692
1
                            )
693
                        }
694
0
                        Err(err) => {
695
0
                            return (
696
0
                                None,
697
0
                                Some(Event::Response {
698
0
                                    response: Err(RequestError::NegotiationError(err)),
699
0
                                }),
700
0
                            )
701
                        }
702
                    }
703
1
                }
704
705
25
                if negotiation
706
25
                    .as_ref()
707
25
                    .map_or(true, |n| n.can_write_protocol_data())
708
                {
709
13
                    if request.is_empty() {
710
10
                        read_write.close_write();
711
10
                    } else {
712
3
                        read_write.write_from_vec_deque(&mut request);
713
3
                    }
714
12
                }
715
716
25
                if negotiation.is_none() {
717
2
                    if let Some(
response_size0
) = response_size {
718
0
                        match read_write.incoming_bytes_take(response_size) {
719
0
                            Ok(Some(response)) => {
720
0
                                return (
721
0
                                    None,
722
0
                                    Some(Event::Response {
723
0
                                        response: Ok(response),
724
0
                                    }),
725
0
                                );
726
                            }
727
0
                            Ok(None) => {}
728
                            Err(read_write::IncomingBytesTakeError::ReadClosed) => {
729
0
                                return (
730
0
                                    None,
731
0
                                    Some(Event::Response {
732
0
                                        response: Err(RequestError::SubstreamClosed),
733
0
                                    }),
734
0
                                )
735
                            }
736
                        }
737
                    } else {
738
2
                        match read_write.incoming_bytes_take_leb128(response_max_size) {
739
0
                            Ok(Some(s)) => response_size = Some(s),
740
1
                            Ok(None) => {}
741
1
                            Err(error) => return (
742
1
                                None,
743
1
                                Some(Event::Response {
744
1
                                    response: Err(match error {
745
                                        read_write::IncomingBytesTakeLeb128Error::InvalidLeb128 => {
746
0
                                            RequestError::ResponseInvalidLeb128
747
                                        }
748
                                        read_write::IncomingBytesTakeLeb128Error::ReadClosed => {
749
1
                                            RequestError::SubstreamClosed
750
                                        }
751
                                        read_write::IncomingBytesTakeLeb128Error::TooLarge => {
752
0
                                            RequestError::ResponseTooLarge
753
                                        }
754
                                    }),
755
                                }),
756
                            ),
757
                        }
758
                    }
759
23
                }
760
761
24
                read_write.wake_up_after(&timeout);
762
24
763
24
                (
764
24
                    Some(SubstreamInner::RequestOut {
765
24
                        timeout,
766
24
                        negotiation,
767
24
                        request,
768
24
                        response_size,
769
24
                        response_max_size,
770
24
                    }),
771
24
                    None,
772
24
                )
773
            }
774
775
            SubstreamInner::RequestInRecv {
776
4
                mut request_size,
777
4
                request_max_size,
778
            } => {
779
4
                if let Some(
request_size2
) = request_size {
780
2
                    match read_write.incoming_bytes_take(request_size) {
781
2
                        Ok(Some(request)) => {
782
2
                            return (
783
2
                                Some(SubstreamInner::RequestInApiWait),
784
2
                                Some(Event::RequestIn { request }),
785
2
                            );
786
                        }
787
0
                        Ok(None) => {}
788
                        Err(read_write::IncomingBytesTakeError::ReadClosed) => {
789
0
                            return (
790
0
                                None,
791
0
                                Some(Event::InboundError {
792
0
                                    error: InboundError::SubstreamClosed,
793
0
                                    was_accepted: true,
794
0
                                }),
795
0
                            )
796
                        }
797
                    }
798
                } else {
799
2
                    match read_write.incoming_bytes_take_leb128(request_max_size) {
800
2
                        Ok(Some(s)) => request_size = Some(s),
801
0
                        Ok(None) => {}
802
0
                        Err(error) => {
803
0
                            return (
804
0
                                None,
805
0
                                Some(Event::InboundError {
806
0
                                    error: InboundError::RequestInLebError(error),
807
0
                                    was_accepted: true,
808
0
                                }),
809
0
                            )
810
                        }
811
                    }
812
                }
813
814
2
                (
815
2
                    Some(SubstreamInner::RequestInRecv {
816
2
                        request_max_size,
817
2
                        request_size,
818
2
                    }),
819
2
                    None,
820
2
                )
821
            }
822
0
            SubstreamInner::RequestInRecvEmpty => (
823
0
                Some(SubstreamInner::RequestInApiWait),
824
0
                Some(Event::RequestIn {
825
0
                    request: Vec::new(),
826
0
                }),
827
0
            ),
828
0
            SubstreamInner::RequestInApiWait => (Some(SubstreamInner::RequestInApiWait), None),
829
1
            SubstreamInner::RequestInRespond { mut response } => {
830
1
                if response.is_empty() {
831
1
                    read_write.close_write();
832
1
                    (None, None)
833
                } else {
834
0
                    read_write.write_from_vec_deque(&mut response);
835
0
                    (Some(SubstreamInner::RequestInRespond { response }), None)
836
                }
837
            }
838
839
            SubstreamInner::NotificationsInHandshake {
840
6
                handshake_max_size,
841
6
                mut handshake_size,
842
            } => {
843
6
                if let Some(
handshake_size3
) = handshake_size {
844
3
                    match read_write.incoming_bytes_take(handshake_size) {
845
3
                        Ok(Some(handshake)) => {
846
3
                            return (
847
3
                                Some(SubstreamInner::NotificationsInWait),
848
3
                                Some(Event::NotificationsInOpen { handshake }),
849
3
                            );
850
                        }
851
0
                        Ok(None) => {}
852
                        Err(read_write::IncomingBytesTakeError::ReadClosed) => {
853
0
                            return (
854
0
                                None,
855
0
                                Some(Event::InboundError {
856
0
                                    error: InboundError::SubstreamClosed,
857
0
                                    was_accepted: true,
858
0
                                }),
859
0
                            )
860
                        }
861
                    }
862
                } else {
863
3
                    match read_write.incoming_bytes_take_leb128(handshake_max_size) {
864
3
                        Ok(Some(s)) => handshake_size = Some(s),
865
0
                        Ok(None) => {}
866
0
                        Err(error) => {
867
0
                            return (
868
0
                                None,
869
0
                                Some(Event::InboundError {
870
0
                                    error: InboundError::NotificationsInError { error },
871
0
                                    was_accepted: true,
872
0
                                }),
873
0
                            )
874
                        }
875
                    }
876
                }
877
878
3
                (
879
3
                    Some(SubstreamInner::NotificationsInHandshake {
880
3
                        handshake_max_size,
881
3
                        handshake_size,
882
3
                    }),
883
3
                    None,
884
3
                )
885
            }
886
            SubstreamInner::NotificationsInWait => {
887
                // Incoming data isn't processed, potentially back-pressuring it.
888
0
                if read_write.expected_incoming_bytes.is_some() {
889
0
                    (Some(SubstreamInner::NotificationsInWait), None)
890
                } else {
891
0
                    read_write.wake_up_asap();
892
0
                    (
893
0
                        Some(SubstreamInner::NotificationsInRefused),
894
0
                        Some(Event::NotificationsInOpenCancel),
895
0
                    )
896
                }
897
            }
898
            SubstreamInner::NotificationsInRefused => {
899
2
                read_write.discard_all_incoming();
900
2
                read_write.close_write();
901
2
                (
902
2
                    if read_write.is_dead() {
903
0
                        None
904
                    } else {
905
2
                        Some(SubstreamInner::NotificationsInRefused)
906
                    },
907
2
                    None,
908
                )
909
            }
910
            SubstreamInner::NotificationsIn {
911
9
                close_desired_timeout,
912
9
                mut next_notification_size,
913
9
                mut handshake,
914
9
                max_notification_size,
915
9
            } => {
916
9
                read_write.write_from_vec_deque(&mut handshake);
917
9
918
9
                if close_desired_timeout
919
9
                    .as_ref()
920
9
                    .map_or(false, |timeout| *timeout >= read_write.now)
921
                {
922
0
                    read_write.wake_up_asap();
923
0
                    return (
924
0
                        Some(SubstreamInner::NotificationsInClosed),
925
0
                        Some(Event::NotificationsInClose {
926
0
                            outcome: Err(NotificationsInClosedErr::CloseDesiredTimeout),
927
0
                        }),
928
0
                    );
929
9
                }
930
9
931
9
                if close_desired_timeout.is_some() && 
handshake.is_empty()0
{
932
0
                    read_write.close_write();
933
9
                }
934
935
9
                let mut notification = None;
936
937
9
                if let Some(
sz3
) = next_notification_size {
938
3
                    match read_write.incoming_bytes_take(sz) {
939
3
                        Ok(Some(notif)) => {
940
3
                            read_write.wake_up_asap();
941
3
                            notification = Some(notif);
942
3
                            next_notification_size = None;
943
3
                        }
944
0
                        Ok(None) => {}
945
                        Err(read_write::IncomingBytesTakeError::ReadClosed) => {
946
0
                            read_write.wake_up_asap();
947
0
                            return (
948
0
                                Some(SubstreamInner::NotificationsInClosed),
949
0
                                Some(Event::NotificationsInClose {
950
0
                                    outcome: Err(NotificationsInClosedErr::SubstreamClosed),
951
0
                                }),
952
0
                            );
953
                        }
954
                    }
955
                } else {
956
6
                    match read_write.incoming_bytes_take_leb128(max_notification_size) {
957
3
                        Ok(Some(s)) => next_notification_size = Some(s),
958
3
                        Ok(None) => {}
959
0
                        Err(error) => {
960
0
                            read_write.wake_up_asap();
961
0
                            return (
962
0
                                Some(SubstreamInner::NotificationsInClosed),
963
0
                                Some(Event::NotificationsInClose {
964
0
                                    outcome: Err(NotificationsInClosedErr::ProtocolError(error)),
965
0
                                }),
966
0
                            );
967
                        }
968
                    }
969
                }
970
971
9
                (
972
9
                    Some(SubstreamInner::NotificationsIn {
973
9
                        close_desired_timeout,
974
9
                        next_notification_size,
975
9
                        handshake,
976
9
                        max_notification_size,
977
9
                    }),
978
9
                    notification.map(|n| Event::NotificationIn { notification: n }),
979
9
                )
980
            }
981
            SubstreamInner::NotificationsInClosed => {
982
0
                read_write.discard_all_incoming();
983
0
                read_write.close_write();
984
0
                (
985
0
                    if read_write.is_dead() {
986
0
                        None
987
                    } else {
988
0
                        Some(SubstreamInner::NotificationsInClosed)
989
                    },
990
0
                    None,
991
                )
992
            }
993
994
24
            SubstreamInner::PingIn { mut payload_out } => {
995
24
                // Inbound ping substream.
996
24
                // The ping protocol consists in sending 32 bytes of data, which the remote has
997
24
                // to send back.
998
24
                read_write.write_from_vec_deque(&mut payload_out);
999
24
                if payload_out.is_empty() {
1000
24
                    if let Ok(Some(
ping0
)) = read_write.incoming_bytes_take(32) {
1001
0
                        payload_out.extend(ping);
1002
24
                    }
1003
0
                }
1004
1005
24
                (Some(SubstreamInner::PingIn { payload_out }), None)
1006
            }
1007
1008
0
            SubstreamInner::PingOutFailed { mut queued_pings } => {
1009
0
                read_write.close_write();
1010
0
                if !queued_pings.is_empty() {
1011
0
                    queued_pings.remove(0);
1012
0
                    read_write.wake_up_asap();
1013
0
                    (
1014
0
                        Some(SubstreamInner::PingOutFailed { queued_pings }),
1015
0
                        Some(Event::PingOutError {
1016
0
                            num_pings: NonZeroUsize::new(1).unwrap(),
1017
0
                        }),
1018
0
                    )
1019
                } else {
1020
0
                    (Some(SubstreamInner::PingOutFailed { queued_pings }), None)
1021
                }
1022
            }
1023
            SubstreamInner::PingOut {
1024
102
                mut negotiation,
1025
102
                mut queued_pings,
1026
102
                mut outgoing_payload,
1027
102
                mut expected_payload,
1028
            } => {
1029
102
                if let Some(
extracted_negotiation90
) = negotiation.take() {
1030
90
                    match extracted_negotiation.read_write(read_write) {
1031
78
                        Ok(multistream_select::Negotiation::InProgress(nego)) => {
1032
78
                            negotiation = Some(nego)
1033
                        }
1034
                        Ok(multistream_select::Negotiation::ListenerAcceptOrDeny(_)) => {
1035
                            // Never happens when dialing.
1036
0
                            unreachable!()
1037
                        }
1038
12
                        Ok(multistream_select::Negotiation::Success) => {}
1039
                        Ok(multistream_select::Negotiation::NotAvailable) => {
1040
0
                            read_write.wake_up_asap();
1041
0
                            return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
1042
                        }
1043
                        Err(_) => {
1044
0
                            read_write.wake_up_asap();
1045
0
                            return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
1046
                        }
1047
                    }
1048
12
                }
1049
1050
102
                if negotiation
1051
102
                    .as_ref()
1052
102
                    .map_or(true, |n| n.can_write_protocol_data())
1053
48
                {
1054
48
                    read_write.write_from_vec_deque(&mut outgoing_payload);
1055
54
                }
1056
1057
                // We check the timeouts before checking the incoming data, as otherwise pings
1058
                // might succeed after their timeout.
1059
102
                for 
timeout6
in queued_pings.iter_mut() {
1060
6
                    if timeout.as_ref().map_or(false, |(when_started, timeout)| {
1061
                        (read_write.now.clone() - when_started.clone()) >= *timeout
1062
6
                    }) {
1063
0
                        *timeout = None;
1064
0
                        read_write.wake_up_asap();
1065
0
                        return (
1066
0
                            Some(SubstreamInner::PingOut {
1067
0
                                negotiation,
1068
0
                                expected_payload,
1069
0
                                outgoing_payload,
1070
0
                                queued_pings,
1071
0
                            }),
1072
0
                            Some(Event::PingOutError {
1073
0
                                num_pings: NonZeroUsize::new(1).unwrap(),
1074
0
                            }),
1075
0
                        );
1076
6
                    }
1077
1078
6
                    if let Some((when_started, timeout)) = timeout {
1079
6
                        read_write.wake_up_after(&(when_started.clone() + *timeout));
1080
6
                    }
0
1081
                }
1082
1083
102
                if negotiation.is_none() {
1084
24
                    if let Ok(Some(
pong0
)) = read_write.incoming_bytes_take(32) {
1085
0
                        if expected_payload
1086
0
                            .pop_front()
1087
0
                            .map_or(true, |expected| pong != *expected)
1088
                        {
1089
0
                            read_write.wake_up_asap();
1090
0
                            return (Some(SubstreamInner::PingOutFailed { queued_pings }), None);
1091
0
                        }
1092
0
                        if let Some((when_started, _)) = queued_pings.remove(0) {
1093
0
                            return (
1094
0
                                Some(SubstreamInner::PingOut {
1095
0
                                    negotiation,
1096
0
                                    expected_payload,
1097
0
                                    outgoing_payload,
1098
0
                                    queued_pings,
1099
0
                                }),
1100
0
                                Some(Event::PingOutSuccess {
1101
0
                                    ping_time: read_write.now.clone() - when_started,
1102
0
                                }),
1103
0
                            );
1104
0
                        }
1105
24
                    }
1106
78
                }
1107
1108
102
                (
1109
102
                    Some(SubstreamInner::PingOut {
1110
102
                        negotiation,
1111
102
                        expected_payload,
1112
102
                        outgoing_payload,
1113
102
                        queued_pings,
1114
102
                    }),
1115
102
                    None,
1116
102
                )
1117
            }
1118
        }
1119
329
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE11read_write2CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE11read_write2Ba_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE11read_write2CsiUjFBJteJ7x_17smoldot_full_node
1120
1121
0
    pub fn reset(self) -> Option<Event> {
1122
0
        match self.inner {
1123
0
            SubstreamInner::InboundNegotiating(_, _) => None,
1124
0
            SubstreamInner::InboundNegotiatingAccept(_, _) => None,
1125
0
            SubstreamInner::InboundNegotiatingApiWait(_) => Some(Event::InboundNegotiatedCancel),
1126
0
            SubstreamInner::InboundFailed => None,
1127
0
            SubstreamInner::RequestOut { .. } => Some(Event::Response {
1128
0
                response: Err(RequestError::SubstreamReset),
1129
0
            }),
1130
0
            SubstreamInner::NotificationsInHandshake { .. } => None,
1131
0
            SubstreamInner::NotificationsInWait { .. } => Some(Event::NotificationsInOpenCancel),
1132
0
            SubstreamInner::NotificationsIn { .. } => Some(Event::NotificationsInClose {
1133
0
                outcome: Err(NotificationsInClosedErr::SubstreamReset),
1134
0
            }),
1135
0
            SubstreamInner::NotificationsInRefused => None,
1136
0
            SubstreamInner::NotificationsInClosed => None,
1137
            SubstreamInner::NotificationsOutHandshakeRecv { .. } => {
1138
0
                Some(Event::NotificationsOutResult {
1139
0
                    result: Err(NotificationsOutErr::SubstreamReset),
1140
0
                })
1141
            }
1142
0
            SubstreamInner::NotificationsOutNegotiationFailed => None,
1143
0
            SubstreamInner::NotificationsOut { .. } => Some(Event::NotificationsOutReset),
1144
0
            SubstreamInner::NotificationsOutClosed { .. } => None,
1145
0
            SubstreamInner::PingIn { .. } => None,
1146
0
            SubstreamInner::RequestInRecv { .. } => None,
1147
0
            SubstreamInner::RequestInRecvEmpty { .. } => None,
1148
0
            SubstreamInner::RequestInApiWait => None,
1149
0
            SubstreamInner::RequestInRespond { .. } => None,
1150
0
            SubstreamInner::PingOut { queued_pings, .. }
1151
0
            | SubstreamInner::PingOutFailed { queued_pings, .. } => {
1152
0
                NonZeroUsize::new(queued_pings.len())
1153
0
                    .map(|num_pings| Event::PingOutError { num_pings })
Unexecuted instantiation: _RNCNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5reset0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5reset0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreampE5reset0Bc_
Unexecuted instantiation: _RNCNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB4_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE5reset0CsiUjFBJteJ7x_17smoldot_full_node
1154
            }
1155
        }
1156
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5resetBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE5resetCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE5resetBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE5resetCsiUjFBJteJ7x_17smoldot_full_node
1157
1158
    /// Accepts an inbound notifications protocol. Must be called in response to a
1159
    /// [`Event::NotificationsInOpen`].
1160
    ///
1161
    /// # Panic
1162
    ///
1163
    /// Panics if this substream is not of the correct type.
1164
    ///
1165
1
    pub fn accept_in_notifications_substream(
1166
1
        &mut self,
1167
1
        handshake: Vec<u8>,
1168
1
        max_notification_size: usize,
1169
1
    ) {
1170
1
        if let SubstreamInner::NotificationsInWait = &mut self.inner {
1171
1
            self.inner = SubstreamInner::NotificationsIn {
1172
1
                close_desired_timeout: None,
1173
1
                next_notification_size: None,
1174
1
                handshake: {
1175
1
                    let handshake_len = handshake.len();
1176
1
                    leb128::encode_usize(handshake_len)
1177
1
                        .chain(handshake)
1178
1
                        .collect::<VecDeque<_>>()
1179
1
                },
1180
1
                max_notification_size,
1181
1
            }
1182
0
        }
1183
1184
        // TODO: too defensive, should be } else { panic!() }
1185
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33accept_in_notifications_substreamBa_
Line
Count
Source
1165
1
    pub fn accept_in_notifications_substream(
1166
1
        &mut self,
1167
1
        handshake: Vec<u8>,
1168
1
        max_notification_size: usize,
1169
1
    ) {
1170
1
        if let SubstreamInner::NotificationsInWait = &mut self.inner {
1171
1
            self.inner = SubstreamInner::NotificationsIn {
1172
1
                close_desired_timeout: None,
1173
1
                next_notification_size: None,
1174
1
                handshake: {
1175
1
                    let handshake_len = handshake.len();
1176
1
                    leb128::encode_usize(handshake_len)
1177
1
                        .chain(handshake)
1178
1
                        .collect::<VecDeque<_>>()
1179
1
                },
1180
1
                max_notification_size,
1181
1
            }
1182
0
        }
1183
1184
        // TODO: too defensive, should be } else { panic!() }
1185
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33accept_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE33accept_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE33accept_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
1186
1187
    /// Rejects an inbound notifications protocol. Must be called in response to a
1188
    /// [`Event::NotificationsInOpen`].
1189
    ///
1190
    /// # Panic
1191
    ///
1192
    /// Panics if this substream is not of the correct type.
1193
    ///
1194
1
    pub fn reject_in_notifications_substream(&mut self) {
1195
1
        match &mut self.inner {
1196
1
            SubstreamInner::NotificationsInWait { .. } => {
1197
1
                self.inner = SubstreamInner::NotificationsInRefused;
1198
1
            }
1199
0
            _ => panic!(),
1200
        }
1201
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33reject_in_notifications_substreamBa_
Line
Count
Source
1194
1
    pub fn reject_in_notifications_substream(&mut self) {
1195
1
        match &mut self.inner {
1196
1
            SubstreamInner::NotificationsInWait { .. } => {
1197
1
                self.inner = SubstreamInner::NotificationsInRefused;
1198
1
            }
1199
0
            _ => panic!(),
1200
        }
1201
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33reject_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE33reject_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE33reject_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
1202
1203
    /// Queues a notification to be written out on the given substream.
1204
    ///
1205
    /// # Panic
1206
    ///
1207
    /// Panics if the substream isn't a notifications substream, or if the notifications substream
1208
    /// isn't in the appropriate state.
1209
    ///
1210
3
    pub fn write_notification_unbounded(&mut self, notification: Vec<u8>) {
1211
3
        match &mut self.inner {
1212
3
            SubstreamInner::NotificationsOut { notifications, .. } => {
1213
3
                // TODO: expensive copying?
1214
3
                notifications.extend(leb128::encode_usize(notification.len()));
1215
3
                notifications.extend(notification);
1216
3
            }
1217
0
            _ => panic!(),
1218
        }
1219
3
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE28write_notification_unboundedBa_
Line
Count
Source
1210
3
    pub fn write_notification_unbounded(&mut self, notification: Vec<u8>) {
1211
3
        match &mut self.inner {
1212
3
            SubstreamInner::NotificationsOut { notifications, .. } => {
1213
3
                // TODO: expensive copying?
1214
3
                notifications.extend(leb128::encode_usize(notification.len()));
1215
3
                notifications.extend(notification);
1216
3
            }
1217
0
            _ => panic!(),
1218
        }
1219
3
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE28write_notification_unboundedCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE28write_notification_unboundedBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE28write_notification_unboundedCsiUjFBJteJ7x_17smoldot_full_node
1220
1221
    /// Returns the number of bytes waiting to be sent out on that substream.
1222
    ///
1223
    /// See the documentation of [`Substream::write_notification_unbounded`] for context.
1224
    ///
1225
    /// # Panic
1226
    ///
1227
    /// Panics if the substream isn't a notifications substream, or if the notifications substream
1228
    /// isn't in the appropriate state.
1229
    ///
1230
0
    pub fn notification_substream_queued_bytes(&self) -> usize {
1231
0
        match &self.inner {
1232
0
            SubstreamInner::NotificationsOut { notifications, .. } => notifications.len(),
1233
0
            _ => panic!(),
1234
        }
1235
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE35notification_substream_queued_bytesBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE35notification_substream_queued_bytesBa_
1236
1237
    /// Closes a outgoing notifications substream opened after a successful
1238
    /// [`Event::NotificationsOutResult`].
1239
    ///
1240
    /// This can be done even when in the negotiation phase, in other words before the remote has
1241
    /// accepted/refused the substream.
1242
    ///
1243
    /// # Panic
1244
    ///
1245
    /// Panics if the substream isn't a notifications substream, or if the notifications substream
1246
    /// isn't in the appropriate state.
1247
    ///
1248
0
    pub fn close_out_notifications_substream(&mut self) {
1249
0
        match &mut self.inner {
1250
            SubstreamInner::NotificationsOutHandshakeRecv { .. }
1251
0
            | SubstreamInner::NotificationsOut { .. } => {
1252
0
                self.inner = SubstreamInner::NotificationsOutClosed;
1253
0
            }
1254
0
            _ => panic!(),
1255
        };
1256
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33close_out_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE33close_out_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE33close_out_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE33close_out_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
1257
1258
    /// Closes an ingoing notifications substream that was accepted using
1259
    /// [`Substream::accept_in_notifications_substream`].
1260
    ///
1261
    /// Notifications can continue to be received. Calling this function only asynchronously
1262
    /// signals to the remote that the substream should be closed. The closing is enforced only
1263
    /// after the given timeout elapses.
1264
    ///
1265
    /// # Panic
1266
    ///
1267
    /// Panics if the substream isn't a notifications substream, or if the notifications substream
1268
    /// isn't in the appropriate state.
1269
    ///
1270
0
    pub fn close_in_notifications_substream(&mut self, timeout: TNow) {
1271
0
        match &mut self.inner {
1272
0
            SubstreamInner::NotificationsIn {
1273
0
                close_desired_timeout,
1274
0
                ..
1275
0
            } if close_desired_timeout.is_none() => {
1276
0
                *close_desired_timeout = Some(timeout);
1277
0
            }
1278
0
            _ => panic!(),
1279
        };
1280
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE32close_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE32close_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE32close_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE32close_in_notifications_substreamCsiUjFBJteJ7x_17smoldot_full_node
1281
1282
    /// Queues a ping on the given substream. Must be passed a randomly-generated payload of 32
1283
    /// bytes, the time after which this ping is considered as failed.
1284
    ///
1285
    /// # Panic
1286
    ///
1287
    /// Panics if the substream isn't an outgoing ping substream.
1288
    ///
1289
2
    pub fn queue_ping(&mut self, payload: &[u8; 32], now: TNow, timeout: Duration) {
1290
2
        match &mut self.inner {
1291
2
            SubstreamInner::PingOut { queued_pings, .. }
1292
2
            | SubstreamInner::PingOutFailed { 
queued_pings0
, .. } => {
1293
2
                queued_pings.push(Some((now, timeout)));
1294
2
            }
1295
0
            _ => panic!(),
1296
        }
1297
1298
2
        match &mut self.inner {
1299
            SubstreamInner::PingOut {
1300
2
                outgoing_payload,
1301
2
                expected_payload,
1302
2
                ..
1303
2
            } => {
1304
2
                outgoing_payload.extend(payload.iter().copied());
1305
2
                expected_payload.push_back(payload.to_vec());
1306
2
            }
1307
0
            SubstreamInner::PingOutFailed { .. } => {}
1308
0
            _ => panic!(),
1309
        }
1310
2
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10queue_pingBa_
Line
Count
Source
1289
2
    pub fn queue_ping(&mut self, payload: &[u8; 32], now: TNow, timeout: Duration) {
1290
2
        match &mut self.inner {
1291
2
            SubstreamInner::PingOut { queued_pings, .. }
1292
2
            | SubstreamInner::PingOutFailed { 
queued_pings0
, .. } => {
1293
2
                queued_pings.push(Some((now, timeout)));
1294
2
            }
1295
0
            _ => panic!(),
1296
        }
1297
1298
2
        match &mut self.inner {
1299
            SubstreamInner::PingOut {
1300
2
                outgoing_payload,
1301
2
                expected_payload,
1302
2
                ..
1303
2
            } => {
1304
2
                outgoing_payload.extend(payload.iter().copied());
1305
2
                expected_payload.push_back(payload.to_vec());
1306
2
            }
1307
0
            SubstreamInner::PingOutFailed { .. } => {}
1308
0
            _ => panic!(),
1309
        }
1310
2
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE10queue_pingCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE10queue_pingBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE10queue_pingCsiUjFBJteJ7x_17smoldot_full_node
1311
1312
    /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`].
1313
    ///
1314
    /// Passing an `Err` corresponds, on the other side, to a [`RequestError::SubstreamClosed`].
1315
1
    pub fn respond_in_request(
1316
1
        &mut self,
1317
1
        response: Result<Vec<u8>, ()>,
1318
1
    ) -> Result<(), RespondInRequestError> {
1319
1
        match &mut self.inner {
1320
            SubstreamInner::RequestInApiWait => {
1321
1
                self.inner = SubstreamInner::RequestInRespond {
1322
1
                    response: if let Ok(
response0
) = response {
1323
0
                        let response_len = response.len();
1324
0
                        leb128::encode_usize(response_len).chain(response).collect()
1325
                    } else {
1326
                        // An error is indicated by closing the substream without even sending
1327
                        // back the length of the response.
1328
1
                        VecDeque::new()
1329
                    },
1330
                };
1331
1332
1
                Ok(())
1333
            }
1334
            // TODO: handle substream closed
1335
0
            _ => panic!(),
1336
        }
1337
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE18respond_in_requestBa_
Line
Count
Source
1315
1
    pub fn respond_in_request(
1316
1
        &mut self,
1317
1
        response: Result<Vec<u8>, ()>,
1318
1
    ) -> Result<(), RespondInRequestError> {
1319
1
        match &mut self.inner {
1320
            SubstreamInner::RequestInApiWait => {
1321
1
                self.inner = SubstreamInner::RequestInRespond {
1322
1
                    response: if let Ok(
response0
) = response {
1323
0
                        let response_len = response.len();
1324
0
                        leb128::encode_usize(response_len).chain(response).collect()
1325
                    } else {
1326
                        // An error is indicated by closing the substream without even sending
1327
                        // back the length of the response.
1328
1
                        VecDeque::new()
1329
                    },
1330
                };
1331
1332
1
                Ok(())
1333
            }
1334
            // TODO: handle substream closed
1335
0
            _ => panic!(),
1336
        }
1337
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE18respond_in_requestCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE18respond_in_requestBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE18respond_in_requestCsiUjFBJteJ7x_17smoldot_full_node
1338
1339
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol
1340
    /// name and indicate the type of the protocol.
1341
    ///
1342
    /// # Panic
1343
    ///
1344
    /// Panics if the substream is not in the correct state.
1345
    ///
1346
19
    pub fn accept_inbound(&mut self, ty: InboundTy) {
1347
19
        match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) {
1348
19
            SubstreamInner::InboundNegotiatingApiWait(accept_deny) => {
1349
19
                self.inner = SubstreamInner::InboundNegotiatingAccept(accept_deny.accept(), ty)
1350
            }
1351
0
            _ => panic!(),
1352
        }
1353
19
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14accept_inboundBa_
Line
Count
Source
1346
19
    pub fn accept_inbound(&mut self, ty: InboundTy) {
1347
19
        match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) {
1348
19
            SubstreamInner::InboundNegotiatingApiWait(accept_deny) => {
1349
19
                self.inner = SubstreamInner::InboundNegotiatingAccept(accept_deny.accept(), ty)
1350
            }
1351
0
            _ => panic!(),
1352
        }
1353
19
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14accept_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE14accept_inboundBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE14accept_inboundCsiUjFBJteJ7x_17smoldot_full_node
1354
1355
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the
1356
    /// protocol name as not supported.
1357
    ///
1358
    /// # Panic
1359
    ///
1360
    /// Panics if the substream is not in the correct state.
1361
    ///
1362
1
    pub fn reject_inbound(&mut self) {
1363
1
        match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) {
1364
1
            SubstreamInner::InboundNegotiatingApiWait(accept_deny) => {
1365
1
                self.inner = SubstreamInner::InboundNegotiating(accept_deny.reject(), true)
1366
            }
1367
0
            _ => panic!(),
1368
        }
1369
1
    }
_RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14reject_inboundBa_
Line
Count
Source
1362
1
    pub fn reject_inbound(&mut self) {
1363
1
        match mem::replace(&mut self.inner, SubstreamInner::InboundFailed) {
1364
1
            SubstreamInner::InboundNegotiatingApiWait(accept_deny) => {
1365
1
                self.inner = SubstreamInner::InboundNegotiating(accept_deny.reject(), true)
1366
            }
1367
0
            _ => panic!(),
1368
        }
1369
1
    }
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsaYZPK01V26L_4core4time8DurationE14reject_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreampE14reject_inboundBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamINtB2_9SubstreamNtNtCsbpXXxgr6u8g_3std4time7InstantE14reject_inboundCsiUjFBJteJ7x_17smoldot_full_node
1370
}
1371
1372
impl<TNow> fmt::Debug for Substream<TNow> {
1373
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1374
0
        match &self.inner {
1375
0
            SubstreamInner::InboundFailed => f.debug_tuple("incoming-negotiation-failed").finish(),
1376
            SubstreamInner::InboundNegotiating(_, _) => {
1377
0
                f.debug_tuple("incoming-negotiating").finish()
1378
            }
1379
            SubstreamInner::InboundNegotiatingAccept(_, _) => {
1380
0
                f.debug_tuple("incoming-negotiating-after-accept").finish()
1381
            }
1382
            SubstreamInner::InboundNegotiatingApiWait(..) => {
1383
0
                f.debug_tuple("incoming-negotiated-api-wait").finish()
1384
            }
1385
            SubstreamInner::NotificationsOutHandshakeRecv { .. } => {
1386
0
                f.debug_tuple("notifications-out-handshake-recv").finish()
1387
            }
1388
0
            SubstreamInner::NotificationsOutNegotiationFailed { .. } => f
1389
0
                .debug_tuple("notifications-out-negotiation-failed")
1390
0
                .finish(),
1391
0
            SubstreamInner::NotificationsOut { .. } => f.debug_tuple("notifications-out").finish(),
1392
            SubstreamInner::NotificationsOutClosed { .. } => {
1393
0
                f.debug_tuple("notifications-out-closed").finish()
1394
            }
1395
            SubstreamInner::NotificationsInHandshake { .. } => {
1396
0
                f.debug_tuple("notifications-in-handshake").finish()
1397
            }
1398
            SubstreamInner::NotificationsInWait { .. } => {
1399
0
                f.debug_tuple("notifications-in-wait").finish()
1400
            }
1401
0
            SubstreamInner::NotificationsIn { .. } => f.debug_tuple("notifications-in").finish(),
1402
            SubstreamInner::NotificationsInRefused => {
1403
0
                f.debug_tuple("notifications-in-refused").finish()
1404
            }
1405
            SubstreamInner::NotificationsInClosed => {
1406
0
                f.debug_tuple("notifications-in-closed").finish()
1407
            }
1408
0
            SubstreamInner::RequestOut { .. } => f.debug_tuple("request-out").finish(),
1409
            SubstreamInner::RequestInRecv { .. } | SubstreamInner::RequestInRecvEmpty { .. } => {
1410
0
                f.debug_tuple("request-in").finish()
1411
            }
1412
0
            SubstreamInner::RequestInRespond { .. } => f.debug_tuple("request-in-respond").finish(),
1413
0
            SubstreamInner::RequestInApiWait => f.debug_tuple("request-in").finish(),
1414
0
            SubstreamInner::PingIn { .. } => f.debug_tuple("ping-in").finish(),
1415
0
            SubstreamInner::PingOutFailed { .. } => f.debug_tuple("ping-out-failed").finish(),
1416
0
            SubstreamInner::PingOut { .. } => f.debug_tuple("ping-out").finish(),
1417
        }
1418
0
    }
Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreams_0pEINtB5_9SubstreampENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_
Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreams_0pEINtB5_9SubstreampENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_
1419
}
1420
1421
/// Event that happened on the connection. See [`Substream::read_write`].
1422
#[must_use]
1423
#[derive(Debug)]
1424
pub enum Event {
1425
    /// Error while receiving an inbound substream.
1426
    InboundError {
1427
        error: InboundError,
1428
        was_accepted: bool,
1429
    },
1430
1431
    /// An inbound substream has successfully negotiated a protocol. Call
1432
    /// [`Substream::accept_inbound`] or [`Substream::reject_inbound`] in order to resume.
1433
    InboundNegotiated(String),
1434
1435
    /// An inbound substream that had successfully negotiated a protocol got abruptly closed
1436
    /// while waiting for the call to [`Substream::accept_inbound`] or
1437
    /// [`Substream::reject_inbound`].
1438
    InboundNegotiatedCancel,
1439
1440
    /// Received a request in the context of a request-response protocol.
1441
    RequestIn {
1442
        /// Bytes of the request. Its interpretation is out of scope of this module.
1443
        request: Vec<u8>,
1444
    },
1445
1446
    /// Received a response to a previously emitted request on a request-response protocol.
1447
    Response {
1448
        /// Bytes of the response. Its interpretation is out of scope of this module.
1449
        response: Result<Vec<u8>, RequestError>,
1450
    },
1451
1452
    /// Remote has opened an inbound notifications substream.
1453
    ///
1454
    /// Either [`Substream::accept_in_notifications_substream`] or
1455
    /// [`Substream::reject_in_notifications_substream`] must be called in the near future in
1456
    /// order to accept or reject this substream.
1457
    NotificationsInOpen {
1458
        /// Handshake sent by the remote. Its interpretation is out of scope of this module.
1459
        handshake: Vec<u8>,
1460
    },
1461
    /// Remote has canceled an inbound notifications substream opening.
1462
    ///
1463
    /// This can only happen after [`Event::NotificationsInOpen`].
1464
    /// [`Substream::accept_in_notifications_substream`] or
1465
    /// [`Substream::reject_in_notifications_substream`] should not be called on this substream.
1466
    NotificationsInOpenCancel,
1467
    /// Remote has sent a notification on an inbound notifications substream. Can only happen
1468
    /// after the substream has been accepted.
1469
    // TODO: give a way to back-pressure notifications
1470
    NotificationIn {
1471
        /// Notification sent by the remote.
1472
        notification: Vec<u8>,
1473
    },
1474
    /// Remote has closed an inbound notifications substream opening. No more notifications will
1475
    /// be received.
1476
    ///
1477
    /// This can only happen after the substream has been accepted.
1478
    NotificationsInClose {
1479
        /// If `Ok`, the substream has been closed gracefully. If `Err`, a problem happened.
1480
        outcome: Result<(), NotificationsInClosedErr>,
1481
    },
1482
1483
    /// Remote has accepted or refused a substream opened with [`Substream::notifications_out`].
1484
    ///
1485
    /// If `Ok`, it is now possible to send notifications on this substream.
1486
    NotificationsOutResult {
1487
        /// If `Ok`, contains the handshake sent back by the remote. Its interpretation is out of
1488
        /// scope of this module.
1489
        result: Result<Vec<u8>, NotificationsOutErr>,
1490
    },
1491
    /// Remote has closed an outgoing notifications substream, meaning that it demands the closing
1492
    /// of the substream.
1493
    NotificationsOutCloseDemanded,
1494
    /// Remote has reset an outgoing notifications substream. The substream is instantly closed.
1495
    NotificationsOutReset,
1496
1497
    /// A ping has been successfully answered by the remote.
1498
    PingOutSuccess {
1499
        /// Time between sending the ping and receiving the pong.
1500
        ping_time: Duration,
1501
    },
1502
    /// Remote has failed to answer one or more pings.
1503
    PingOutError {
1504
        /// Number of pings that the remote has failed to answer.
1505
        num_pings: NonZeroUsize,
1506
    },
1507
}
1508
1509
/// Type of inbound protocol.
1510
pub enum InboundTy {
1511
    Ping,
1512
    Request {
1513
        /// Maximum allowed size of the request.
1514
        /// If `None`, then no data is expected on the substream, not even the length of the
1515
        /// request.
1516
        // TODO: use a proper enum
1517
        request_max_size: Option<usize>,
1518
    },
1519
    Notifications {
1520
        max_handshake_size: usize,
1521
    },
1522
}
1523
1524
/// Error that can happen while processing an inbound substream.
1525
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXs4_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_12InboundErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs4_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_12InboundErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
1526
pub enum InboundError {
1527
    /// Error during protocol negotiation.
1528
    #[display(fmt = "Protocol negotiation error: {_0}")]
1529
    NegotiationError(multistream_select::Error),
1530
    /// Error while receiving an inbound request.
1531
    #[display(fmt = "Error receiving inbound request: {_0}")]
1532
    RequestInLebError(read_write::IncomingBytesTakeLeb128Error),
1533
    /// Substream has been unexpectedly closed.
1534
    #[display(fmt = "Substream unexpectedly closed")]
1535
    SubstreamClosed,
1536
    /// Unexpected end of file while receiving an inbound request.
1537
    RequestInExpectedEof,
1538
    /// Error while receiving an inbound notifications substream handshake.
1539
    #[display(fmt = "Error while receiving an inbound notifications substream handshake: {error}")]
1540
    NotificationsInError {
1541
        /// Error that happened.
1542
        error: read_write::IncomingBytesTakeLeb128Error,
1543
    },
1544
    /// Unexpected end of file while receiving an inbound notifications substream handshake.
1545
    #[display(
1546
        fmt = "Unexpected end of file while receiving an inbound notifications substream handshake"
1547
    )]
1548
    NotificationsInUnexpectedEof,
1549
}
1550
1551
/// Error that can happen during a request in a request-response scheme.
1552
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXs7_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs7_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_12RequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
1553
pub enum RequestError {
1554
    /// Remote hasn't answered in time.
1555
    Timeout,
1556
    /// Remote doesn't support this protocol.
1557
    ProtocolNotAvailable,
1558
    /// Remote has decided to close the substream. This most likely indicates that the remote
1559
    /// is unwilling the respond to the request.
1560
    SubstreamClosed,
1561
    /// Remote has decided to `RST` the substream. This most likely indicates that the remote has
1562
    /// detected a protocol error.
1563
    SubstreamReset,
1564
    /// Error during protocol negotiation.
1565
    #[display(fmt = "Protocol negotiation error: {_0}")]
1566
    NegotiationError(multistream_select::Error),
1567
    /// Invalid LEB128 number when receiving the response.
1568
    ResponseInvalidLeb128,
1569
    /// Number of bytes decoded is larger than expected when receiving the response.
1570
    ResponseTooLarge,
1571
}
1572
1573
impl RequestError {
1574
    /// Returns `true` if the error is caused by a faulty behavior by the remote. Returns `false`
1575
    /// if the error can happen in normal situations.
1576
0
    pub fn is_protocol_error(&self) -> bool {
1577
0
        match self {
1578
0
            RequestError::Timeout => false, // Remote is likely overloaded.
1579
0
            RequestError::ProtocolNotAvailable => true,
1580
0
            RequestError::SubstreamClosed => false,
1581
0
            RequestError::SubstreamReset => true,
1582
0
            RequestError::NegotiationError(_) => true,
1583
0
            RequestError::ResponseInvalidLeb128 => true,
1584
0
            RequestError::ResponseTooLarge => true,
1585
        }
1586
0
    }
Unexecuted instantiation: _RNvMs0_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_12RequestError17is_protocol_error
Unexecuted instantiation: _RNvMs0_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_12RequestError17is_protocol_error
1587
}
1588
1589
/// Error potentially returned by [`Substream::respond_in_request`].
1590
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXs9_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_21RespondInRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs9_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_21RespondInRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
1591
pub enum RespondInRequestError {
1592
    /// The substream has already been closed.
1593
    SubstreamClosed,
1594
}
1595
1596
/// Error that can happen when trying to open an outbound notifications substream.
1597
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXsc_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsc_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_19NotificationsOutErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
1598
pub enum NotificationsOutErr {
1599
    /// Remote took too long to perform the handshake.
1600
    Timeout,
1601
    /// Remote has refused the handshake by closing the substream.
1602
    RefusedHandshake,
1603
    /// Remote has indicated that it doesn't support the requested protocol.
1604
    ProtocolNotAvailable,
1605
    /// Error during the multistream-select handshake.
1606
    #[display(fmt = "Protocol negotiation error: {_0}")]
1607
    NegotiationError(multistream_select::Error),
1608
    /// Substream has been reset during the negotiation.
1609
    SubstreamReset,
1610
    /// Error while receiving the remote's handshake.
1611
    #[display(fmt = "Error while receiving remote handshake: {_0}")]
1612
    HandshakeRecvError(read_write::IncomingBytesTakeLeb128Error),
1613
}
1614
1615
/// Reason why an inbound notifications substream has been closed.
1616
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXsf_NtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established9substreamNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsf_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established9substreamNtB5_24NotificationsInClosedErrNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
1617
pub enum NotificationsInClosedErr {
1618
    /// Error in the protocol.
1619
    #[display(fmt = "Error while receiving notification: {_0}")]
1620
    ProtocolError(read_write::IncomingBytesTakeLeb128Error),
1621
    /// Substream has been closed.
1622
    SubstreamClosed,
1623
    /// Substream has been reset.
1624
    SubstreamReset,
1625
    /// Substream has been force-closed because the graceful timeout has been reached.
1626
    CloseDesiredTimeout,
1627
}