/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/webrtc_framing.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2023 Pierre Krieger |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | //! |
19 | | //! See <https://github.com/libp2p/specs/blob/master/webrtc/README.md#multiplexing>. |
20 | | |
21 | | use crate::{ |
22 | | libp2p::read_write::ReadWrite, |
23 | | util::{leb128, protobuf}, |
24 | | }; |
25 | | |
26 | | use alloc::{borrow::ToOwned as _, vec::Vec}; |
27 | | use core::{cmp, fmt, mem, ops}; |
28 | | |
29 | | /// State of the framing. |
30 | | pub struct WebRtcFraming { |
31 | | /// Value of [`ReadWrite::expected_incoming_bytes`] of the inner stream the last time that |
32 | | /// [`WebRtcFraming::read_write`] was called. `None` if unknown. |
33 | | inner_stream_expected_incoming_bytes: Option<usize>, |
34 | | |
35 | | /// Buffer containing data from a previous frame, but that doesn't contain enough data for |
36 | | /// the underlying substream to accept it. |
37 | | /// |
38 | | /// In other words, `receive_buffer.len() < inner_stream_expected_incoming_bytes`. |
39 | | // TODO: shrink_to_fit? |
40 | | receive_buffer: Vec<u8>, |
41 | | |
42 | | /// State of the writing side of the remote. |
43 | | remote_write_state: RemoteWriteState, |
44 | | |
45 | | /// State of the local writing side. |
46 | | local_write_state: LocalWriteState, |
47 | | } |
48 | | |
49 | | enum LocalWriteState { |
50 | | Open, |
51 | | FinBuffered, |
52 | | FinAcked, |
53 | | } |
54 | | |
55 | | enum RemoteWriteState { |
56 | | Open, |
57 | | /// The remote has sent a `FIN` in the past. Any data in [`WebRtcFraming::receive_buffer`] |
58 | | /// is still valid and was received before the remote writing side was closed. |
59 | | Closed, |
60 | | ClosedAckBuffered, |
61 | | } |
62 | | |
63 | | const RECEIVE_BUFFER_CAPACITY: usize = 2048; |
64 | | /// Minimum size in bytes of the protobuf frame surrounding the message. |
65 | | const PROTOBUF_FRAME_MIN_LEN: usize = 2; |
66 | | /// Maximum size in bytes of the protobuf frame surrounding the message. |
67 | | const PROTOBUF_FRAME_MAX_LEN: usize = 8; // TODO: calculate better? |
68 | | const MAX_PROTOBUF_MESSAGE_LEN: usize = 16384; |
69 | | |
70 | | impl WebRtcFraming { |
71 | | /// Initializes a new [`WebRtcFraming`]. |
72 | 0 | pub fn new() -> Self { |
73 | 0 | WebRtcFraming { |
74 | 0 | inner_stream_expected_incoming_bytes: None, |
75 | 0 | receive_buffer: Vec::with_capacity(RECEIVE_BUFFER_CAPACITY), |
76 | 0 | remote_write_state: RemoteWriteState::Open, |
77 | 0 | local_write_state: LocalWriteState::Open, |
78 | 0 | } |
79 | 0 | } Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB2_13WebRtcFraming3new Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB2_13WebRtcFraming3new |
80 | | |
81 | | /// Feeds data coming from a socket and outputs data to write to the socket. |
82 | | /// |
83 | | /// Returns an object that implements `Deref<Target = ReadWrite>`. This object represents the |
84 | | /// decrypted stream of data. |
85 | | /// |
86 | | /// An error is returned if the protocol is being violated by the remote, if the remote wants |
87 | | /// to reset the substream. |
88 | 0 | pub fn read_write<'a, TNow: Clone>( |
89 | 0 | &'a mut self, |
90 | 0 | outer_read_write: &'a mut ReadWrite<TNow>, |
91 | 0 | ) -> Result<InnerReadWrite<'a, TNow>, Error> { |
92 | | // Read from the incoming buffer until we have enough data for the underlying substream. |
93 | | loop { |
94 | | // Immediately stop looping if there is enough data for the underlying substream. |
95 | | // Also stop looping if `inner_stream_expected_incoming_bytes` is `None`, as we always |
96 | | // want to process the inner substream the first time ever. |
97 | 0 | if self |
98 | 0 | .inner_stream_expected_incoming_bytes |
99 | 0 | .map_or(true, |rq_bytes| rq_bytes <= self.receive_buffer.len()) Unexecuted instantiation: _RNCINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepE0Bb_ Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationE0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepE0Bb_ |
100 | | { |
101 | 0 | break; |
102 | 0 | } |
103 | | |
104 | | // Try to parse a frame from the incoming buffer. |
105 | 0 | let bytes_to_discard = { |
106 | | // TODO: we could in theory demand from the outside just the protobuf header, and then later the data, which would save some copying but might considerably complexifies the code |
107 | 0 | let mut parser = |
108 | 0 | nom::combinator::map_parser::<_, _, _, nom::error::Error<&[u8]>, _, _>( |
109 | 0 | nom::multi::length_data(crate::util::leb128::nom_leb128_usize), |
110 | 0 | protobuf::message_decode! { |
111 | 0 | #[optional] flags = 1 => protobuf::enum_tag_decode, |
112 | 0 | #[optional] message = 2 => protobuf::bytes_tag_decode, Unexecuted instantiation: _RNCINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs4_0Bb_ Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEs4_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs4_0Bb_ |
113 | 0 | }, |
114 | 0 | ); |
115 | 0 | match parser(&outer_read_write.incoming_buffer) { |
116 | 0 | Ok((rest, framed_message)) => { |
117 | 0 | // The remote has sent a `RESET_STREAM` flag, immediately stop with an error. |
118 | 0 | // The specification mentions that the receiver may discard any data already |
119 | 0 | // received, which we do. |
120 | 0 | if framed_message.flags.map_or(false, |f| f == 2) { Unexecuted instantiation: _RNCINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs_0Bb_ Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEs_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs_0Bb_ |
121 | 0 | return Err(Error::RemoteResetDesired); |
122 | 0 | } |
123 | 0 |
|
124 | 0 | // Some protocol check. |
125 | 0 | if framed_message.message.map_or(false, |msg| !msg.is_empty()) Unexecuted instantiation: _RNCINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs0_0Bb_ Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEs0_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs0_0Bb_ |
126 | 0 | && !matches!(self.remote_write_state, RemoteWriteState::Open) |
127 | | { |
128 | 0 | return Err(Error::DataAfterFin); |
129 | 0 | } |
130 | 0 |
|
131 | 0 | // Process the `FIN_ACK` flag sent by the remote. |
132 | 0 | // Note that we don't treat it as an error if the remote sends the |
133 | 0 | // `FIN_ACK` flag multiple times, although this is opinionated. |
134 | 0 | if framed_message.flags.map_or(false, |f| f == 3) { Unexecuted instantiation: _RNCINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs1_0Bb_ Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEs1_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs1_0Bb_ |
135 | 0 | if matches!(self.local_write_state, LocalWriteState::Open) { |
136 | 0 | return Err(Error::FinAckWithoutFin); |
137 | 0 | } |
138 | 0 | self.local_write_state = LocalWriteState::FinAcked; |
139 | 0 | } |
140 | | |
141 | | // Process the `FIN` flag sent by the remote. |
142 | 0 | if matches!(self.remote_write_state, RemoteWriteState::Open) |
143 | 0 | && framed_message.flags.map_or(false, |f| f == 0) Unexecuted instantiation: _RNCINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs2_0Bb_ Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEs2_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs2_0Bb_ |
144 | 0 | { |
145 | 0 | self.remote_write_state = RemoteWriteState::Closed; |
146 | 0 | } |
147 | | |
148 | | // Note that any `STOP_SENDING` flag sent by the remote is ignored. |
149 | | |
150 | | // Copy the message of the remote out from the incoming buffer. |
151 | 0 | if let Some(message) = framed_message.message { |
152 | 0 | self.receive_buffer.extend_from_slice(message); |
153 | 0 | } |
154 | | |
155 | | // Number of bytes to discard is the size of the protobuf frame. |
156 | 0 | outer_read_write.incoming_buffer.len() - rest.len() |
157 | | } |
158 | 0 | Err(nom::Err::Incomplete(needed)) => { |
159 | | // Not enough data in the incoming buffer for a full frame. Requesting |
160 | | // more. |
161 | 0 | let Some(expected_incoming_bytes) = |
162 | 0 | &mut outer_read_write.expected_incoming_bytes |
163 | | else { |
164 | | // TODO: is this correct anyway? substreams are never supposed to close? |
165 | 0 | return Err(Error::EofIncompleteFrame); |
166 | | }; |
167 | 0 | *expected_incoming_bytes = outer_read_write.incoming_buffer.len() |
168 | 0 | + match needed { |
169 | 0 | nom::Needed::Size(s) => s.get(), |
170 | 0 | nom::Needed::Unknown => 1, |
171 | | }; |
172 | 0 | break; |
173 | | } |
174 | | Err(_) => { |
175 | | // Frame decoding error. |
176 | 0 | return Err(Error::InvalidFrame); |
177 | | } |
178 | | } |
179 | | }; |
180 | | |
181 | | // Discard the frame data. |
182 | 0 | let _extract_result = outer_read_write.incoming_bytes_take(bytes_to_discard); |
183 | 0 | debug_assert!(matches!(_extract_result, Ok(Some(_)))); |
184 | | } |
185 | | |
186 | | Ok(InnerReadWrite { |
187 | | inner_read_write: ReadWrite { |
188 | 0 | now: outer_read_write.now.clone(), |
189 | 0 | incoming_buffer: mem::take(&mut self.receive_buffer), |
190 | | read_bytes: 0, |
191 | 0 | expected_incoming_bytes: if matches!( |
192 | 0 | self.remote_write_state, |
193 | | RemoteWriteState::Open |
194 | | ) { |
195 | 0 | Some(0) |
196 | | } else { |
197 | 0 | None |
198 | | }, |
199 | 0 | write_buffers: Vec::new(), |
200 | | write_bytes_queued: 0, |
201 | 0 | write_bytes_queueable: if matches!(self.local_write_state, LocalWriteState::Open) { |
202 | 0 | outer_read_write |
203 | 0 | .write_bytes_queueable |
204 | 0 | .map(|outer_writable| { |
205 | 0 | cmp::min( |
206 | 0 | // TODO: what if the outer maximum queueable is <= PROTOBUF_FRAME_MAX_LEN? this will never happen in practice, but in theory it could |
207 | 0 | outer_writable.saturating_sub(PROTOBUF_FRAME_MAX_LEN), |
208 | 0 | MAX_PROTOBUF_MESSAGE_LEN - PROTOBUF_FRAME_MAX_LEN, |
209 | 0 | ) |
210 | 0 | }) Unexecuted instantiation: _RNCINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs3_0Bb_ Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationEs3_0CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNCINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_13WebRtcFraming10read_writepEs3_0Bb_ |
211 | | } else { |
212 | 0 | None |
213 | | }, |
214 | 0 | wake_up_after: outer_read_write.wake_up_after.clone(), |
215 | 0 | }, |
216 | 0 | framing: self, |
217 | 0 | outer_read_write, |
218 | | }) |
219 | 0 | } Unexecuted instantiation: _RINvMNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB3_13WebRtcFraming10read_writepEB9_ Unexecuted instantiation: _RINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB3_13WebRtcFraming10read_writeNtNtCsaYZPK01V26L_4core4time8DurationECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB3_13WebRtcFraming10read_writepEB9_ |
220 | | } |
221 | | |
222 | | impl fmt::Debug for WebRtcFraming { |
223 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
224 | 0 | f.debug_struct("WebRtcFraming").finish() |
225 | 0 | } Unexecuted instantiation: _RNvXs_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB4_13WebRtcFramingNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt Unexecuted instantiation: _RNvXs_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB4_13WebRtcFramingNtNtCsaYZPK01V26L_4core3fmt5Debug3fmt |
226 | | } |
227 | | |
228 | | /// Stream of data without the frames. See [`WebRtcFraming::read_write`]. |
229 | | pub struct InnerReadWrite<'a, TNow: Clone> { |
230 | | framing: &'a mut WebRtcFraming, |
231 | | outer_read_write: &'a mut ReadWrite<TNow>, |
232 | | inner_read_write: ReadWrite<TNow>, |
233 | | } |
234 | | |
235 | | impl<'a, TNow: Clone> ops::Deref for InnerReadWrite<'a, TNow> { |
236 | | type Target = ReadWrite<TNow>; |
237 | | |
238 | 0 | fn deref(&self) -> &Self::Target { |
239 | 0 | &self.inner_read_write |
240 | 0 | } Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framings0_0pEINtB5_14InnerReadWritepENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefBb_ Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framings0_0pEINtB5_14InnerReadWritepENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefBb_ |
241 | | } |
242 | | |
243 | | impl<'a, TNow: Clone> ops::DerefMut for InnerReadWrite<'a, TNow> { |
244 | 0 | fn deref_mut(&mut self) -> &mut Self::Target { |
245 | 0 | &mut self.inner_read_write |
246 | 0 | } Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framings1_0pEINtB5_14InnerReadWritepENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutBb_ Unexecuted instantiation: _RNvXs1_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingINtB5_14InnerReadWriteNtNtCsaYZPK01V26L_4core4time8DurationENtNtNtB1w_3ops5deref8DerefMut9deref_mutCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framings1_0pEINtB5_14InnerReadWritepENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutBb_ |
247 | | } |
248 | | |
249 | | impl<'a, TNow: Clone> Drop for InnerReadWrite<'a, TNow> { |
250 | 0 | fn drop(&mut self) { |
251 | 0 | // It is possible that the inner stream processes some bytes of `self.receive_buffer` |
252 | 0 | // and expects to be called again while no bytes was pulled from the outer `ReadWrite`. |
253 | 0 | // If that happens, the API user will not call `read_write` again and we will have a stall. |
254 | 0 | // For this reason, if the inner stream has read some bytes, we make sure that the outer |
255 | 0 | // `ReadWrite` wakes up as soon as possible. |
256 | 0 | // Additionally, we also do a first dummy substream processing without reading anything, |
257 | 0 | // in order to populate `inner_stream_expected_incoming_bytes`. If this is the case, we |
258 | 0 | // also immediately wake up again. |
259 | 0 | if self.framing.inner_stream_expected_incoming_bytes.is_none() |
260 | 0 | || self.inner_read_write.read_bytes != 0 |
261 | 0 | { |
262 | 0 | self.outer_read_write.wake_up_asap(); |
263 | 0 | } |
264 | | |
265 | | // Updating the timer and reading side of things. |
266 | 0 | self.outer_read_write.wake_up_after = self.inner_read_write.wake_up_after.clone(); |
267 | 0 | self.framing.receive_buffer = mem::take(&mut self.inner_read_write.incoming_buffer); |
268 | 0 | self.framing.inner_stream_expected_incoming_bytes = |
269 | 0 | Some(self.inner_read_write.expected_incoming_bytes.unwrap_or(0)); |
270 | 0 | if let Some(expected_incoming_bytes) = &mut self.outer_read_write.expected_incoming_bytes { |
271 | 0 | *expected_incoming_bytes = cmp::max( |
272 | 0 | *expected_incoming_bytes, |
273 | 0 | self.inner_read_write.expected_incoming_bytes.unwrap_or(0) + PROTOBUF_FRAME_MIN_LEN, |
274 | 0 | ); |
275 | 0 | } |
276 | | |
277 | | // Update the local state, and figure out the flag (if any) that we want to send out to |
278 | | // the remote. |
279 | | // Note that we never send the `RESET_STREAM` flag. It is unclear to me what purpose this |
280 | | // flag serves compares to simply closing the substream. |
281 | | // We also never send `STOP_SENDING`, as it doesn't fit in our API. |
282 | 0 | let flag_to_send_out: Option<u32> = |
283 | 0 | if matches!(self.framing.local_write_state, LocalWriteState::Open) |
284 | 0 | && self.inner_read_write.write_bytes_queueable.is_none() |
285 | | { |
286 | 0 | self.framing.local_write_state = LocalWriteState::FinBuffered; |
287 | 0 | Some(0) |
288 | 0 | } else if matches!(self.framing.remote_write_state, RemoteWriteState::Closed) { |
289 | | // `FIN_ACK` |
290 | 0 | self.framing.remote_write_state = RemoteWriteState::ClosedAckBuffered; |
291 | 0 | Some(3) |
292 | | } else { |
293 | 0 | None |
294 | | }; |
295 | | |
296 | | // Write out a message only if there is anything to write. |
297 | | // TODO: consider buffering data more before flushing, to reduce the overhead of the protobuf frame? |
298 | 0 | if flag_to_send_out.is_some() || self.inner_read_write.write_bytes_queued != 0 { |
299 | | // Reserve some space in `write_buffers` to later write the message length prefix. |
300 | 0 | let message_length_prefix_index = self.outer_read_write.write_buffers.len(); |
301 | 0 | self.outer_read_write |
302 | 0 | .write_buffers |
303 | 0 | .push(Vec::with_capacity(4)); |
304 | 0 |
|
305 | 0 | // Total number of bytes written below, excluding the length prefix. |
306 | 0 | let mut length_prefix_value = 0; |
307 | | |
308 | | // Write the flags, if any. |
309 | 0 | if let Some(flag_to_send_out) = flag_to_send_out { |
310 | 0 | for buffer in protobuf::uint32_tag_encode(1, flag_to_send_out) { |
311 | 0 | let buffer = buffer.as_ref(); |
312 | 0 | length_prefix_value += buffer.len(); |
313 | 0 | self.outer_read_write.write_buffers.push(buffer.to_owned()); |
314 | 0 | } |
315 | 0 | } |
316 | | |
317 | | // Write the data. This consists in a protobuf tag, a length, and the data itself. |
318 | 0 | let data_protobuf_tag = protobuf::tag_encode(2, 2).collect::<Vec<_>>(); |
319 | 0 | length_prefix_value += data_protobuf_tag.len(); |
320 | 0 | self.outer_read_write.write_buffers.push(data_protobuf_tag); |
321 | 0 | let data_len = |
322 | 0 | leb128::encode_usize(self.inner_read_write.write_bytes_queued).collect::<Vec<_>>(); |
323 | 0 | length_prefix_value += data_len.len(); |
324 | 0 | self.outer_read_write.write_buffers.push(data_len); |
325 | 0 | length_prefix_value += self.inner_read_write.write_bytes_queued; |
326 | 0 | self.outer_read_write |
327 | 0 | .write_buffers |
328 | 0 | .extend(mem::take(&mut self.inner_read_write.write_buffers)); |
329 | 0 |
|
330 | 0 | // Now write the length prefix. |
331 | 0 | let length_prefix = leb128::encode_usize(length_prefix_value).collect::<Vec<_>>(); |
332 | 0 | let total_length = length_prefix_value + length_prefix.len(); |
333 | 0 | self.outer_read_write.write_buffers[message_length_prefix_index] = length_prefix; |
334 | 0 |
|
335 | 0 | // Properly update the outer `ReadWrite`. |
336 | 0 | self.outer_read_write.write_bytes_queued += total_length; |
337 | 0 | *self |
338 | 0 | .outer_read_write |
339 | 0 | .write_bytes_queueable |
340 | 0 | .as_mut() |
341 | 0 | .unwrap() -= total_length; |
342 | 0 | } |
343 | 0 | } Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framings2_0pEINtB5_14InnerReadWritepENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropBb_ Unexecuted instantiation: _RNvXs2_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingINtB5_14InnerReadWriteNtNtCsaYZPK01V26L_4core4time8DurationENtNtNtB1w_3ops4drop4Drop4dropCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framings2_0pEINtB5_14InnerReadWritepENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropBb_ |
344 | | } |
345 | | |
346 | | /// Error while decoding data. |
347 | 0 | #[derive(Debug, derive_more::Display)] Unexecuted instantiation: _RNvXs4_NtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection14webrtc_framingNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs4_NtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection14webrtc_framingNtB5_5ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
348 | | pub enum Error { |
349 | | /// The remote wants to reset the substream. This is a normal situation. |
350 | | RemoteResetDesired, |
351 | | /// Failed to decode the protobuf header. |
352 | | InvalidFrame, |
353 | | /// Remote has sent data after having sent a `FIN` flag in the past. |
354 | | DataAfterFin, |
355 | | /// Outer substream has closed in the middle of a frame. |
356 | | EofIncompleteFrame, |
357 | | /// Received a `FIN_ACK` flag without having sent a `FIN` flag. |
358 | | FinAckWithoutFin, |
359 | | } |