Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/multistream_select.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! Multistream-select is a protocol whose purpose is to negotiate protocols.
19
//!
20
//! # Context
21
//!
22
//! The multistream-select protocol makes it possible for two parties to negotiate a protocol.
23
//!
24
//! When using TCP connections, it is used immediately after a connection opens in order to
25
//! negotiate which encryption protocol to use, then after the encryption protocol handshake to
26
//! negotiate which multiplexing protocol to use.
27
//!
28
//! It is also used every time a substream opens in order to negotiate which protocol to use for
29
//! this substream in particular.
30
//!
31
//! Once a protocol has been negotiated, the connection or substream immediately starts speaking
32
//! this protocol.
33
//!
34
//! The multistream-select protocol is asymmetric: one side needs to be the dialer and the other
35
//! side the listener. In the context of a TCP connection, the dialer and listener correspond to
36
//! the dialer and listener of the connection. In the context of a substream, the dialer is the
37
//! side that initiated the opening of the substream.
38
//!
39
//! # About protocol names
40
//!
41
//! Due to flaws in the wire protocol design, a protocol named `na` causes an ambiguity in
42
//! the exchange. Because protocol names are normally decided ahead of time, this situation is
43
//! expected to never arise, except in the presence of a malicious remote. The decision has been
44
//! taken that such protocol will always fail to negotiate, but will also not produce any error
45
//! or panic.
46
//!
47
//! Please don't intentionally name a protocol `na`.
48
//!
49
//! # Usage
50
//!
51
//! To be written.
52
//!
53
//! # See also
54
//!
55
//! - [Official repository](https://github.com/multiformats/multistream-select)
56
//!
57
58
// TODO: write usage
59
60
use super::super::read_write::ReadWrite;
61
use crate::{libp2p::read_write, util::leb128};
62
63
use alloc::{collections::VecDeque, string::String};
64
use core::{cmp, fmt, str};
65
66
/// Configuration of a multistream-select protocol.
67
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
68
pub enum Config<P> {
69
    /// Local node is the dialing side and requests the specific protocol.
70
    Dialer {
71
        /// Name of the protocol to try negotiate. The multistream-select negotiation will
72
        /// ultimately succeed if and only if the remote supports this protocol.
73
        requested_protocol: P,
74
    },
75
    /// Local node is the listening side.
76
    Listener {
77
        /// Maximum allowed length of a protocol. Set this to a value superior or equal to the
78
        /// length of the longest protocol that is supported locally.
79
        ///
80
        /// This limit is necessary in order to prevent the remote from sending an infinite stream
81
        /// of data for the protocol name.
82
        max_protocol_name_len: usize,
83
    },
84
}
85
86
/// Current state of a multistream-select negotiation.
87
#[derive(Debug)]
88
pub enum Negotiation<P> {
89
    /// Negotiation is still in progress. Use the provided [`InProgress`] object to inject and
90
    /// extract more data from/to the remote.
91
    InProgress(InProgress<P>),
92
    /// Negotiation is still in progress and is waiting for accepting or refusing the protocol
93
    /// requested by the remote.
94
    ///
95
    /// Can never happen if configured as the dialing side.
96
    ListenerAcceptOrDeny(ListenerAcceptOrDeny<P>),
97
    /// Negotiation has ended successfully. A protocol has been negotiated.
98
    Success,
99
    /// Negotiation has ended, but there isn't any protocol in common between the two parties.
100
    ///
101
    /// Can only ever happen as the dialing side.
102
    NotAvailable,
103
}
104
105
impl<P> Negotiation<P>
106
where
107
    P: AsRef<str>,
