/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/multistream_select.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | //! Multistream-select is a protocol whose purpose is to negotiate protocols. |
19 | | //! |
20 | | //! # Context |
21 | | //! |
22 | | //! The multistream-select protocol makes it possible for two parties to negotiate a protocol. |
23 | | //! |
24 | | //! When using TCP connections, it is used immediately after a connection opens in order to |
25 | | //! negotiate which encryption protocol to use, then after the encryption protocol handshake to |
26 | | //! negotiate which multiplexing protocol to use. |
27 | | //! |
28 | | //! It is also used every time a substream opens in order to negotiate which protocol to use for |
29 | | //! this substream in particular. |
30 | | //! |
31 | | //! Once a protocol has been negotiated, the connection or substream immediately starts speaking |
32 | | //! this protocol. |
33 | | //! |
34 | | //! The multistream-select protocol is asymmetric: one side needs to be the dialer and the other |
35 | | //! side the listener. In the context of a TCP connection, the dialer and listener correspond to |
36 | | //! the dialer and listener of the connection. In the context of a substream, the dialer is the |
37 | | //! side that initiated the opening of the substream. |
38 | | //! |
39 | | //! # About protocol names |
40 | | //! |
41 | | //! Due to flaws in the wire protocol design, a protocol named `na` causes an ambiguity in |
42 | | //! the exchange. Because protocol names are normally decided ahead of time, this situation is |
43 | | //! expected to never arise, except in the presence of a malicious remote. The decision has been |
44 | | //! taken that such a protocol will always fail to negotiate, but will also not produce any error |
45 | | //! or panic. |
46 | | //! |
47 | | //! Please don't intentionally name a protocol `na`. |
48 | | //! |
49 | | //! # Usage |
50 | | //! |
51 | | //! To be written. |
52 | | //! |
53 | | //! # See also |
54 | | //! |
55 | | //! - [Official repository](https://github.com/multiformats/multistream-select) |
56 | | //! |
57 | | |
58 | | // TODO: write usage |
59 | | |
60 | | use super::super::read_write::ReadWrite; |
61 | | use crate::{libp2p::read_write, util::leb128}; |
62 | | |
63 | | use alloc::{collections::VecDeque, string::String}; |
64 | | use core::{cmp, fmt, str}; |
65 | | |
66 | | /// Configuration of a multistream-select protocol. |
67 | | #[derive(Debug, Copy, Clone, PartialEq, Eq)] |
68 | | pub enum Config<P> { |
69 | | /// Local node is the dialing side and requests the specific protocol. |
70 | | Dialer { |
71 | | /// Name of the protocol to try to negotiate. The multistream-select negotiation will |
72 | | /// ultimately succeed if and only if the remote supports this protocol. |
73 | | requested_protocol: P, |
74 | | }, |
75 | | /// Local node is the listening side. |
76 | | Listener { |
77 | | /// Maximum allowed length of a protocol name. Set this to a value greater than or equal to the |
78 | | /// length of the longest protocol name that is supported locally. |
79 | | /// |
80 | | /// This limit is necessary in order to prevent the remote from sending an infinite stream |
81 | | /// of data for the protocol name. |
82 | | max_protocol_name_len: usize, |
83 | | }, |
84 | | } |
85 | | |
86 | | /// Current state of a multistream-select negotiation. |
87 | | #[derive(Debug)] |
88 | | pub enum Negotiation<P> { |
89 | | /// Negotiation is still in progress. Use the provided [`InProgress`] object to inject and |
90 | | /// extract more data from/to the remote. |
91 | | InProgress(InProgress<P>), |
92 | | /// Negotiation is still in progress and is waiting for accepting or refusing the protocol |
93 | | /// requested by the remote. |
94 | | /// |
95 | | /// Can never happen if configured as the dialing side. |
96 | | ListenerAcceptOrDeny(ListenerAcceptOrDeny<P>), |
97 | | /// Negotiation has ended successfully. A protocol has been negotiated. |
98 | | Success, |
99 | | /// Negotiation has ended, but there isn't any protocol in common between the two parties. |
100 | | /// |
101 | | /// Can only ever happen as the dialing side. |
102 | | NotAvailable, |
103 | | } |
104 | | |
105 | | impl<P> Negotiation<P> |
106 | | where |
107 | | P: AsRef<str>, |
108 | | { |
109 | | /// Shortcut method for [`InProgress::new`] and wrapping the [`InProgress`] in a |
110 | | /// [`Negotiation`]. |
111 | 8 | pub fn new(config: Config<P>) -> Self { |
112 | 8 | Negotiation::InProgress(InProgress::new(config)) |
113 | 8 | } _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB2_11NegotiationReE3newB8_ Line | Count | Source | 111 | 4 | pub fn new(config: Config<P>) -> Self { | 112 | 4 | Negotiation::InProgress(InProgress::new(config)) | 113 | 4 | } |
_RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB2_11NegotiationNtNtCsdZExvAaxgia_5alloc6string6StringE3newB8_ Line | Count | Source | 111 | 4 | pub fn new(config: Config<P>) -> Self { | 112 | 4 | Negotiation::InProgress(InProgress::new(config)) | 113 | 4 | } |
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB2_11NegotiationpE3newB8_ |
114 | | } |
115 | | |
116 | | /// Negotiation is still in progress and is waiting for accepting or refusing the protocol |
117 | | /// requested by the remote. |
118 | | #[derive(Debug)] |
119 | | pub struct ListenerAcceptOrDeny<P> { |
120 | | inner: InProgress<P>, |
121 | | protocol: String, |
122 | | } |
123 | | |
124 | | impl<P> ListenerAcceptOrDeny<P> { |
125 | | /// Name of the protocol requested by the remote. |
126 | 40 | pub fn requested_protocol(&self) -> &str { |
127 | 40 | &self.protocol |
128 | 40 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE18requested_protocolBa_ Line | Count | Source | 126 | 24 | pub fn requested_protocol(&self) -> &str { | 127 | 24 | &self.protocol | 128 | 24 | } |
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE18requested_protocolBa_ Line | Count | Source | 126 | 16 | pub fn requested_protocol(&self) -> &str { | 127 | 16 | &self.protocol | 128 | 16 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE18requested_protocolCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE18requested_protocolCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenypE18requested_protocolBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE18requested_protocolCsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE18requested_protocolCsiUjFBJteJ7x_17smoldot_full_node |
129 | | |
130 | | /// Accept the requested protocol and resume the handshake. |
131 | 39 | pub fn accept(mut self) -> InProgress<P> { |
132 | 39 | debug_assert!(matches!0 (self.inner.state, InProgressState::CommandExpected)); |
133 | 39 | write_message( |
134 | 39 | Message::ProtocolOk(self.protocol.into_bytes()), |
135 | 39 | &mut self.inner.data_send_out, |
136 | 39 | ); |
137 | 39 | self.inner.state = InProgressState::Finishing; |
138 | 39 | self.inner |
139 | 39 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6acceptBa_ Line | Count | Source | 131 | 23 | pub fn accept(mut self) -> InProgress<P> { | 132 | 23 | debug_assert!(matches!0 (self.inner.state, InProgressState::CommandExpected)); | 133 | 23 | write_message( | 134 | 23 | Message::ProtocolOk(self.protocol.into_bytes()), | 135 | 23 | &mut self.inner.data_send_out, | 136 | 23 | ); | 137 | 23 | self.inner.state = InProgressState::Finishing; | 138 | 23 | self.inner | 139 | 23 | } |
_RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6acceptBa_ Line | Count | Source | 131 | 16 | pub fn accept(mut self) -> InProgress<P> { | 132 | 16 | debug_assert!(matches!0 (self.inner.state, InProgressState::CommandExpected)); | 133 | 16 | write_message( | 134 | 16 | Message::ProtocolOk(self.protocol.into_bytes()), | 135 | 16 | &mut self.inner.data_send_out, | 136 | 16 | ); | 137 | 16 | self.inner.state = InProgressState::Finishing; | 138 | 16 | self.inner | 139 | 16 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6acceptCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6acceptCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenypE6acceptBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6acceptCsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6acceptCsiUjFBJteJ7x_17smoldot_full_node |
140 | | |
141 | | /// Reject the requested protocol and resume the handshake. |
142 | 1 | pub fn reject(mut self) -> InProgress<P> { |
143 | 1 | debug_assert!(matches!0 (self.inner.state, InProgressState::CommandExpected)); |
144 | 1 | write_message( |
145 | 1 | Message::<&'static [u8]>::ProtocolNa, |
146 | 1 | &mut self.inner.data_send_out, |
147 | 1 | ); |
148 | 1 | self.inner |
149 | 1 | } _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6rejectBa_ Line | Count | Source | 142 | 1 | pub fn reject(mut self) -> InProgress<P> { | 143 | 1 | debug_assert!(matches!0 (self.inner.state, InProgressState::CommandExpected)); | 144 | 1 | write_message( | 145 | 1 | Message::<&'static [u8]>::ProtocolNa, | 146 | 1 | &mut self.inner.data_send_out, | 147 | 1 | ); | 148 | 1 | self.inner | 149 | 1 | } |
Unexecuted instantiation: _RNvMs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6rejectBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6rejectCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6rejectCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenypE6rejectBa_ Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyNtNtCsdZExvAaxgia_5alloc6string6StringE6rejectCsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RNvMs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB4_20ListenerAcceptOrDenyReE6rejectCsiUjFBJteJ7x_17smoldot_full_node |
150 | | } |
151 | | |
152 | | /// Negotiation in progress. |
153 | | pub struct InProgress<P> { |
154 | | /// Configuration of the negotiation. |
155 | | config: Config<P>, |
156 | | /// Data currently being sent out. |
157 | | data_send_out: VecDeque<u8>, |
158 | | /// Current state of the negotiation. |
159 | | state: InProgressState, |
160 | | /// Maximum allowed size of an incoming frame. |
161 | | max_in_frame_len: usize, |
162 | | /// Size of the next frame to receive, or `None` if not known yet. If `Some`, we have already |
163 | | /// extracted the length from the incoming buffer. |
164 | | next_in_frame_len: Option<usize>, |
165 | | } |
166 | | |
167 | | /// Current state of the negotiation. |
168 | | #[derive(Debug, Copy, Clone)] |
169 | | enum InProgressState { |
170 | | Finishing, |
171 | | HandshakeExpected, |
172 | | CommandExpected, |
173 | | ProtocolRequestAnswerExpected, |
174 | | } |
175 | | |
176 | | impl<P> InProgress<P> |
177 | | where |
178 | | P: AsRef<str>, |
179 | | { |
180 | | /// Initializes a new handshake state machine. |
181 | 80 | pub fn new(config: Config<P>) -> Self { |
182 | | // Length, in bytes, of the longest protocol name. |
183 | 80 | let max_proto_name_len = match &config { |
184 | 40 | Config::Dialer { requested_protocol } => requested_protocol.as_ref().len(), |
185 | | Config::Listener { |
186 | 40 | max_protocol_name_len, |
187 | 40 | } => *max_protocol_name_len, |
188 | | }; |
189 | | |
190 | | // Any incoming frame larger than `max_frame_len` will trigger a protocol error. |
191 | | // This means that a protocol error might be reported in situations where the dialer |
192 | | // legitimately requests a protocol that the listener doesn't support. In order to prevent |
193 | | // confusion, a minimum length is applied to the protocol name length. Any protocol name |
194 | | // smaller than this will never trigger a protocol error, even if it isn't supported. |
195 | | const MIN_PROTO_LEN_NO_ERR: usize = 512; |
196 | 80 | let max_frame_len = cmp::max( |
197 | 80 | cmp::max(max_proto_name_len, MIN_PROTO_LEN_NO_ERR), |
198 | 80 | HANDSHAKE.len(), |
199 | 80 | ) + 1; |
200 | 80 | |
201 | 80 | InProgress { |
202 | 80 | // Note that the listener theoretically doesn't necessarily have to immediately send |
203 | 80 | // a handshake, and could instead wait for a command from the dialer. In practice, |
204 | 80 | // however, the specification doesn't mention anything about this, and some libraries |
205 | 80 | // such as js-libp2p wait for the listener to send a handshake before emitting a |
206 | 80 | // command. |
207 | 80 | data_send_out: { |
208 | 80 | let mut data = VecDeque::new(); |
209 | 80 | write_message(Message::<&'static [u8]>::Handshake, &mut data); |
210 | 80 | if let Config::Dialer { requested_protocol40 } = &config { |
211 | 40 | write_message( |
212 | 40 | Message::ProtocolRequest(requested_protocol.as_ref()), |
213 | 40 | &mut data, |
214 | 40 | ); |
215 | 40 | } |
216 | 80 | data |
217 | 80 | }, |
218 | 80 | config, |
219 | 80 | state: InProgressState::HandshakeExpected, |
220 | 80 | max_in_frame_len: max_frame_len, |
221 | 80 | next_in_frame_len: None, |
222 | 80 | } |
223 | 80 | } _RNvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE3newBb_ Line | Count | Source | 181 | 44 | pub fn new(config: Config<P>) -> Self { | 182 | | // Length, in bytes, of the longest protocol name. | 183 | 44 | let max_proto_name_len = match &config { | 184 | 20 | Config::Dialer { requested_protocol } => requested_protocol.as_ref().len(), | 185 | | Config::Listener { | 186 | 24 | max_protocol_name_len, | 187 | 24 | } => *max_protocol_name_len, | 188 | | }; | 189 | | | 190 | | // Any incoming frame larger than `max_frame_len` will trigger a protocol error. | 191 | | // This means that a protocol error might be reported in situations where the dialer | 192 | | // legitimately requests a protocol that the listener doesn't support. In order to prevent | 193 | | // confusion, a minimum length is applied to the protocol name length. Any protocol name | 194 | | // smaller than this will never trigger a protocol error, even if it isn't supported. | 195 | | const MIN_PROTO_LEN_NO_ERR: usize = 512; | 196 | 44 | let max_frame_len = cmp::max( | 197 | 44 | cmp::max(max_proto_name_len, MIN_PROTO_LEN_NO_ERR), | 198 | 44 | HANDSHAKE.len(), | 199 | 44 | ) + 1; | 200 | 44 | | 201 | 44 | InProgress { | 202 | 44 | // Note that the listener theoretically doesn't necessarily have to immediately send | 203 | 44 | // a handshake, and could instead wait for a command from the dialer. In practice, | 204 | 44 | // however, the specification doesn't mention anything about this, and some libraries | 205 | 44 | // such as js-libp2p wait for the listener to send a handshake before emitting a | 206 | 44 | // command. 
| 207 | 44 | data_send_out: { | 208 | 44 | let mut data = VecDeque::new(); | 209 | 44 | write_message(Message::<&'static [u8]>::Handshake, &mut data); | 210 | 44 | if let Config::Dialer { requested_protocol20 } = &config { | 211 | 20 | write_message( | 212 | 20 | Message::ProtocolRequest(requested_protocol.as_ref()), | 213 | 20 | &mut data, | 214 | 20 | ); | 215 | 24 | } | 216 | 44 | data | 217 | 44 | }, | 218 | 44 | config, | 219 | 44 | state: InProgressState::HandshakeExpected, | 220 | 44 | max_in_frame_len: max_frame_len, | 221 | 44 | next_in_frame_len: None, | 222 | 44 | } | 223 | 44 | } |
_RNvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newBb_ Line | Count | Source | 181 | 36 | pub fn new(config: Config<P>) -> Self { | 182 | | // Length, in bytes, of the longest protocol name. | 183 | 36 | let max_proto_name_len = match &config { | 184 | 20 | Config::Dialer { requested_protocol } => requested_protocol.as_ref().len(), | 185 | | Config::Listener { | 186 | 16 | max_protocol_name_len, | 187 | 16 | } => *max_protocol_name_len, | 188 | | }; | 189 | | | 190 | | // Any incoming frame larger than `max_frame_len` will trigger a protocol error. | 191 | | // This means that a protocol error might be reported in situations where the dialer | 192 | | // legitimately requests a protocol that the listener doesn't support. In order to prevent | 193 | | // confusion, a minimum length is applied to the protocol name length. Any protocol name | 194 | | // smaller than this will never trigger a protocol error, even if it isn't supported. | 195 | | const MIN_PROTO_LEN_NO_ERR: usize = 512; | 196 | 36 | let max_frame_len = cmp::max( | 197 | 36 | cmp::max(max_proto_name_len, MIN_PROTO_LEN_NO_ERR), | 198 | 36 | HANDSHAKE.len(), | 199 | 36 | ) + 1; | 200 | 36 | | 201 | 36 | InProgress { | 202 | 36 | // Note that the listener theoretically doesn't necessarily have to immediately send | 203 | 36 | // a handshake, and could instead wait for a command from the dialer. In practice, | 204 | 36 | // however, the specification doesn't mention anything about this, and some libraries | 205 | 36 | // such as js-libp2p wait for the listener to send a handshake before emitting a | 206 | 36 | // command. 
| 207 | 36 | data_send_out: { | 208 | 36 | let mut data = VecDeque::new(); | 209 | 36 | write_message(Message::<&'static [u8]>::Handshake, &mut data); | 210 | 36 | if let Config::Dialer { requested_protocol20 } = &config { | 211 | 20 | write_message( | 212 | 20 | Message::ProtocolRequest(requested_protocol.as_ref()), | 213 | 20 | &mut data, | 214 | 20 | ); | 215 | 20 | }16 | 216 | 36 | data | 217 | 36 | }, | 218 | 36 | config, | 219 | 36 | state: InProgressState::HandshakeExpected, | 220 | 36 | max_in_frame_len: max_frame_len, | 221 | 36 | next_in_frame_len: None, | 222 | 36 | } | 223 | 36 | } |
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE3newCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newBb_ Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE3newCsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressReE3newCsiUjFBJteJ7x_17smoldot_full_node |
224 | | |
225 | | /// If this function returns true, then the multistream-select handshake has finished writing |
226 | | /// all its data, and the API user can now start writing the protocol-specific data if it |
227 | | /// desires, even though the multistream-select handshake isn't finished. |
228 | | /// |
229 | | /// If the remote supports the requested protocol, then doing so will save one networking |
230 | | /// round-trip. If however the remote doesn't support the requested protocol, then doing so |
231 | | /// will lead to confusing errors on the remote, as it will interpret the protocol-specific |
232 | | /// data as being from the multistream-select protocol, and the substream will be rendered |
233 | | /// unusable. Overall, saving a round-trip is usually seen as preferable over confusing |
234 | | /// errors. |
235 | 124 | pub fn can_write_protocol_data(&self) -> bool { |
236 | 124 | matches!78 (self.state, InProgressState::ProtocolRequestAnswerExpected) |
237 | 124 | } _RNvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE23can_write_protocol_dataBb_ Line | Count | Source | 235 | 124 | pub fn can_write_protocol_data(&self) -> bool { | 236 | 124 | matches!78 (self.state, InProgressState::ProtocolRequestAnswerExpected) | 237 | 124 | } |
Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE23can_write_protocol_dataCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgresspE23can_write_protocol_dataBb_ Unexecuted instantiation: _RNvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB5_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE23can_write_protocol_dataCsiUjFBJteJ7x_17smoldot_full_node |
238 | | |
239 | | /// Feeds data coming from a socket, updates the internal state machine, and writes data |
240 | | /// destined to the socket. |
241 | | /// |
242 | | /// On success, returns the new state of the negotiation. |
243 | | /// |
244 | | /// An error is returned if reading or writing is closed, or if the protocol is being |
245 | | /// violated by the remote. When that happens, the connection should be closed altogether. |
246 | 366 | pub fn read_write<TNow>( |
247 | 366 | mut self, |
248 | 366 | read_write: &mut ReadWrite<TNow>, |
249 | 366 | ) -> Result<Negotiation<P>, Error> { |
250 | | loop { |
251 | | // First, try to send out data currently being queued for sending. |
252 | 599 | read_write.write_from_vec_deque(&mut self.data_send_out); |
253 | 599 | |
254 | 599 | // The `Finishing` state is special because it doesn't expect any incoming message |
255 | 599 | // anymore, and just finishes the negotiation after all the data has been written out. |
256 | 599 | // As such, we do not proceed further unless we have finished sending out everything. |
257 | 599 | if let InProgressState::Finishing = self.state { |
258 | 39 | debug_assert!(matches!0 (self.config, Config::Listener { .. })); |
259 | 39 | if self.data_send_out.is_empty() { |
260 | 38 | return Ok(Negotiation::Success); |
261 | | } else { |
262 | 1 | break; |
263 | | } |
264 | 560 | } |
265 | | |
266 | | // Try to extract a message from the incoming buffer. |
267 | 560 | let mut frame154 = if let Some(next_frame_len234 ) = self.next_in_frame_len { |
268 | 234 | match read_write.incoming_bytes_take(next_frame_len) { |
269 | 79 | Ok(None) => return Ok(Negotiation::InProgress(self)), |
270 | 154 | Ok(Some(frame)) => { |
271 | 154 | self.next_in_frame_len = None; |
272 | 154 | frame |
273 | | } |
274 | 1 | Err(err) => return Err(Error::Frame(err)), |
275 | | } |
276 | | } else { |
277 | 326 | match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) { |
278 | 171 | Ok(None) => return Ok(Negotiation::InProgress(self)), |
279 | 155 | Ok(Some(size)) => { |
280 | 155 | self.next_in_frame_len = Some(size); |
281 | 155 | continue; |
282 | | } |
283 | 0 | Err(err) => return Err(Error::FrameLength(err)), |
284 | | } |
285 | | }; |
286 | | |
287 | 154 | match (self.state, &self.config) { |
288 | | (InProgressState::HandshakeExpected, Config::Dialer { .. }) => { |
289 | 38 | if &*frame != HANDSHAKE { |
290 | 0 | return Err(Error::BadHandshake); |
291 | 38 | } |
292 | 38 | |
293 | 38 | // The dialer immediately sends the request after its handshake and before |
294 | 38 | // waiting for the handshake from the listener. As such, after receiving the |
295 | 38 | // handshake, the next step is to wait for the request answer. |
296 | 38 | self.state = InProgressState::ProtocolRequestAnswerExpected; |
297 | | } |
298 | | |
299 | | (InProgressState::HandshakeExpected, Config::Listener { .. }) => { |
300 | 40 | if &*frame != HANDSHAKE { |
301 | 0 | return Err(Error::BadHandshake); |
302 | 40 | } |
303 | 40 | |
304 | 40 | // The listener immediately sends the handshake at initialization. When this |
305 | 40 | // code is reached, it has therefore already been sent. |
306 | 40 | self.state = InProgressState::CommandExpected; |
307 | | } |
308 | | |
309 | | (InProgressState::CommandExpected, Config::Listener { .. }) => { |
310 | 40 | if frame.pop() != Some(b'\n') { |
311 | 0 | return Err(Error::InvalidCommand); |
312 | 40 | } |
313 | | |
314 | 40 | let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand0 )?0 ; Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0Be_ Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writelE0Be_ Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0Be_ Unexecuted instantiation: _RNCINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writelE0Be_ Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgresspE10read_writepE0Be_ Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantE0CsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RNCINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB8_10InProgressReE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantE0CsiUjFBJteJ7x_17smoldot_full_node |
315 | | |
316 | 40 | return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny { |
317 | 40 | inner: self, |
318 | 40 | protocol, |
319 | 40 | })); |
320 | | } |
321 | | |
322 | | ( |
323 | | InProgressState::ProtocolRequestAnswerExpected, |
324 | 36 | Config::Dialer { requested_protocol }, |
325 | 36 | ) => { |
326 | 36 | if frame.pop() != Some(b'\n') { |
327 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); |
328 | 36 | } |
329 | 36 | if &*frame == b"na" { |
330 | | // Because of the order of checks, a protocol named `na` will never be |
331 | | // successfully negotiated. Debugging is expected to be less confusing if |
332 | | // the negotiation always fails. |
333 | 1 | return Ok(Negotiation::NotAvailable); |
334 | 35 | } |
335 | 35 | if frame != requested_protocol.as_ref().as_bytes() { |
336 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); |
337 | 35 | } |
338 | 35 | return Ok(Negotiation::Success); |
339 | | } |
340 | | |
341 | | // Invalid states. |
342 | | (InProgressState::CommandExpected, Config::Dialer { .. }) |
343 | | | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. }) |
344 | | | (InProgressState::Finishing, _) => { |
345 | 0 | unreachable!(); |
346 | | } |
347 | | }; |
348 | | } |
349 | | |
350 | | // This point should be reached only if data is lacking in order to proceed. |
351 | 1 | Ok(Negotiation::InProgress(self)) |
352 | 366 | } _RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEBc_ Line | Count | Source | 246 | 260 | pub fn read_write<TNow>( | 247 | 260 | mut self, | 248 | 260 | read_write: &mut ReadWrite<TNow>, | 249 | 260 | ) -> Result<Negotiation<P>, Error> { | 250 | | loop { | 251 | | // First, try to send out data currently being queued for sending. | 252 | 373 | read_write.write_from_vec_deque(&mut self.data_send_out); | 253 | 373 | | 254 | 373 | // The `Finishing` state is special because it doesn't expect any incoming message | 255 | 373 | // anymore, and just finishes the negotiation after all the data has been written out. | 256 | 373 | // As such, we do not proceed further unless we have finished sending out everything. | 257 | 373 | if let InProgressState::Finishing = self.state { | 258 | 18 | debug_assert!(matches!0 (self.config, Config::Listener { .. })); | 259 | 18 | if self.data_send_out.is_empty() { | 260 | 18 | return Ok(Negotiation::Success); | 261 | | } else { | 262 | 0 | break; | 263 | | } | 264 | 355 | } | 265 | | | 266 | | // Try to extract a message from the incoming buffer. 
| 267 | 355 | let mut frame74 = if let Some(next_frame_len149 ) = self.next_in_frame_len { | 268 | 149 | match read_write.incoming_bytes_take(next_frame_len) { | 269 | 74 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 270 | 74 | Ok(Some(frame)) => { | 271 | 74 | self.next_in_frame_len = None; | 272 | 74 | frame | 273 | | } | 274 | 1 | Err(err) => return Err(Error::Frame(err)), | 275 | | } | 276 | | } else { | 277 | 206 | match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) { | 278 | 131 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 279 | 75 | Ok(Some(size)) => { | 280 | 75 | self.next_in_frame_len = Some(size); | 281 | 75 | continue; | 282 | | } | 283 | 0 | Err(err) => return Err(Error::FrameLength(err)), | 284 | | } | 285 | | }; | 286 | | | 287 | 74 | match (self.state, &self.config) { | 288 | | (InProgressState::HandshakeExpected, Config::Dialer { .. }) => { | 289 | 18 | if &*frame != HANDSHAKE { | 290 | 0 | return Err(Error::BadHandshake); | 291 | 18 | } | 292 | 18 | | 293 | 18 | // The dialer immediately sends the request after its handshake and before | 294 | 18 | // waiting for the handshake from the listener. As such, after receiving the | 295 | 18 | // handshake, the next step is to wait for the request answer. | 296 | 18 | self.state = InProgressState::ProtocolRequestAnswerExpected; | 297 | | } | 298 | | | 299 | | (InProgressState::HandshakeExpected, Config::Listener { .. }) => { | 300 | 20 | if &*frame != HANDSHAKE { | 301 | 0 | return Err(Error::BadHandshake); | 302 | 20 | } | 303 | 20 | | 304 | 20 | // The listener immediately sends the handshake at initialization. When this | 305 | 20 | // code is reached, it has therefore already been sent. | 306 | 20 | self.state = InProgressState::CommandExpected; | 307 | | } | 308 | | | 309 | | (InProgressState::CommandExpected, Config::Listener { .. 
}) => { | 310 | 20 | if frame.pop() != Some(b'\n') { | 311 | 0 | return Err(Error::InvalidCommand); | 312 | 20 | } | 313 | | | 314 | 20 | let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)?0 ; | 315 | | | 316 | 20 | return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny { | 317 | 20 | inner: self, | 318 | 20 | protocol, | 319 | 20 | })); | 320 | | } | 321 | | | 322 | | ( | 323 | | InProgressState::ProtocolRequestAnswerExpected, | 324 | 16 | Config::Dialer { requested_protocol }, | 325 | 16 | ) => { | 326 | 16 | if frame.pop() != Some(b'\n') { | 327 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 328 | 16 | } | 329 | 16 | if &*frame == b"na" { | 330 | | // Because of the order of checks, a protocol named `na` will never be | 331 | | // successfully negotiated. Debugging is expected to be less confusing if | 332 | | // the negotiation always fails. | 333 | 1 | return Ok(Negotiation::NotAvailable); | 334 | 15 | } | 335 | 15 | if frame != requested_protocol.as_ref().as_bytes() { | 336 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 337 | 15 | } | 338 | 15 | return Ok(Negotiation::Success); | 339 | | } | 340 | | | 341 | | // Invalid states. | 342 | | (InProgressState::CommandExpected, Config::Dialer { .. }) | 343 | | | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. }) | 344 | | | (InProgressState::Finishing, _) => { | 345 | 0 | unreachable!(); | 346 | | } | 347 | | }; | 348 | | } | 349 | | | 350 | | // This point should be reached only if data is lacking in order to proceed. | 351 | 0 | Ok(Negotiation::InProgress(self)) | 352 | 260 | } |
_RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writelEBc_ Line | Count | Source | 246 | 13 | pub fn read_write<TNow>( | 247 | 13 | mut self, | 248 | 13 | read_write: &mut ReadWrite<TNow>, | 249 | 13 | ) -> Result<Negotiation<P>, Error> { | 250 | | loop { | 251 | | // First, try to send out data currently being queued for sending. | 252 | 25 | read_write.write_from_vec_deque(&mut self.data_send_out); | 253 | 25 | | 254 | 25 | // The `Finishing` state is special because it doesn't expect any incoming message | 255 | 25 | // anymore, and just finishes the negotiation after all the data has been written out. | 256 | 25 | // As such, we do not proceed further unless we have finished sending out everything. | 257 | 25 | if let InProgressState::Finishing = self.state { | 258 | 5 | debug_assert!(matches!0 (self.config, Config::Listener { .. })); | 259 | 5 | if self.data_send_out.is_empty() { | 260 | 4 | return Ok(Negotiation::Success); | 261 | | } else { | 262 | 1 | break; | 263 | | } | 264 | 20 | } | 265 | | | 266 | | // Try to extract a message from the incoming buffer. 
| 267 | 20 | let mut frame8 = if let Some(next_frame_len10 ) = self.next_in_frame_len { | 268 | 10 | match read_write.incoming_bytes_take(next_frame_len) { | 269 | 2 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 270 | 8 | Ok(Some(frame)) => { | 271 | 8 | self.next_in_frame_len = None; | 272 | 8 | frame | 273 | | } | 274 | 0 | Err(err) => return Err(Error::Frame(err)), | 275 | | } | 276 | | } else { | 277 | 10 | match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) { | 278 | 2 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 279 | 8 | Ok(Some(size)) => { | 280 | 8 | self.next_in_frame_len = Some(size); | 281 | 8 | continue; | 282 | | } | 283 | 0 | Err(err) => return Err(Error::FrameLength(err)), | 284 | | } | 285 | | }; | 286 | | | 287 | 8 | match (self.state, &self.config) { | 288 | | (InProgressState::HandshakeExpected, Config::Dialer { .. }) => { | 289 | 0 | if &*frame != HANDSHAKE { | 290 | 0 | return Err(Error::BadHandshake); | 291 | 0 | } | 292 | 0 |
| 293 | 0 | // The dialer immediately sends the request after its handshake and before | 294 | 0 | // waiting for the handshake from the listener. As such, after receiving the | 295 | 0 | // handshake, the next step is to wait for the request answer. | 296 | 0 | self.state = InProgressState::ProtocolRequestAnswerExpected; | 297 | | } | 298 | | | 299 | | (InProgressState::HandshakeExpected, Config::Listener { .. }) => { | 300 | 4 | if &*frame != HANDSHAKE { | 301 | 0 | return Err(Error::BadHandshake); | 302 | 4 | } | 303 | 4 | | 304 | 4 | // The listener immediately sends the handshake at initialization. When this | 305 | 4 | // code is reached, it has therefore already been sent. | 306 | 4 | self.state = InProgressState::CommandExpected; | 307 | | } | 308 | | | 309 | | (InProgressState::CommandExpected, Config::Listener { .. }) => { | 310 | 4 | if frame.pop() != Some(b'\n') { | 311 | 0 | return Err(Error::InvalidCommand); | 312 | 4 | } | 313 | | | 314 | 4 | let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)?0 ; | 315 | | | 316 | 4 | return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny { | 317 | 4 | inner: self, | 318 | 4 | protocol, | 319 | 4 | })); | 320 | | } | 321 | | | 322 | | ( | 323 | | InProgressState::ProtocolRequestAnswerExpected, | 324 | 0 | Config::Dialer { requested_protocol }, | 325 | 0 | ) => { | 326 | 0 | if frame.pop() != Some(b'\n') { | 327 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 328 | 0 | } | 329 | 0 | if &*frame == b"na" { | 330 | | // Because of the order of checks, a protocol named `na` will never be | 331 | | // successfully negotiated. Debugging is expected to be less confusing if | 332 | | // the negotiation always fails. 
| 333 | 0 | return Ok(Negotiation::NotAvailable); | 334 | 0 | } | 335 | 0 | if frame != requested_protocol.as_ref().as_bytes() { | 336 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 337 | 0 | } | 338 | 0 | return Ok(Negotiation::Success); | 339 | | } | 340 | | | 341 | | // Invalid states. | 342 | | (InProgressState::CommandExpected, Config::Dialer { .. }) | 343 | | | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. }) | 344 | | | (InProgressState::Finishing, _) => { | 345 | 0 | unreachable!(); | 346 | | } | 347 | | }; | 348 | | } | 349 | | | 350 | | // This point should be reached only if data is lacking in order to proceed. | 351 | 1 | Ok(Negotiation::InProgress(self)) | 352 | 13 | } |
_RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEBc_ Line | Count | Source | 246 | 63 | pub fn read_write<TNow>( | 247 | 63 | mut self, | 248 | 63 | read_write: &mut ReadWrite<TNow>, | 249 | 63 | ) -> Result<Negotiation<P>, Error> { | 250 | | loop { | 251 | | // First, try to send out data currently being queued for sending. | 252 | 147 | read_write.write_from_vec_deque(&mut self.data_send_out); | 253 | 147 | | 254 | 147 | // The `Finishing` state is special because it doesn't expect any incoming message | 255 | 147 | // anymore, and just finishes the negotiation after all the data has been written out. | 256 | 147 | // As such, we do not proceed further unless we have finished sending out everything. | 257 | 147 | if let InProgressState::Finishing = self.state { | 258 | 14 | debug_assert!(matches!0 (self.config, Config::Listener { .. })); | 259 | 14 | if self.data_send_out.is_empty() { | 260 | 14 | return Ok(Negotiation::Success); | 261 | | } else { | 262 | 0 | break; | 263 | | } | 264 | 133 | } | 265 | | | 266 | | // Try to extract a message from the incoming buffer. 
| 267 | 133 | let mut frame56 = if let Some(next_frame_len56 ) = self.next_in_frame_len { | 268 | 56 | match read_write.incoming_bytes_take(next_frame_len) { | 269 | 0 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 270 | 56 | Ok(Some(frame)) => { | 271 | 56 | self.next_in_frame_len = None; | 272 | 56 | frame | 273 | | } | 274 | 0 | Err(err) => return Err(Error::Frame(err)), | 275 | | } | 276 | | } else { | 277 | 77 | match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) { | 278 | 21 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 279 | 56 | Ok(Some(size)) => { | 280 | 56 | self.next_in_frame_len = Some(size); | 281 | 56 | continue; | 282 | | } | 283 | 0 | Err(err) => return Err(Error::FrameLength(err)), | 284 | | } | 285 | | }; | 286 | | | 287 | 56 | match (self.state, &self.config) { | 288 | | (InProgressState::HandshakeExpected, Config::Dialer { .. }) => { | 289 | 14 | if &*frame != HANDSHAKE { | 290 | 0 | return Err(Error::BadHandshake); | 291 | 14 | } | 292 | 14 | | 293 | 14 | // The dialer immediately sends the request after its handshake and before | 294 | 14 | // waiting for the handshake from the listener. As such, after receiving the | 295 | 14 | // handshake, the next step is to wait for the request answer. | 296 | 14 | self.state = InProgressState::ProtocolRequestAnswerExpected; | 297 | | } | 298 | | | 299 | | (InProgressState::HandshakeExpected, Config::Listener { .. }) => { | 300 | 14 | if &*frame != HANDSHAKE { | 301 | 0 | return Err(Error::BadHandshake); | 302 | 14 | } | 303 | 14 | | 304 | 14 | // The listener immediately sends the handshake at initialization. When this | 305 | 14 | // code is reached, it has therefore already been sent. | 306 | 14 | self.state = InProgressState::CommandExpected; | 307 | | } | 308 | | | 309 | | (InProgressState::CommandExpected, Config::Listener { .. 
}) => { | 310 | 14 | if frame.pop() != Some(b'\n') { | 311 | 0 | return Err(Error::InvalidCommand); | 312 | 14 | } | 313 | | | 314 | 14 | let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)?0 ; | 315 | | | 316 | 14 | return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny { | 317 | 14 | inner: self, | 318 | 14 | protocol, | 319 | 14 | })); | 320 | | } | 321 | | | 322 | | ( | 323 | | InProgressState::ProtocolRequestAnswerExpected, | 324 | 14 | Config::Dialer { requested_protocol }, | 325 | 14 | ) => { | 326 | 14 | if frame.pop() != Some(b'\n') { | 327 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 328 | 14 | } | 329 | 14 | if &*frame == b"na" { | 330 | | // Because of the order of checks, a protocol named `na` will never be | 331 | | // successfully negotiated. Debugging is expected to be less confusing if | 332 | | // the negotiation always fails. | 333 | 0 | return Ok(Negotiation::NotAvailable); | 334 | 14 | } | 335 | 14 | if frame != requested_protocol.as_ref().as_bytes() { | 336 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 337 | 14 | } | 338 | 14 | return Ok(Negotiation::Success); | 339 | | } | 340 | | | 341 | | // Invalid states. | 342 | | (InProgressState::CommandExpected, Config::Dialer { .. }) | 343 | | | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. }) | 344 | | | (InProgressState::Finishing, _) => { | 345 | 0 | unreachable!(); | 346 | | } | 347 | | }; | 348 | | } | 349 | | | 350 | | // This point should be reached only if data is lacking in order to proceed. | 351 | 0 | Ok(Negotiation::InProgress(self)) | 352 | 63 | } |
_RINvMs0_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writelEBc_ Line | Count | Source | 246 | 30 | pub fn read_write<TNow>( | 247 | 30 | mut self, | 248 | 30 | read_write: &mut ReadWrite<TNow>, | 249 | 30 | ) -> Result<Negotiation<P>, Error> { | 250 | | loop { | 251 | | // First, try to send out data currently being queued for sending. | 252 | 54 | read_write.write_from_vec_deque(&mut self.data_send_out); | 253 | 54 | | 254 | 54 | // The `Finishing` state is special because it doesn't expect any incoming message | 255 | 54 | // anymore, and just finishes the negotiation after all the data has been written out. | 256 | 54 | // As such, we do not proceed further unless we have finished sending out everything. | 257 | 54 | if let InProgressState::Finishing = self.state { | 258 | 2 | debug_assert!(matches!0 (self.config, Config::Listener { .. })); | 259 | 2 | if self.data_send_out.is_empty() { | 260 | 2 | return Ok(Negotiation::Success); | 261 | | } else { | 262 | 0 | break; | 263 | | } | 264 | 52 | } | 265 | | | 266 | | // Try to extract a message from the incoming buffer. 
| 267 | 52 | let mut frame16 = if let Some(next_frame_len19 ) = self.next_in_frame_len { | 268 | 19 | match read_write.incoming_bytes_take(next_frame_len) { | 269 | 3 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 270 | 16 | Ok(Some(frame)) => { | 271 | 16 | self.next_in_frame_len = None; | 272 | 16 | frame | 273 | | } | 274 | 0 | Err(err) => return Err(Error::Frame(err)), | 275 | | } | 276 | | } else { | 277 | 33 | match read_write.incoming_bytes_take_leb128(self.max_in_frame_len) { | 278 | 17 | Ok(None) => return Ok(Negotiation::InProgress(self)), | 279 | 16 | Ok(Some(size)) => { | 280 | 16 | self.next_in_frame_len = Some(size); | 281 | 16 | continue; | 282 | | } | 283 | 0 | Err(err) => return Err(Error::FrameLength(err)), | 284 | | } | 285 | | }; | 286 | | | 287 | 16 | match (self.state, &self.config) { | 288 | | (InProgressState::HandshakeExpected, Config::Dialer { .. }) => { | 289 | 6 | if &*frame != HANDSHAKE { | 290 | 0 | return Err(Error::BadHandshake); | 291 | 6 | } | 292 | 6 | | 293 | 6 | // The dialer immediately sends the request after its handshake and before | 294 | 6 | // waiting for the handshake from the listener. As such, after receiving the | 295 | 6 | // handshake, the next step is to wait for the request answer. | 296 | 6 | self.state = InProgressState::ProtocolRequestAnswerExpected; | 297 | | } | 298 | | | 299 | | (InProgressState::HandshakeExpected, Config::Listener { .. }) => { | 300 | 2 | if &*frame != HANDSHAKE { | 301 | 0 | return Err(Error::BadHandshake); | 302 | 2 | } | 303 | 2 | | 304 | 2 | // The listener immediately sends the handshake at initialization. When this | 305 | 2 | // code is reached, it has therefore already been sent. | 306 | 2 | self.state = InProgressState::CommandExpected; | 307 | | } | 308 | | | 309 | | (InProgressState::CommandExpected, Config::Listener { .. 
}) => { | 310 | 2 | if frame.pop() != Some(b'\n') { | 311 | 0 | return Err(Error::InvalidCommand); | 312 | 2 | } | 313 | | | 314 | 2 | let protocol = String::from_utf8(frame).map_err(|_| Error::InvalidCommand)?0 ; | 315 | | | 316 | 2 | return Ok(Negotiation::ListenerAcceptOrDeny(ListenerAcceptOrDeny { | 317 | 2 | inner: self, | 318 | 2 | protocol, | 319 | 2 | })); | 320 | | } | 321 | | | 322 | | ( | 323 | | InProgressState::ProtocolRequestAnswerExpected, | 324 | 6 | Config::Dialer { requested_protocol }, | 325 | 6 | ) => { | 326 | 6 | if frame.pop() != Some(b'\n') { | 327 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 328 | 6 | } | 329 | 6 | if &*frame == b"na" { | 330 | | // Because of the order of checks, a protocol named `na` will never be | 331 | | // successfully negotiated. Debugging is expected to be less confusing if | 332 | | // the negotiation always fails. | 333 | 0 | return Ok(Negotiation::NotAvailable); | 334 | 6 | } | 335 | 6 | if frame != requested_protocol.as_ref().as_bytes() { | 336 | 0 | return Err(Error::UnexpectedProtocolRequestAnswer); | 337 | 6 | } | 338 | 6 | return Ok(Negotiation::Success); | 339 | | } | 340 | | | 341 | | // Invalid states. | 342 | | (InProgressState::CommandExpected, Config::Dialer { .. }) | 343 | | | (InProgressState::ProtocolRequestAnswerExpected, Config::Listener { .. }) | 344 | | | (InProgressState::Finishing, _) => { | 345 | 0 | unreachable!(); | 346 | | } | 347 | | }; | 348 | | } | 349 | | | 350 | | // This point should be reached only if data is lacking in order to proceed. | 351 | 0 | Ok(Negotiation::InProgress(self)) | 352 | 30 | } |
Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writeNtNtCsaYZPK01V26L_4core4time8DurationECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgresspE10read_writepEBc_ Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressNtNtCsdZExvAaxgia_5alloc6string6StringE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantECsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RINvMs0_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectINtB6_10InProgressReE10read_writeNtNtCsbpXXxgr6u8g_3std4time7InstantECsiUjFBJteJ7x_17smoldot_full_node |
353 | | } |
354 | | |
355 | | impl<P> fmt::Debug for InProgress<P> { |
356 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
357 | 0 | f.debug_tuple("InProgress").finish() |
358 | 0 | } Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selects1_0pEINtB5_10InProgresspENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_ Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selects1_0pEINtB5_10InProgresspENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBb_ |
359 | | } |
360 | | |
361 | | /// Error that can happen during the negotiation. |
362 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXsf_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_selectNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXsf_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_selectNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
363 | | pub enum Error { |
364 | | /// Reading side of the connection is closed. The handshake can't proceed further. |
365 | | ReadClosed, |
366 | | /// Writing side of the connection is closed. The handshake can't proceed further. |
367 | | WriteClosed, |
368 | | /// Error while decoding a frame length, or frame size limit reached. |
369 | | #[display(fmt = "LEB128 frame error: {_0}")] |
370 | | FrameLength(read_write::IncomingBytesTakeLeb128Error), |
371 | | /// Error while decoding a frame. |
372 | | #[display(fmt = "LEB128 frame error: {_0}")] |
373 | | Frame(read_write::IncomingBytesTakeError), |
374 | | /// Unknown handshake or unknown multistream-select protocol version. |
375 | | BadHandshake, |
376 | | /// Received empty command. |
377 | | InvalidCommand, |
378 | | /// Received answer to protocol request that doesn't match the requested protocol. |
379 | | UnexpectedProtocolRequestAnswer, |
380 | | } |
381 | | |
382 | | /// Handshake message sent by both parties at the beginning of each multistream-select negotiation. |
383 | | const HANDSHAKE: &[u8] = b"/multistream/1.0.0\n"; |
384 | | |
385 | | /// Message on the multistream-select protocol. |
386 | | #[derive(Debug, Copy, Clone)] |
387 | | enum Message<P> { |
388 | | Handshake, |
389 | | ProtocolRequest(P), |
390 | | ProtocolOk(P), |
391 | | ProtocolNa, |
392 | | } |
393 | | |
394 | 163 | fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) { |
395 | 163 | match message { |
396 | 81 | Message::Handshake => { |
397 | 81 | out.reserve(HANDSHAKE.len() + 4); |
398 | 81 | out.extend(leb128::encode_usize(HANDSHAKE.len())); |
399 | 81 | out.extend(HANDSHAKE); |
400 | 81 | } |
401 | 80 | Message::ProtocolRequest(p41 ) | Message::ProtocolOk(p39 ) => { |
402 | 80 | let p = p.as_ref(); |
403 | 80 | out.reserve(p.len() + 5); |
404 | 80 | out.extend(leb128::encode_usize(p.len() + 1)); |
405 | 80 | out.extend(p); |
406 | 80 | out.push_back(b'\n'); |
407 | 80 | } |
408 | 2 | Message::ProtocolNa => { |
409 | 2 | out.reserve(8); |
410 | 2 | out.extend(leb128::encode_usize(3)); |
411 | 2 | out.extend(b"na\n"); |
412 | 2 | } |
413 | | } |
414 | 163 | } _RINvNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_select13write_messageINtNtCsdZExvAaxgia_5alloc3vec3VechEEB8_ Line | Count | Source | 394 | 39 | fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) { | 395 | 39 | match message { | 396 | 0 | Message::Handshake => { | 397 | 0 | out.reserve(HANDSHAKE.len() + 4); | 398 | 0 | out.extend(leb128::encode_usize(HANDSHAKE.len())); | 399 | 0 | out.extend(HANDSHAKE); | 400 | 0 | } | 401 | 39 | Message::ProtocolRequest(p0 ) | Message::ProtocolOk(p) => { | 402 | 39 | let p = p.as_ref(); | 403 | 39 | out.reserve(p.len() + 5); | 404 | 39 | out.extend(leb128::encode_usize(p.len() + 1)); | 405 | 39 | out.extend(p); | 406 | 39 | out.push_back(b'\n'); | 407 | 39 | } | 408 | 0 | Message::ProtocolNa => { | 409 | 0 | out.reserve(8); | 410 | 0 | out.extend(leb128::encode_usize(3)); | 411 | 0 | out.extend(b"na\n"); | 412 | 0 | } | 413 | | } | 414 | 39 | } |
_RINvNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_select13write_messageRShEB8_ Line | Count | Source | 394 | 83 | fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) { | 395 | 83 | match message { | 396 | 81 | Message::Handshake => { | 397 | 81 | out.reserve(HANDSHAKE.len() + 4); | 398 | 81 | out.extend(leb128::encode_usize(HANDSHAKE.len())); | 399 | 81 | out.extend(HANDSHAKE); | 400 | 81 | } | 401 | 0 | Message::ProtocolRequest(p) | Message::ProtocolOk(p) => { | 402 | 0 | let p = p.as_ref(); | 403 | 0 | out.reserve(p.len() + 5); | 404 | 0 | out.extend(leb128::encode_usize(p.len() + 1)); | 405 | 0 | out.extend(p); | 406 | 0 | out.push_back(b'\n'); | 407 | 0 | } | 408 | 2 | Message::ProtocolNa => { | 409 | 2 | out.reserve(8); | 410 | 2 | out.extend(leb128::encode_usize(3)); | 411 | 2 | out.extend(b"na\n"); | 412 | 2 | } | 413 | | } | 414 | 83 | } |
_RINvNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection18multistream_select13write_messageReEB8_ Line | Count | Source | 394 | 41 | fn write_message(message: Message<impl AsRef<[u8]>>, out: &mut VecDeque<u8>) { | 395 | 41 | match message { | 396 | 0 | Message::Handshake => { | 397 | 0 | out.reserve(HANDSHAKE.len() + 4); | 398 | 0 | out.extend(leb128::encode_usize(HANDSHAKE.len())); | 399 | 0 | out.extend(HANDSHAKE); | 400 | 0 | } | 401 | 41 | Message::ProtocolRequest(p) | Message::ProtocolOk(p0 ) => { | 402 | 41 | let p = p.as_ref(); | 403 | 41 | out.reserve(p.len() + 5); | 404 | 41 | out.extend(leb128::encode_usize(p.len() + 1)); | 405 | 41 | out.extend(p); | 406 | 41 | out.push_back(b'\n'); | 407 | 41 | } | 408 | 0 | Message::ProtocolNa => { | 409 | 0 | out.reserve(8); | 410 | 0 | out.extend(leb128::encode_usize(3)); | 411 | 0 | out.extend(b"na\n"); | 412 | 0 | } | 413 | | } | 414 | 41 | } |
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageINtNtCsdZExvAaxgia_5alloc3vec3VechEECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageRShECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageReECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageRShEB8_ Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageReEB8_ Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageINtNtCsdZExvAaxgia_5alloc3vec3VechEECsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageRShECsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection18multistream_select13write_messageReECsiUjFBJteJ7x_17smoldot_full_node |
415 | | |
416 | | #[cfg(test)] |
417 | | mod tests { |
418 | | use alloc::collections::VecDeque; |
419 | | use core::{cmp, mem}; |
420 | | |
421 | | use super::{super::super::read_write::ReadWrite, write_message, Config, Message, Negotiation}; |
422 | | |
423 | | #[test] |
424 | 1 | fn encode() { |
425 | 1 | let mut message = VecDeque::new(); |
426 | 1 | |
427 | 1 | write_message(Message::<&'static [u8]>::Handshake, &mut message); |
428 | 1 | assert_eq!( |
429 | 1 | message.drain(..).collect::<Vec<_>>(), |
430 | 1 | b"\x13/multistream/1.0.0\n".to_vec() |
431 | 1 | ); |
432 | | |
433 | 1 | write_message(Message::ProtocolRequest("/hello"), &mut message); |
434 | 1 | assert_eq!( |
435 | 1 | message.drain(..).collect::<Vec<_>>(), |
436 | 1 | b"\x07/hello\n".to_vec() |
437 | 1 | ); |
438 | | |
439 | 1 | write_message(Message::<&'static [u8]>::ProtocolNa, &mut message); |
440 | 1 | assert_eq!(message.drain(..).collect::<Vec<_>>(), b"\x03na\n".to_vec()); |
441 | 1 | } |
442 | | |
443 | | #[test] |
444 | 1 | fn negotiation_basic_works() { |
445 | 4 | fn test_with_buffer_sizes(mut size1: usize, mut size2: usize) { |
446 | 4 | let mut negotiation1 = Negotiation::new(Config::Dialer { |
447 | 4 | requested_protocol: "/foo", |
448 | 4 | }); |
449 | 4 | let mut negotiation2 = Negotiation::new(Config::<String>::Listener { |
450 | 4 | max_protocol_name_len: 4, |
451 | 4 | }); |
452 | 4 | |
453 | 4 | let mut buf_1_to_2 = Vec::new(); |
454 | 4 | let mut buf_2_to_1 = Vec::new(); |
455 | 4 | |
456 | 4 | let mut num_iterations = 0; |
457 | 1 | |
458 | 21 | while !matches!( |
459 | 25 | (&negotiation1, &negotiation2), |
460 | 1 | (Negotiation::Success, Negotiation::Success) |
461 | 1 | ) { |
462 | 21 | num_iterations += 1; |
463 | 21 | assert!(num_iterations <= 5000); |
464 | 1 | |
465 | 21 | match negotiation1 { |
466 | 21 | Negotiation::InProgress(nego) => { |
467 | 21 | let mut read_write = ReadWrite { |
468 | 21 | now: 0, |
469 | 21 | incoming_buffer: buf_2_to_1, |
470 | 21 | expected_incoming_bytes: Some(0), |
471 | 21 | read_bytes: 0, |
472 | 21 | write_bytes_queued: buf_1_to_2.len(), |
473 | 21 | write_bytes_queueable: Some(size1 - buf_1_to_2.len()), |
474 | 21 | write_buffers: vec![mem::take(&mut buf_1_to_2)], |
475 | 21 | wake_up_after: None, |
476 | 21 | }; |
477 | 21 | negotiation1 = nego.read_write(&mut read_write).unwrap(); |
478 | 21 | buf_2_to_1 = read_write.incoming_buffer; |
479 | 21 | buf_1_to_2.extend( |
480 | 21 | read_write |
481 | 21 | .write_buffers |
482 | 21 | .drain(..) |
483 | 37 | .flat_map(|b| b.into_iter()), |
484 | 21 | ); |
485 | 21 | size2 = cmp::max(size2, read_write.expected_incoming_bytes.unwrap_or(0)); |
486 | 21 | } |
487 | 1 | Negotiation::Success => {}0 |
488 | 1 | Negotiation::ListenerAcceptOrDeny(_) => unreachable!()0 , |
489 | 1 | Negotiation::NotAvailable => panic!()0 , |
490 | 1 | } |
491 | 1 | |
492 | 4 | match negotiation2 { |
493 | 13 | Negotiation::InProgress(nego) => { |
494 | 13 | let mut read_write = ReadWrite { |
495 | 13 | now: 0, |
496 | 13 | incoming_buffer: buf_1_to_2, |
497 | 13 | expected_incoming_bytes: Some(0), |
498 | 13 | read_bytes: 0, |
499 | 13 | write_bytes_queued: buf_2_to_1.len(), |
500 | 13 | write_bytes_queueable: Some(size2 - buf_2_to_1.len()), |
501 | 13 | write_buffers: vec![mem::take(&mut buf_2_to_1)], |
502 | 13 | wake_up_after: None, |
503 | 13 | }; |
504 | 13 | negotiation2 = nego.read_write(&mut read_write).unwrap(); |
505 | 13 | buf_1_to_2 = read_write.incoming_buffer; |
506 | 13 | buf_2_to_1.extend( |
507 | 13 | read_write |
508 | 13 | .write_buffers |
509 | 13 | .drain(..) |
510 | 33 | .flat_map(|b| b.into_iter()), |
511 | 13 | ); |
512 | 13 | size1 = cmp::max(size1, read_write.expected_incoming_bytes.unwrap_or(0)); |
513 | 13 | } |
514 | 4 | Negotiation::ListenerAcceptOrDeny(accept_reject) |
515 | 4 | if accept_reject.requested_protocol() == "/foo" => |
516 | 4 | { |
517 | 4 | negotiation2 = Negotiation::InProgress(accept_reject.accept()); |
518 | 4 | } |
519 | 1 | Negotiation::ListenerAcceptOrDeny(accept_reject) => { |
520 | 0 | negotiation2 = Negotiation::InProgress(accept_reject.reject()); |
521 | 0 | } |
522 | 4 | Negotiation::Success => {} |
523 | 1 | Negotiation::NotAvailable => panic!()0 , |
524 | 1 | } |
525 | 1 | } |
526 | 4 | } |
527 | 1 | |
528 | 1 | test_with_buffer_sizes(256, 256); |
529 | 1 | test_with_buffer_sizes(1, 1); |
530 | 1 | test_with_buffer_sizes(1, 2048); |
531 | 1 | test_with_buffer_sizes(2048, 1); |
532 | 1 | } |
533 | | } |