108
{
109
    /// Shortcut method for [`InProgress::new`] and wrapping the [`InProgress`] in a
110
    /// [`Negotiation`].
111
8
    pub fn new(config: Config<P>) -> Self {
112
8
        Negotiation::InProgress(InProgress::new(config))
113
8
    }
_RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB2_11NegotiationReE3newB8_
Line
Count
Source
111
4
    pub fn new(config: Config<P>) -> Self {
112
4
        Negotiation::InProgress(InProgress::new(config))
113
4
    }
_RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB2_11NegotiationNtNtCsdZExvAaxgia_5alloc6string6StringE3newB8_
Line
Count
Source
111
4
    pub fn new(config: Config<P>) -> Self {
112
4
        Negotiation::InProgress(InProgress::new(config))
113
4
    }
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB2_11NegotiationpE3newB8_
114
}
115
116
/// Negotiation is still in progress and is waiting for accepting or refusing the protocol
117
/// requested by the remote.
118
#[derive(Debug)]
119
pub struct ListenerAcceptOrDeny<P> {
120
    inner: InProgress<P>,
121
    protocol: String,
122
}
123
124
impl<P> ListenerAcceptOrDeny<P> {
125
    /// Name of the protocol requested by the remote.
126
40
    pub fn requested_protocol(&self) -> &str {
127
40
        &self.protocol
128
40
    }
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE18requested_protocolBa_
Line
Count
Source
126
24
    pub fn requested_protocol(&self) -> &str {
127
24
        &self.protocol
128
24
    }
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE18requested_protocolBa_
Line
Count
Source
126
16
    pub fn requested_protocol(&self) -> &str {
127
16
        &self.protocol
128
16
    }
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE18requested_protocolCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE18requested_protocolCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenypE18requested_protocolBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE18requested_protocolCsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE18requested_protocolCsiUjFBJteJ7x_17smoldot_full_node
129
130
    /// Accept the requested protocol and resume the handshake.
131
39
    pub fn accept(mut self) -> InProgress<P> {
132
39
        debug_assert!(
matches!0
(self.inner.state, InProgressState::CommandExpected));
133
39
        write_message(
134
39
            Message::ProtocolOk(self.protocol.into_bytes()),
135
39
            &mut self.inner.data_send_out,
136
39
        );
137
39
        self.inner.state = InProgressState::Finishing;
138
39
        self.inner
139
39
    }
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6acceptBa_
Line
Count
Source
131
23
    pub fn accept(mut self) -> InProgress<P> {
132
23
        debug_assert!(
matches!0
(self.inner.state, InProgressState::CommandExpected));
133
23
        write_message(
134
23
            Message::ProtocolOk(self.protocol.into_bytes()),
135
23
            &mut self.inner.data_send_out,
136
23
        );
137
23
        self.inner.state = InProgressState::Finishing;
138
23
        self.inner
139
23
    }
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6acceptBa_
Line
Count
Source
131
16
    pub fn accept(mut self) -> InProgress<P> {
132
16
        debug_assert!(
matches!0
(self.inner.state, InProgressState::CommandExpected));
133
16
        write_message(
134
16
            Message::ProtocolOk(self.protocol.into_bytes()),
135
16
            &mut self.inner.data_send_out,
136
16
        );
137
16
        self.inner.state = InProgressState::Finishing;
138
16
        self.inner
139
16
    }
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6acceptCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6acceptCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenypE6acceptBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6acceptCsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6acceptCsiUjFBJteJ7x_17smoldot_full_node
140
141
    /// Reject the requested protocol and resume the handshake.
142
1
    pub fn reject(mut self) -> InProgress<P> {
143
1
        debug_assert!(
matches!0
(self.inner.state, InProgressState::CommandExpected));
144
1
        write_message(
145
1
            Message::<&'static [u8]>::ProtocolNa,
146
1
            &mut self.inner.data_send_out,
147
1
        );
148
1
        self.inner
149
1
    }
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6rejectBa_
Line
Count
Source
142
1
    pub fn reject(mut self) -> InProgress<P> {
143
1
        debug_assert!(
matches!0
(self.inner.state, InProgressState::CommandExpected));
144
1
        write_message(
145
1
            Message::<&'static [u8]>::ProtocolNa,
146
1
            &mut self.inner.data_send_out,
147
1
        );
148
1
        self.inner
149
1
    }
Unexecuted instantiation: _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6rejectBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6rejectCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6rejectCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenypE6rejectBa_
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6rejectCsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6rejectCsiUjFBJteJ7x_17smoldot_full_node
150
}
151
152
/// Negotiation in progress.
153
pub struct InProgress<P> {
154
    /// Configuration of the negotiation.
155
    config: Config<P>,
156
    /// Data currently being sent out.
157
    data_send_out: VecDeque<u8>,
158
    /// Current state of the negotiation.
159
    state: InProgressState,
160
    /// Maximum allowed size of an incoming frame.
161
    max_in_frame_len: usize,
162
    /// Size of the next frame to receive, or `None` if not known yet. If `Some`, we have already
163
    /// extracted the length from the incoming buffer.
164
    next_in_frame_len: Option<usize>,
165
}
166
167
/// Current state of the negotiation.
168
#[derive(Debug, Copy, Clone)]
169
enum InProgressState {
170
    Finishing,
171
    HandshakeExpected,
172
    CommandExpected,
173
    ProtocolRequestAnswerExpected,
174
}
175
176
impl<P> InProgress<P>
177
where
178
    P: AsRef<str>,
179
{
180
    /// Initializes a new handshake state machine.
181
80
    pub fn new(config: Config<P>) -> Self {
182
        // Length, in bytes, of the longest protocol name.
183
80
        let max_proto_name_len = match &config {
184
40
            Config::Dialer { requested_protocol } => requested_protocol.as_ref().len(),
185
            Config::Listener {
186
40
                max_protocol_name_len,
187
40
            } => *max_protocol_name_len,
188
        };
189
190
        // Any incoming frame larger than `max_frame_len` will trigger a protocol error.
191
        // This means that a protocol error might be reported in situations where the dialer
192
        // legitimately requests a protocol that the listener doesn't support. In order to prevent
193
        // confusion, a minimum length is applied to the protocol name length. Any protocol name
194
        // smaller than this will never trigger a protocol error, even if it isn't supported.
195
        const MIN_PROTO_LEN_NO_ERR: usize = 512;
196
80
        let max_frame_len = cmp::max(
197
80
            cmp::max(max_proto_name_len, MIN_PROTO_LEN_NO_ERR),
198
80
            HANDSHAKE.len(),
199
80
        ) + 1;
200
80
201
80
        InProgress {
202
80
            // Note that the listener theoretically doesn't necessarily have to immediately send
203
80
            // a handshake, and could instead wait for a command from the dialer. In practice,
204
80
            // however, the specification doesn't mention anything about this, and some libraries
205
80
            // such as js-libp2p wait for the listener to send a handshake before emitting a
206
80
            // command.
207
80
            data_send_out: {
208
80
                let mut data = VecDeque::new();
209
80
                write_message(Message::<&'static [u8]>::Handshake, &mut data);
210
80
                if let Config::Dialer { 
requested_protocol40
} = &config {
211
40
                    write_message(
212
40
                        Message::ProtocolRequest(requested_protocol.as_ref()),
213
40
                        &mut data,
214
40
                    );
215
40
                }
216
80
                data
217
80
            },
218
80
            config,
219
80
            state: InProgressState::HandshakeExpected,
220
80
            max_in_frame_len: max_frame_len,
221
80
            next_in_frame_len: None,
222
80
        }
223
80
    }
_RNvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE3newBb_
Line
Count
Source
181
44
    pub fn new(config: Config<P>) -> Self {
182
        // Length, in bytes, of the longest protocol name.
183
44
        let max_proto_name_len = match &config {
184
20
            Config::Dialer { requested_protocol } => requested_protocol.as_ref().len(),
185
            Config::Listener {
186
24
                max_protocol_name_len,
187
24
            } => *max_protocol_name_len,
188
        };
189
190
        // Any incoming frame larger than `max_frame_len` will trigger a protocol error.
191
        // This means that a protocol error might be reported in situations where the dialer
192
        // legitimately requests a protocol that the listener doesn't support. In order to prevent
193
        // confusion, a minimum length is applied to the protocol name length. Any protocol name
194
        // smaller than this will never trigger a protocol error, even if it isn't supported.
195
        const MIN_PROTO_LEN_NO_ERR: usize = 512;
196
44
        let max_frame_len = cmp::max(
197
44
            cmp::max(max_proto_name_len, MIN_PROTO_LEN_NO_ERR),
198
44
            HANDSHAKE.len(),
199
44
        ) + 1;
200
44
201
44
        InProgress {
202
44
            // Note that the listener theoretically doesn't necessarily have to immediately send
203
44
            // a handshake, and could instead wait for a command from the dialer. In practice,
204
44
            // however, the specification doesn't mention anything about this, and some libraries
205
44
            // such as js-libp2p wait for the listener to send a handshake before emitting a
206
44
            // command.
207
44
            data_send_out: {
208
44
                let mut data = VecDeque::new();
209
44
                write_message(Message::<&'static [u8]>::Handshake, &mut data);
210
44
                if let Config::Dialer { 
requested_protocol20
} = &config {
211
20
                    write_message(
212
20
                        Message::ProtocolRequest(requested_protocol.as_ref()),
213
20
                        &mut data,
214
20
                    );
215
24
                }
216
44
                data
217
44
            },
218
44
            config,
219
44
            state: InProgressState::HandshakeExpected,
220
44
            max_in_frame_len: max_frame_len,
221
44
            next_in_frame_len: None,
222
44
        }
223
44
    }
_RNvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newBb_
Line
Count
Source
181
36
    pub fn new(config: Config<P>) -> Self {
182
        // Length, in bytes, of the longest protocol name.
183
36
        let max_proto_name_len = match &config {
184
20
            Config::Dialer { requested_protocol } => requested_protocol.as_ref().len(),
185
            Config::Listener {
186
16
                max_protocol_name_len,
187
16
            } => *max_protocol_name_len,
188
        };
189
190
        // Any incoming frame larger than `max_frame_len` will trigger a protocol error.
191
        // This means that a protocol error might be reported in situations where the dialer
192
        // legitimately requests a protocol that the listener doesn't support. In order to prevent
193
        // confusion, a minimum length is applied to the protocol name length. Any protocol name
194
        // smaller than this will never trigger a protocol error, even if it isn't supported.
195
        const MIN_PROTO_LEN_NO_ERR: usize = 512;
196
36
        let max_frame_len = cmp::max(
197
36
            cmp::max(max_proto_name_len, MIN_PROTO_LEN_NO_ERR),
198
36
            HANDSHAKE.len(),
199
36
        ) + 1;
200
36
201
36
        InProgress {
202
36
            // Note that the listener theoretically doesn't necessarily have to immediately send
203
36
            // a handshake, and could instead wait for a command from the dialer. In practice,
204
36
            // however, the specification doesn't mention anything about this, and some libraries
205
36
            // such as js-libp2p wait for the listener to send a handshake before emitting a
206
36
            // command.
207
36
            data_send_out: {
208
36
                let mut data = VecDeque::new();
209
36
                write_message(Message::<&'static [u8]>::Handshake, &mut data);
210
36
                if let Config::Dialer { 
requested_protocol20
} = &config {
211
20
                    write_message(
212
20
                        Message::ProtocolRequest(requested_protocol.as_ref()),
213
20
                        &mut data,
214
20
                    );
215
20
                }
16
216
36
                data
217
36
            },
218
36
            config,
219
36
            state: InProgressState::HandshakeExpected,
220
36
            max_in_frame_len: max_frame_len,
221
36
            next_in_frame_len: None,
222
36
        }
223
36
    }
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE3newCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newBb_
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE3newCsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newCsiUjFBJteJ7x_17smoldot_full_node
224
225
    /// If this function returns true, then the multistream-select handshake has finished writing
226
    /// all its data, and the API user can now start writing the protocol-specific data if it
227
    /// desires, even though the multistream-handshake isn't finished.
228
    ///
229
    /// If the remote supports the requested protocol, then doing so will save one networking
230
    /// round-trip. If however the remote doesn't support the requested protocol, then doing so
231
    /// will lead to confusing errors on the remote, as it will interpret the protocol-specific
232
    /// data as being from the multistream-select protocol, and the substream will be rendered
233
    /// unusable. Overall, saving a round-trip is usually seen as preferable over confusing
234
    /// errors.
235
124
    pub fn can_write_protocol_data(&self) -> bool {
236
124
        
matches!78
(self.state, InProgressState::ProtocolRequestAnswerExpected)
237
124
    }
_RNvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE23can_write_protocol_dataBb_
Line
Count
Source
235
124
    pub fn can_write_protocol_data(&self) -> bool {
236
124
        
matches!78
(self.state, InProgressState::ProtocolRequestAnswerExpected)
237
124
    }
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE23can_write_protocol_dataCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgresspE23can_write_protocol_dataBb_
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE23can_write_protocol_dataCsiUjFBJteJ7x_17smoldot_full_node
238
239
    /// Feeds data coming from a socket, updates the internal state machine, and writes data
240
    /// destined to the socket.
241
    ///
242
    /// On success, returns the new state of the negotiation.
243
    ///
244
    /// An error is returned if the reading or writing are closed, or if the protocol is being
245
    /// violated by the remote. When that happens, the connection should be closed altogether.
246
366
    pub fn read_write<TNow>(
247
366
        mut self,
248
366
        read_write: &mut ReadWrite<TNow>,
249
366
    ) -> Result<Negotiation<P>, Error> {
250
        loop {
251
            // First, try to send out data currently being queued for sending.
252
599
            read_write.write_from_vec_deque(&mut self.data_send_out);
253
599
254
599
            // The `Finishing` state is special because it doesn't expect any incoming message
255
599
            // anymore, and just finishes the negotiation after all the data has been written out.
256
599
            // As such, we do not proceed further unless we have finished sending out everything.
257
599
            if let InProgressState::Finishing = self.state {
258
39
                debug_assert!(
matches!0
(self.config, Config::Listener { .. }));
259
39
                if self.data_send_out.is_empty() {
260
38
                    return Ok(Negotiation::Success);
261
                } else {
262
1
                    break;
263
                }
264
560
            }
265
266
            // Try to extract a message from the incoming buffer.
267
560
            let 
mut frame154
= if let Some(
next_frame_len234
) = self.next_in_frame_len {
268
234
                match read_write.incoming_bytes_take(next_frame_len) {
269
79
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
270
154
                    Ok(Some(frame)) => {
271
154
                        self.next_in_frame_len = None;
272
154
                        frame
273
                    }
274
1
                    Err(err) => return Err(Error::Frame(err)),
275
                }
276
            } else {
277
326
                match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) {
278
171
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
279
155
                    Ok(Some(size)) => {
280
155
                        self.next_in_frame_len = Some(size);
281
155
                        continue;
282
                    }
283
0
                    Err(err) => return Err(Error::FrameLength(err)),
284
                }
285
            };
286
287
154
            match (self.state, &self.config) {
288
                (InProgressState::HandshakeExpected, Config::Dialer { .. }) => {
289
38
                    if &*frame != HANDSHAKE {
290
0
                        return Err(Error::BadHandshake);
291
38
                    }
292
38
293
38
                    // The dialer immediately sends the request after its handshake and before
294
38
                    // waiting for the handshake from the listener. As such, after receiving the
295
38
                    // handshake, the next step is to wait for the request answer.
296
38
                    self.state = InProgressState::ProtocolRequestAnswerExpected;
297
                }
298
299
                (InProgressState::HandshakeExpected, Config::Listener { .. }) => {
300
40
                    if &*frame != HANDSHAKE {
301
0
                        return Err(Error::BadHandshake);
302
40
                    }
303
40
304
40
                    // The listener immediately sends the handshake at initialization. When this
305
40
                    // code is reached, it has therefore already been sent.
306
40
                    self.state = InProgressState::CommandExpected;
307
                }
308
309
                (InProgressState::CommandExpected, Config::Listener { .. }) => {
310
40
                    if frame.pop() != Some(b'\n') {
311
0
                        return Err(Error::InvalidCommand);
312
40
                    }
313
314
40
                    let protocol = String::from_utf8(frame).map_err(|_| 
Error::InvalidCommand0
)
?0
;
Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0Be_
Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writelE0Be_
Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0Be_
Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writelE0Be_
Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgresspE10read_writepE0Be_
Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantE0CsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantE0CsiUjFBJteJ7x_17smoldot_full_node
315
316
40
                    return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny {
317
40
                        inner: self,
318
40
                        protocol,
319
40
                    }));
320
                }
321
322
                (
323
                    InProgressState::ProtocolRequestAnswerExpected,
324
36
                    Config::Dialer { requested_protocol },
325
36
                ) => {
326
36
                    if frame.pop() != Some(b'\n') {
327
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
328
36
                    }
329
36
                    if &*frame == b"na" {
330
                        // Because of the order of checks, a protocol named `na` will never be
331
                        // successfully negotiated. Debugging is expected to be less confusing if
332
                        // the negotiation always fails.
333
1
                        return Ok(Negotiation::NotAvailable);
334
35
                    }
335
35
                    if frame != requested_protocol.as_ref().as_bytes() {
336
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
337
35
                    }
338
35
                    return Ok(Negotiation::Success);
339
                }
340
341
                // Invalid states.
342
                (InProgressState::CommandExpected, Config::Dialer { .. })
343
                | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. })
344
                | (InProgressState::Finishing, _) => {
345
0
                    unreachable!();
346
                }
347
            };
348
        }
349
350
        // This point should be reached only if data is lacking in order to proceed.
351
1
        Ok(Negotiation::InProgress(self))
352
366
    }
_RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEBc_
Line
Count
Source
246
260
    pub fn read_write<TNow>(
247
260
        mut self,
248
260
        read_write: &mut ReadWrite<TNow>,
249
260
    ) -> Result<Negotiation<P>, Error> {
250
        loop {
251
            // First, try to send out data currently being queued for sending.
252
373
            read_write.write_from_vec_deque(&mut self.data_send_out);
253
373
254
373
            // The `Finishing` state is special because it doesn't expect any incoming message
255
373
            // anymore, and just finishes the negotiation after all the data has been written out.
256
373
            // As such, we do not proceed further unless we have finished sending out everything.
257
373
            if let InProgressState::Finishing = self.state {
258
18
                debug_assert!(
matches!0
(self.config, Config::Listener { .. }));
259
18
                if self.data_send_out.is_empty() {
260
18
                    return Ok(Negotiation::Success);
261
                } else {
262
0
                    break;
263
                }
264
355
            }
265
266
            // Try to extract a message from the incoming buffer.
267
355
            let 
mut frame74
= if let Some(
next_frame_len149
) = self.next_in_frame_len {
268
149
                match read_write.incoming_bytes_take(next_frame_len) {
269
74
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
270
74
                    Ok(Some(frame)) => {
271
74
                        self.next_in_frame_len = None;
272
74
                        frame
273
                    }
274
1
                    Err(err) => return Err(Error::Frame(err)),
275
                }
276
            } else {
277
206
                match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) {
278
131
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
279
75
                    Ok(Some(size)) => {
280
75
                        self.next_in_frame_len = Some(size);
281
75
                        continue;
282
                    }
283
0
                    Err(err) => return Err(Error::FrameLength(err)),
284
                }
285
            };
286
287
74
            match (self.state, &self.config) {
288
                (InProgressState::HandshakeExpected, Config::Dialer { .. }) => {
289
18
                    if &*frame != HANDSHAKE {
290
0
                        return Err(Error::BadHandshake);
291
18
                    }
292
18
293
18
                    // The dialer immediately sends the request after its handshake and before
294
18
                    // waiting for the handshake from the listener. As such, after receiving the
295
18
                    // handshake, the next step is to wait for the request answer.
296
18
                    self.state = InProgressState::ProtocolRequestAnswerExpected;
297
                }
298
299
                (InProgressState::HandshakeExpected, Config::Listener { .. }) => {
300
20
                    if &*frame != HANDSHAKE {
301
0
                        return Err(Error::BadHandshake);
302
20
                    }
303
20
304
20
                    // The listener immediately sends the handshake at initialization. When this
305
20
                    // code is reached, it has therefore already been sent.
306
20
                    self.state = InProgressState::CommandExpected;
307
                }
308
309
                (InProgressState::CommandExpected, Config::Listener { .. }) => {
310
20
                    if frame.pop() != Some(b'\n') {
311
0
                        return Err(Error::InvalidCommand);
312
20
                    }
313
314
20
                    let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)
?0
;
315
316
20
                    return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny {
317
20
                        inner: self,
318
20
                        protocol,
319
20
                    }));
320
                }
321
322
                (
323
                    InProgressState::ProtocolRequestAnswerExpected,
324
16
                    Config::Dialer { requested_protocol },
325
16
                ) => {
326
16
                    if frame.pop() != Some(b'\n') {
327
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
328
16
                    }
329
16
                    if &*frame == b"na" {
330
                        // Because of the order of checks, a protocol named `na` will never be
331
                        // successfully negotiated. Debugging is expected to be less confusing if
332
                        // the negotiation always fails.
333
1
                        return Ok(Negotiation::NotAvailable);
334
15
                    }
335
15
                    if frame != requested_protocol.as_ref().as_bytes() {
336
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
337
15
                    }
338
15
                    return Ok(Negotiation::Success);
339
                }
340
341
                // Invalid states.
342
                (InProgressState::CommandExpected, Config::Dialer { .. })
343
                | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. })
344
                | (InProgressState::Finishing, _) => {
345
0
                    unreachable!();
346
                }
347
            };
348
        }
349
350
        // This point should be reached only if data is lacking in order to proceed.
351
0
        Ok(Negotiation::InProgress(self))
352
260
    }
_RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writelEBc_
Line
Count
Source
246
13
    pub fn read_write<TNow>(
247
13
        mut self,
248
13
        read_write: &mut ReadWrite<TNow>,
249
13
    ) -> Result<Negotiation<P>, Error> {
250
        loop {
251
            // First, try to send out data currently being queued for sending.
252
25
            read_write.write_from_vec_deque(&mut self.data_send_out);
253
25
254
25
            // The `Finishing` state is special because it doesn't expect any incoming message
255
25
            // anymore, and just finishes the negotiation after all the data has been written out.
256
25
            // As such, we do not proceed further unless we have finished sending out everything.
257
25
            if let InProgressState::Finishing = self.state {
258
5
                debug_assert!(
matches!0
(self.config, Config::Listener { .. }));
259
5
                if self.data_send_out.is_empty() {
260
4
                    return Ok(Negotiation::Success);
261
                } else {
262
1
                    break;
263
                }
264
20
            }
265
266
            // Try to extract a message from the incoming buffer.
267
20
            let 
mut frame8
= if let Some(
next_frame_len10
) = self.next_in_frame_len {
268
10
                match read_write.incoming_bytes_take(next_frame_len) {
269
2
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
270
8
                    Ok(Some(frame)) => {
271
8
                        self.next_in_frame_len = None;
272
8
                        frame
273
                    }
274
0
                    Err(err) => return Err(Error::Frame(err)),
275
                }
276
            } else {
277
10
                match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) {
278
2
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
279
8
                    Ok(Some(size)) => {
280
8
                        self.next_in_frame_len = Some(size);
281
8
                        continue;
282
                    }
283
0
                    Err(err) => return Err(Error::FrameLength(err)),
284
                }
285
            };
286
287
8
            match (self.state, &self.config) {
288
                (InProgressState::HandshakeExpected, Config::Dialer { .. }) => {
289
0
                    if &*frame != HANDSHAKE {
290
0
                        return Err(Error::BadHandshake);
291
0
                    }
292
0
293
0
                    // The dialer immediately sends the request after its handshake and before
294
0
                    // waiting for the handshake from the listener. As such, after receiving the
295
0
                    // handshake, the next step is to wait for the request answer.
296
0
                    self.state = InProgressState::ProtocolRequestAnswerExpected;
297
                }
298
299
                (InProgressState::HandshakeExpected, Config::Listener { .. }) => {
300
4
                    if &*frame != HANDSHAKE {
301
0
                        return Err(Error::BadHandshake);
302
4
                    }
303
4
304
4
                    // The listener immediately sends the handshake at initialization. When this
305
4
                    // code is reached, it has therefore already been sent.
306
4
                    self.state = InProgressState::CommandExpected;
307
                }
308
309
                (InProgressState::CommandExpected, Config::Listener { .. }) => {
310
4
                    if frame.pop() != Some(b'\n') {
311
0
                        return Err(Error::InvalidCommand);
312
4
                    }
313
314
4
                    let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)
?0
;
315
316
4
                    return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny {
317
4
                        inner: self,
318
4
                        protocol,
319
4
                    }));
320
                }
321
322
                (
323
                    InProgressState::ProtocolRequestAnswerExpected,
324
0
                    Config::Dialer { requested_protocol },
325
0
                ) => {
326
0
                    if frame.pop() != Some(b'\n') {
327
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
328
0
                    }
329
0
                    if &*frame == b"na" {
330
                        // Because of the order of checks, a protocol named `na` will never be
331
                        // successfully negotiated. Debugging is expected to be less confusing if
332
                        // the negotiation always fails.
333
0
                        return Ok(Negotiation::NotAvailable);
334
0
                    }
335
0
                    if frame != requested_protocol.as_ref().as_bytes() {
336
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
337
0
                    }
338
0
                    return Ok(Negotiation::Success);
339
                }
340
341
                // Invalid states.
342
                (InProgressState::CommandExpected, Config::Dialer { .. })
343
                | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. })
344
                | (InProgressState::Finishing, _) => {
345
0
                    unreachable!();
346
                }
347
            };
348
        }
349
350
        // This point should be reached only if data is lacking in order to proceed.
351
1
        Ok(Negotiation::InProgress(self))
352
13
    }
_RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEBc_
Line
Count
Source
246
63
    pub fn read_write<TNow>(
247
63
        mut self,
248
63
        read_write: &mut ReadWrite<TNow>,
249
63
    ) -> Result<Negotiation<P>, Error> {
250
        loop {
251
            // First, try to send out data currently being queued for sending.
252
147
            read_write.write_from_vec_deque(&mut self.data_send_out);
253
147
254
147
            // The `Finishing` state is special because it doesn't expect any incoming message
255
147
            // anymore, and just finishes the negotiation after all the data has been written out.
256
147
            // As such, we do not proceed further unless we have finished sending out everything.
257
147
            if let InProgressState::Finishing = self.state {
258
14
                debug_assert!(
matches!0
(self.config, Config::Listener { .. }));
259
14
                if self.data_send_out.is_empty() {
260
14
                    return Ok(Negotiation::Success);
261
                } else {
262
0
                    break;
263
                }
264
133
            }
265
266
            // Try to extract a message from the incoming buffer.
267
133
            let 
mut frame56
= if let Some(
next_frame_len56
) = self.next_in_frame_len {
268
56
                match read_write.incoming_bytes_take(next_frame_len) {
269
0
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
270
56
                    Ok(Some(frame)) => {
271
56
                        self.next_in_frame_len = None;
272
56
                        frame
273
                    }
274
0
                    Err(err) => return Err(Error::Frame(err)),
275
                }
276
            } else {
277
77
                match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) {
278
21
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
279
56
                    Ok(Some(size)) => {
280
56
                        self.next_in_frame_len = Some(size);
281
56
                        continue;
282
                    }
283
0
                    Err(err) => return Err(Error::FrameLength(err)),
284
                }
285
            };
286
287
56
            match (self.state, &self.config) {
288
                (InProgressState::HandshakeExpected, Config::Dialer { .. }) => {
289
14
                    if &*frame != HANDSHAKE {
290
0
                        return Err(Error::BadHandshake);
291
14
                    }
292
14
293
14
                    // The dialer immediately sends the request after its handshake and before
294
14
                    // waiting for the handshake from the listener. As such, after receiving the
295
14
                    // handshake, the next step is to wait for the request answer.
296
14
                    self.state = InProgressState::ProtocolRequestAnswerExpected;
297
                }
298
299
                (InProgressState::HandshakeExpected, Config::Listener { .. }) => {
300
14
                    if &*frame != HANDSHAKE {
301
0
                        return Err(Error::BadHandshake);
302
14
                    }
303
14
304
14
                    // The listener immediately sends the handshake at initialization. When this
305
14
                    // code is reached, it has therefore already been sent.
306
14
                    self.state = InProgressState::CommandExpected;
307
                }
308
309
                (InProgressState::CommandExpected, Config::Listener { .. }) => {
310
14
                    if frame.pop() != Some(b'\n') {
311
0
                        return Err(Error::InvalidCommand);
312
14
                    }
313
314
14
                    let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)
?0
;
315
316
14
                    return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny {
317
14
                        inner: self,
318
14
                        protocol,
319
14
                    }));
320
                }
321
322
                (
323
                    InProgressState::ProtocolRequestAnswerExpected,
324
14
                    Config::Dialer { requested_protocol },
325
14
                ) => {
326
14
                    if frame.pop() != Some(b'\n') {
327
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
328
14
                    }
329
14
                    if &*frame == b"na" {
330
                        // Because of the order of checks, a protocol named `na` will never be
331
                        // successfully negotiated. Debugging is expected to be less confusing if
332
                        // the negotiation always fails.
333
0
                        return Ok(Negotiation::NotAvailable);
334
14
                    }
335
14
                    if frame != requested_protocol.as_ref().as_bytes() {
336
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
337
14
                    }
338
14
                    return Ok(Negotiation::Success);
339
                }
340
341
                // Invalid states.
342
                (InProgressState::CommandExpected, Config::Dialer { .. })
343
                | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. })
344
                | (InProgressState::Finishing, _) => {
345
0
                    unreachable!();
346
                }
347
            };
348
        }
349
350
        // This point should be reached only if data is lacking in order to proceed.
351
0
        Ok(Negotiation::InProgress(self))
352
63
    }
_RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writelEBc_
Line
Count
Source
246
30
    pub fn read_write<TNow>(
247
30
        mut self,
248
30
        read_write: &mut ReadWrite<TNow>,
249
30
    ) -> Result<Negotiation<P>, Error> {
250
        loop {
251
            // First, try to send out data currently being queued for sending.
252
54
            read_write.write_from_vec_deque(&mut self.data_send_out);
253
54
254
54
            // The `Finishing` state is special because it doesn't expect any incoming message
255
54
            // anymore, and just finishes the negotiation after all the data has been written out.
256
54
            // As such, we do not proceed further unless we have finished sending out everything.
257
54
            if let InProgressState::Finishing = self.state {
258
2
                debug_assert!(
matches!0
(self.config, Config::Listener { .. }));
259
2
                if self.data_send_out.is_empty() {
260
2
                    return Ok(Negotiation::Success);
261
                } else {
262
0
                    break;
263
                }
264
52
            }
265
266
            // Try to extract a message from the incoming buffer.
267
52
            let 
mut frame16
= if let Some(
next_frame_len19
) = self.next_in_frame_len {
268
19
                match read_write.incoming_bytes_take(next_frame_len) {
269
3
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
270
16
                    Ok(Some(frame)) => {
271
16
                        self.next_in_frame_len = None;
272
16
                        frame
273
                    }
274
0
                    Err(err) => return Err(Error::Frame(err)),
275
                }
276
            } else {
277
33
                match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) {
278
17
                    Ok(None) => return Ok(Negotiation::InProgress(self)),
279
16
                    Ok(Some(size)) => {
280
16
                        self.next_in_frame_len = Some(size);
281
16
                        continue;
282
                    }
283
0
                    Err(err) => return Err(Error::FrameLength(err)),
284
                }
285
            };
286
287
16
            match (self.state, &self.config) {
288
                (InProgressState::HandshakeExpected, Config::Dialer { .. }) => {
289
6
                    if &*frame != HANDSHAKE {
290
0
                        return Err(Error::BadHandshake);
291
6
                    }
292
6
293
6
                    // The dialer immediately sends the request after its handshake and before
294
6
                    // waiting for the handshake from the listener. As such, after receiving the
295
6
                    // handshake, the next step is to wait for the request answer.
296
6
                    self.state = InProgressState::ProtocolRequestAnswerExpected;
297
                }
298
299
                (InProgressState::HandshakeExpected, Config::Listener { .. }) => {
300
2
                    if &*frame != HANDSHAKE {
301
0
                        return Err(Error::BadHandshake);
302
2
                    }
303
2
304
2
                    // The listener immediately sends the handshake at initialization. When this
305
2
                    // code is reached, it has therefore already been sent.
306
2
                    self.state = InProgressState::CommandExpected;
307
                }
308
309
                (InProgressState::CommandExpected, Config::Listener { .. }) => {
310
2
                    if frame.pop() != Some(b'\n') {
311
0
                        return Err(Error::InvalidCommand);
312
2
                    }
313
314
2
                    let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)
?0
;
315
316
2
                    return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny {
317
2
                        inner: self,
318
2
                        protocol,
319
2
                    }));
320
                }
321
322
                (
323
                    InProgressState::ProtocolRequestAnswerExpected,
324
6
                    Config::Dialer { requested_protocol },
325
6
                ) => {
326
6
                    if frame.pop() != Some(b'\n') {
327
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
328
6
                    }
329
6
                    if &*frame == b"na" {
330
                        // Because of the order of checks, a protocol named `na` will never be
331
                        // successfully negotiated. Debugging is expected to be less confusing if
332
                        // the negotiation always fails.
333
0
                        return Ok(Negotiation::NotAvailable);
334
6
                    }
335
6
                    if frame != requested_protocol.as_ref().as_bytes() {
336
0
                        return Err(Error::UnexpectedProtocolRequestAnswer);
337
6
                    }
338
6
                    return Ok(Negotiation::Success);
339
                }
340
341
                // Invalid states.
342
                (InProgressState::CommandExpected, Config::Dialer { .. })
343
                | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. })
344
                | (InProgressState::Finishing, _) => {
345
0
                    unreachable!();
346
                }
347
            };
348
        }
349
350
        // This point should be reached only if data is lacking in order to proceed.
351
0
        Ok(Negotiation::InProgress(self))
352
30
    }
Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgresspE10read_writepEBc_
Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantECsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantECsiUjFBJteJ7x_17smoldot_full_node
353
}
354
355
impl<P> fmt::Debug for InProgress<P> {
356
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
357
0
        f.debug_tuple("InProgress").finish()
358
0
    }
Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selects1_0pEINtB5_10InProgresspENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selects1_0pEINtB5_10InProgresspENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_
359
}
360
361
/// Error that can happen during the negotiation.
362
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXsf_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsf_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
363
pub enum Error {
364
    /// Reading side of the connection is closed. The handshake can't proceed further.
365
    ReadClosed,
366
    /// Writing side of the connection is closed. The handshake can't proceed further.
367
    WriteClosed,
368
    /// Error while decoding a frame length, or frame size limit reached.
369
    #[display(fmt = "LEB128 frame error: {_0}")]
370
    FrameLength(read_write::IncomingBytesTakeLeb128Error),
371
    /// Error while decoding a frame.
372
    #[display(fmt = "LEB128 frame error: {_0}")]
373
    Frame(read_write::IncomingBytesTakeError),
374
    /// Unknown handshake or unknown multistream-select protocol version.
375
    BadHandshake,
376
    /// Received empty command.
377
    InvalidCommand,
378
    /// Received answer to protocol request that doesn't match the requested protocol.
379
    UnexpectedProtocolRequestAnswer,
380
}
381
382
/// Handshake message sent by both parties at the beginning of each multistream-select negotiation.
383
const HANDSHAKE: &[u8] = b"/multistream/1.0.0\n";
384
385
/// Message on the multistream-select protocol.
386
#[derive(Debug, Copy, Clone)]
387
enum Message<P> {
388
    Handshake,
389
    ProtocolRequest(P),
390
    ProtocolOk(P),
391
    ProtocolNa,
392
}
393
394
163
fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) {
395
163
    match message {
396
81
        Message::Handshake => {
397
81
            out.reserve(HANDSHAKE.len() + 4);
398
81
            out.extend(leb128::encode_usize(HANDSHAKE.len()));
399
81
            out.extend(HANDSHAKE);
400
81
        }
401
80
        Message::ProtocolRequest(
p41
) | Message::ProtocolOk(
p39
) => {
402
80
            let p = p.as_ref();
403
80
            out.reserve(p.len() + 5);
404
80
            out.extend(leb128::encode_usize(p.len() + 1));
405
80
            out.extend(p);
406
80
            out.push_back(b'\n');
407
80
        }
408
2
        Message::ProtocolNa => {
409
2
            out.reserve(8);
410
2
            out.extend(leb128::encode_usize(3));
411
2
            out.extend(b"na\n");
412
2
        }
413
    }
414
163
}
_RINvNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_select13write_messageINtNtCsdZExvAaxgia_5alloc3vec3VechEEB8_
Line
Count
Source
394
39
fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) {
395
39
    match message {
396
0
        Message::Handshake => {
397
0
            out.reserve(HANDSHAKE.len() + 4);
398
0
            out.extend(leb128::encode_usize(HANDSHAKE.len()));
399
0
            out.extend(HANDSHAKE);
400
0
        }
401
39
        Message::ProtocolRequest(
p0
) | Message::ProtocolOk(p) => {
402
39
            let p = p.as_ref();
403
39
            out.reserve(p.len() + 5);
404
39
            out.extend(leb128::encode_usize(p.len() + 1));
405
39
            out.extend(p);
406
39
            out.push_back(b'\n');
407
39
        }
408
0
        Message::ProtocolNa => {
409
0
            out.reserve(8);
410
0
            out.extend(leb128::encode_usize(3));
411
0
            out.extend(b"na\n");
412
0
        }
413
    }
414
39
}
_RINvNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_select13write_messageRShEB8_
Line
Count
Source
394
83
fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) {
395
83
    match message {
396
81
        Message::Handshake => {
397
81
            out.reserve(HANDSHAKE.len() + 4);
398
81
            out.extend(leb128::encode_usize(HANDSHAKE.len()));
399
81
            out.extend(HANDSHAKE);
400
81
        }
401
0
        Message::ProtocolRequest(p) | Message::ProtocolOk(p) => {
402
0
            let p = p.as_ref();
403
0
            out.reserve(p.len() + 5);
404
0
            out.extend(leb128::encode_usize(p.len() + 1));
405
0
            out.extend(p);
406
0
            out.push_back(b'\n');
407
0
        }
408
2
        Message::ProtocolNa => {
409
2
            out.reserve(8);
410
2
            out.extend(leb128::encode_usize(3));
411
2
            out.extend(b"na\n");
412
2
        }
413
    }
414
83
}
_RINvNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_select13write_messageReEB8_
Line
Count
Source
394
41
fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) {
395
41
    match message {
396
0
        Message::Handshake => {
397
0
            out.reserve(HANDSHAKE.len() + 4);
398
0
            out.extend(leb128::encode_usize(HANDSHAKE.len()));
399
0
            out.extend(HANDSHAKE);
400
0
        }
401
41
        Message::ProtocolRequest(p) | Message::ProtocolOk(
p0
) => {
402
41
            let p = p.as_ref();
403
41
            out.reserve(p.len() + 5);
404
41
            out.extend(leb128::encode_usize(p.len() + 1));
405
41
            out.extend(p);
406
41
            out.push_back(b'\n');
407
41
        }
408
0
        Message::ProtocolNa => {
409
0
            out.reserve(8);
410
0
            out.extend(leb128::encode_usize(3));
411
0
            out.extend(b"na\n");
412
0
        }
413
    }
414
41
}
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageINtNtCsdZExvAaxgia_5alloc3vec3VechEECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageRShECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageReECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageRShEB8_
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageReEB8_
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageINtNtCsdZExvAaxgia_5alloc3vec3VechEECsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageRShECsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageReECsiUjFBJteJ7x_17smoldot_full_node
415
416
#[cfg(test)]
417
mod tests {
418
    use alloc::collections::VecDeque;
419
    use core::{cmp, mem};
420
421
    use super::{super::super::read_write::ReadWrite, write_message, Config, Message, Negotiation};
422
423
    #[test]
424
1
    fn encode() {
425
1
        let mut message = VecDeque::new();
426
1
427
1
        write_message(Message::<&'static [u8]>::Handshake, &mut message);
428
1
        assert_eq!(
429
1
            message.drain(..).collect::<Vec<_>>(),
430
1
            b"\x13/multistream/1.0.0\n".to_vec()
431
1
        );
432
433
1
        write_message(Message::ProtocolRequest("/hello"), &mut message);
434
1
        assert_eq!(
435
1
            message.drain(..).collect::<Vec<_>>(),
436
1
            b"\x07/hello\n".to_vec()
437
1
        );
438
439
1
        write_message(Message::<&'static [u8]>::ProtocolNa, &mut message);
440
1
        assert_eq!(message.drain(..).collect::<Vec<_>>(), b"\x03na\n".to_vec());
441
1
    }
442
443
    #[test]
444
1
    fn negotiation_basic_works() {
445
4
        fn test_with_buffer_sizes(mut size1: usize, mut size2: usize) {
446
4
            let mut negotiation1 = Negotiation::new(Config::Dialer {
447
4
                requested_protocol: "/foo",
448
4
            });
449
4
            let mut negotiation2 = Negotiation::new(Config::<String>::Listener {
450
4
                max_protocol_name_len: 4,
451
4
            });
452
4
453
4
            let mut buf_1_to_2 = Vec::new();
454
4
            let mut buf_2_to_1 = Vec::new();
455
4
456
4
            let mut num_iterations = 0;
457
1
458
21
            while !matches!(
459
25
                (&negotiation1, &negotiation2),
460
1
                (Negotiation::Success, Negotiation::Success)
461
1
            ) {
462
21
                num_iterations += 1;
463
21
                assert!(num_iterations <= 5000);
464
1
465
21
                match negotiation1 {
466
21
                    Negotiation::InProgress(nego) => {
467
21
                        let mut read_write = ReadWrite {
468
21
                            now: 0,
469
21
                            incoming_buffer: buf_2_to_1,
470
21
                            expected_incoming_bytes: Some(0),
471
21
                            read_bytes: 0,
472
21
                            write_bytes_queued: buf_1_to_2.len(),
473
21
                            write_bytes_queueable: Some(size1 - buf_1_to_2.len()),
474
21
                            write_buffers: vec![mem::take(&mut buf_1_to_2)],
475
21
                            wake_up_after: None,
476
21
                        };
477
21
                        negotiation1 = nego.read_write(&mut read_write).unwrap();
478
21
                        buf_2_to_1 = read_write.incoming_buffer;
479
21
                        buf_1_to_2.extend(
480
21
                            read_write
481
21
                                .write_buffers
482
21
                                .drain(..)
483
37
                                .flat_map(|b| b.into_iter()),
484
21
                        );
485
21
                        size2 = cmp::max(size2, read_write.expected_incoming_bytes.unwrap_or(0));
486
21
                    }
487
1
                    Negotiation::Success => 
{}0
488
1
                    Negotiation::ListenerAcceptOrDeny(_) => 
unreachable!()0
,
489
1
                    Negotiation::NotAvailable => 
panic!()0
,
490
1
                }
491
1
492
4
                match negotiation2 {
493
13
                    Negotiation::InProgress(nego) => {
494
13
                        let mut read_write = ReadWrite {
495
13
                            now: 0,
496
13
                            incoming_buffer: buf_1_to_2,
497
13
                            expected_incoming_bytes: Some(0),
498
13
                            read_bytes: 0,
499
13
                            write_bytes_queued: buf_2_to_1.len(),
500
13
                            write_bytes_queueable: Some(size2 - buf_2_to_1.len()),
501
13
                            write_buffers: vec![mem::take(&mut buf_2_to_1)],
502
13
                            wake_up_after: None,
503
13
                        };
504
13
                        negotiation2 = nego.read_write(&mut read_write).unwrap();
505
13
                        buf_1_to_2 = read_write.incoming_buffer;
506
13
                        buf_2_to_1.extend(
507
13
                            read_write
508
13
                                .write_buffers
509
13
                                .drain(..)
510
33
                                .flat_map(|b| b.into_iter()),
511
13
                        );
512
13
                        size1 = cmp::max(size1, read_write.expected_incoming_bytes.unwrap_or(0));
513
13
                    }
514
4
                    Negotiation::ListenerAcceptOrDeny(accept_reject)
515
4
                        if accept_reject.requested_protocol() == "/foo" =>
516
4
                    {
517
4
                        negotiation2 = Negotiation::InProgress(accept_reject.accept());
518
4
                    }
519
1
                    Negotiation::ListenerAcceptOrDeny(accept_reject) => {
520
0
                        negotiation2 = Negotiation::InProgress(accept_reject.reject());
521
0
                    }
522
4
                    Negotiation::Success => {}
523
1
                    Negotiation::NotAvailable => 
panic!()0
,
524
1
                }
525
1
            }
526
4
        }
527
1
528
1
        test_with_buffer_sizes(256, 256);
529
1
        test_with_buffer_sizes(1, 1);
530
1
        test_with_buffer_sizes(1, 2048);
531
1
        test_with_buffer_sizes(2048, 1);
532
1
    }
533
}