/__w/smoldot/smoldot/repo/lib/src/libp2p/read_write.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use core::{cmp, mem}; |
19 | | |
20 | | use alloc::{collections::VecDeque, vec::Vec}; |
21 | | |
22 | | // TODO: documentation |
23 | | |
/// State shared between the code that drives a socket and the code that consumes it.
///
/// It groups the data received and not yet processed, the buffers queued for sending, the
/// read/write accounting counters, and the moment at which the socket wants to be woken up.
#[must_use]
pub struct ReadWrite<TNow> {
    /// Value representing the current moment. [`ReadWrite::wake_up_asap`] copies it into
    /// [`ReadWrite::wake_up_after`].
    pub now: TNow,

    /// Buffer of socket data ready to be processed.
    pub incoming_buffer: Vec<u8>,

    /// Number of bytes that [`ReadWrite::incoming_buffer`] should contain. `None` if the
    /// reading side of the socket is closed.
    pub expected_incoming_bytes: Option<usize>,

    /// Total number of bytes that have been read from [`ReadWrite::incoming_buffer`].
    ///
    /// [`ReadWrite::incoming_buffer`] must have been advanced after these bytes.
    // TODO: is this field actually useful?
    pub read_bytes: usize,

    /// List of buffers containing data to be written out. The consumer of the [`ReadWrite`] is
    /// expected to add buffers.
    // TODO: consider changing the inner `Vec` to `Box<dyn AsRef<[u8]>>`
    pub write_buffers: Vec<Vec<u8>>,

    /// Amount of data already queued, both outside and including [`ReadWrite::write_buffers`].
    // TODO: is this field actually useful?
    pub write_bytes_queued: usize,

    /// Number of additional bytes that are allowed to be pushed to [`ReadWrite::write_buffers`].
    /// `None` if the writing side of the stream is closed.
    pub write_bytes_queueable: Option<usize>,

    /// If `Some`, the socket must be woken up after the given `TNow` is reached.
    pub wake_up_after: Option<TNow>,
}
57 | | |
58 | | impl<TNow> ReadWrite<TNow> { |
59 | | /// Returns true if the connection should be considered dead. That is, both |
60 | | /// [`ReadWrite::expected_incoming_bytes`] is `None` and [`ReadWrite::write_bytes_queueable`] |
61 | | /// is `None`. |
62 | 3 | pub fn is_dead(&self) -> bool { |
63 | 3 | self.expected_incoming_bytes.is_none() && self.write_bytes_queueable.is_none()1 |
64 | 3 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE7is_deadB6_ Line | Count | Source | 62 | 3 | pub fn is_dead(&self) -> bool { | 63 | 3 | self.expected_incoming_bytes.is_none() && self.write_bytes_queueable.is_none()1 | 64 | 3 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE7is_deadCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE7is_deadB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE7is_deadCsiUjFBJteJ7x_17smoldot_full_node |
65 | | |
66 | | /// Sets the writing side of the connection to closed. |
67 | | /// |
68 | | /// This is simply a shortcut for setting [`ReadWrite::write_bytes_queueable`] to `None`. |
69 | 15 | pub fn close_write(&mut self) { |
70 | 15 | self.write_bytes_queueable = None; |
71 | 15 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE11close_writeB6_ Line | Count | Source | 69 | 15 | pub fn close_write(&mut self) { | 70 | 15 | self.write_bytes_queueable = None; | 71 | 15 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE11close_writeCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE11close_writeB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE11close_writeCsiUjFBJteJ7x_17smoldot_full_node |
72 | | |
73 | | /// Returns the size of the data available in the incoming buffer. |
74 | 0 | pub fn incoming_buffer_available(&self) -> usize { |
75 | 0 | self.incoming_buffer.len() |
76 | 0 | } Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritepE25incoming_buffer_availableB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE25incoming_buffer_availableB6_ |
77 | | |
78 | | /// Discards all the incoming data. Updates [`ReadWrite::read_bytes`] and decreases |
79 | | /// [`ReadWrite::expected_incoming_bytes`] by the number of consumed bytes. |
80 | 6 | pub fn discard_all_incoming(&mut self) { |
81 | 6 | self.read_bytes += self.incoming_buffer.len(); |
82 | 6 | if let Some(expected_incoming_bytes5 ) = &mut self.expected_incoming_bytes { |
83 | 5 | *expected_incoming_bytes = |
84 | 5 | expected_incoming_bytes.saturating_sub(self.incoming_buffer.len()); |
85 | 5 | }1 |
86 | 6 | self.incoming_buffer.clear(); |
87 | 6 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20discard_all_incomingB6_ Line | Count | Source | 80 | 6 | pub fn discard_all_incoming(&mut self) { | 81 | 6 | self.read_bytes += self.incoming_buffer.len(); | 82 | 6 | if let Some(expected_incoming_bytes5 ) = &mut self.expected_incoming_bytes { | 83 | 5 | *expected_incoming_bytes = | 84 | 5 | expected_incoming_bytes.saturating_sub(self.incoming_buffer.len()); | 85 | 5 | }1 | 86 | 6 | self.incoming_buffer.clear(); | 87 | 6 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20discard_all_incomingCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE20discard_all_incomingB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE20discard_all_incomingCsiUjFBJteJ7x_17smoldot_full_node |
88 | | |
89 | | /// Extract a certain number of bytes from the read buffer. |
90 | | /// |
91 | | /// On success, updates [`ReadWrite::read_bytes`] and decreases |
92 | | /// [`ReadWrite::expected_incoming_bytes`] by the number of consumed bytes. |
93 | | /// |
94 | | /// If not enough bytes are available, returns `None` and sets |
95 | | /// [`ReadWrite::expected_incoming_bytes`] to the requested number of bytes. |
96 | 1.19k | pub fn incoming_bytes_take( |
97 | 1.19k | &mut self, |
98 | 1.19k | num: usize, |
99 | 1.19k | ) -> Result<Option<Vec<u8>>, IncomingBytesTakeError> { |
100 | 1.19k | if self.incoming_buffer.len() < num { |
101 | 481 | if let Some(expected_incoming_bytes478 ) = self.expected_incoming_bytes.as_mut() { |
102 | 478 | *expected_incoming_bytes = num; |
103 | 478 | return Ok(None); |
104 | | } else { |
105 | 3 | return Err(IncomingBytesTakeError::ReadClosed); |
106 | | } |
107 | 712 | } |
108 | 712 | |
109 | 712 | self.read_bytes += num; |
110 | | |
111 | 712 | if let Some(expected_incoming_bytes709 ) = self.expected_incoming_bytes.as_mut() { |
112 | 709 | *expected_incoming_bytes = expected_incoming_bytes.saturating_sub(num); |
113 | 709 | }3 |
114 | | |
115 | 712 | if self.incoming_buffer.len() == num { |
116 | 314 | Ok(Some(mem::take(&mut self.incoming_buffer))) |
117 | 398 | } else if self.incoming_buffer.len() - num < num.saturating_mul(2) { |
118 | 108 | let remains = self.incoming_buffer.split_at(num).1.to_vec(); |
119 | 108 | self.incoming_buffer.truncate(num); |
120 | 108 | Ok(Some(mem::replace(&mut self.incoming_buffer, remains))) |
121 | | } else { |
122 | 290 | let to_ret = self.incoming_buffer.split_at(num).0.to_vec(); |
123 | 290 | self.incoming_buffer.copy_within(num.., 0); |
124 | 290 | self.incoming_buffer |
125 | 290 | .truncate(self.incoming_buffer.len() - num); |
126 | 290 | Ok(Some(to_ret)) |
127 | | } |
128 | 1.19k | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE19incoming_bytes_takeB6_ Line | Count | Source | 96 | 1.10k | pub fn incoming_bytes_take( | 97 | 1.10k | &mut self, | 98 | 1.10k | num: usize, | 99 | 1.10k | ) -> Result<Option<Vec<u8>>, IncomingBytesTakeError> { | 100 | 1.10k | if self.incoming_buffer.len() < num { | 101 | 452 | if let Some(expected_incoming_bytes451 ) = self.expected_incoming_bytes.as_mut() { | 102 | 451 | *expected_incoming_bytes = num; | 103 | 451 | return Ok(None); | 104 | | } else { | 105 | 1 | return Err(IncomingBytesTakeError::ReadClosed); | 106 | | } | 107 | 649 | } | 108 | 649 | | 109 | 649 | self.read_bytes += num; | 110 | | | 111 | 649 | if let Some(expected_incoming_bytes647 ) = self.expected_incoming_bytes.as_mut() { | 112 | 647 | *expected_incoming_bytes = expected_incoming_bytes.saturating_sub(num); | 113 | 647 | }2 | 114 | | | 115 | 649 | if self.incoming_buffer.len() == num { | 116 | 275 | Ok(Some(mem::take(&mut self.incoming_buffer))) | 117 | 374 | } else if self.incoming_buffer.len() - num < num.saturating_mul(2) { | 118 | 100 | let remains = self.incoming_buffer.split_at(num).1.to_vec(); | 119 | 100 | self.incoming_buffer.truncate(num); | 120 | 100 | Ok(Some(mem::replace(&mut self.incoming_buffer, remains))) | 121 | | } else { | 122 | 274 | let to_ret = self.incoming_buffer.split_at(num).0.to_vec(); | 123 | 274 | self.incoming_buffer.copy_within(num.., 0); | 124 | 274 | self.incoming_buffer | 125 | 274 | .truncate(self.incoming_buffer.len() - num); | 126 | 274 | Ok(Some(to_ret)) | 127 | | } | 128 | 1.10k | } |
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE19incoming_bytes_takeB6_ Line | Count | Source | 96 | 92 | pub fn incoming_bytes_take( | 97 | 92 | &mut self, | 98 | 92 | num: usize, | 99 | 92 | ) -> Result<Option<Vec<u8>>, IncomingBytesTakeError> { | 100 | 92 | if self.incoming_buffer.len() < num { | 101 | 29 | if let Some(expected_incoming_bytes27 ) = self.expected_incoming_bytes.as_mut() { | 102 | 27 | *expected_incoming_bytes = num; | 103 | 27 | return Ok(None); | 104 | | } else { | 105 | 2 | return Err(IncomingBytesTakeError::ReadClosed); | 106 | | } | 107 | 63 | } | 108 | 63 | | 109 | 63 | self.read_bytes += num; | 110 | | | 111 | 63 | if let Some(expected_incoming_bytes62 ) = self.expected_incoming_bytes.as_mut() { | 112 | 62 | *expected_incoming_bytes = expected_incoming_bytes.saturating_sub(num); | 113 | 62 | }1 | 114 | | | 115 | 63 | if self.incoming_buffer.len() == num { | 116 | 39 | Ok(Some(mem::take(&mut self.incoming_buffer))) | 117 | 24 | } else if self.incoming_buffer.len() - num < num.saturating_mul(2) { | 118 | 8 | let remains = self.incoming_buffer.split_at(num).1.to_vec(); | 119 | 8 | self.incoming_buffer.truncate(num); | 120 | 8 | Ok(Some(mem::replace(&mut self.incoming_buffer, remains))) | 121 | | } else { | 122 | 16 | let to_ret = self.incoming_buffer.split_at(num).0.to_vec(); | 123 | 16 | self.incoming_buffer.copy_within(num.., 0); | 124 | 16 | self.incoming_buffer | 125 | 16 | .truncate(self.incoming_buffer.len() - num); | 126 | 16 | Ok(Some(to_ret)) | 127 | | } | 128 | 92 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE19incoming_bytes_takeCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE19incoming_bytes_takeB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE19incoming_bytes_takeCsiUjFBJteJ7x_17smoldot_full_node |
129 | | |
130 | | /// Same as [`ReadWrite::incoming_bytes_take_array`], but reads a number of bytes as a |
131 | | /// compile-time constant. |
132 | 468 | pub fn incoming_bytes_take_array<const N: usize>( |
133 | 468 | &mut self, |
134 | 468 | ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> { |
135 | 468 | let Some(vec158 ) = self.incoming_bytes_take(N)?0 else { |
136 | 310 | return Ok(None); |
137 | | }; |
138 | | |
139 | 158 | let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap(); |
140 | 158 | Ok(Some(*bytes)) |
141 | 468 | } _RINvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKj2_EB7_ Line | Count | Source | 132 | 244 | pub fn incoming_bytes_take_array<const N: usize>( | 133 | 244 | &mut self, | 134 | 244 | ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> { | 135 | 244 | let Some(vec88 ) = self.incoming_bytes_take(N)?0 else { | 136 | 156 | return Ok(None); | 137 | | }; | 138 | | | 139 | 88 | let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap(); | 140 | 88 | Ok(Some(*bytes)) | 141 | 244 | } |
_RINvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKjc_EB7_ Line | Count | Source | 132 | 219 | pub fn incoming_bytes_take_array<const N: usize>( | 133 | 219 | &mut self, | 134 | 219 | ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> { | 135 | 219 | let Some(vec67 ) = self.incoming_bytes_take(N)?0 else { | 136 | 152 | return Ok(None); | 137 | | }; | 138 | | | 139 | 67 | let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap(); | 140 | 67 | Ok(Some(*bytes)) | 141 | 219 | } |
_RINvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB3_9ReadWritelE25incoming_bytes_take_arrayKj2_EB7_ Line | Count | Source | 132 | 5 | pub fn incoming_bytes_take_array<const N: usize>( | 133 | 5 | &mut self, | 134 | 5 | ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> { | 135 | 5 | let Some(vec3 ) = self.incoming_bytes_take(N)?0 else { | 136 | 2 | return Ok(None); | 137 | | }; | 138 | | | 139 | 3 | let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap(); | 140 | 3 | Ok(Some(*bytes)) | 141 | 5 | } |
Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKj2_ECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKjc_ECsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWritepE25incoming_bytes_take_arrayKpEB7_ Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE25incoming_bytes_take_arrayKj2_ECsiUjFBJteJ7x_17smoldot_full_node Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE25incoming_bytes_take_arrayKjc_ECsiUjFBJteJ7x_17smoldot_full_node |
142 | | |
143 | | /// Extract an LEB128-encoded number from the start of the read buffer. |
144 | | /// |
145 | | /// On success, updates [`ReadWrite::read_bytes`] and decreases |
146 | | /// [`ReadWrite::expected_incoming_bytes`] by the number of consumed bytes. |
147 | | /// |
148 | | /// If not enough bytes are available, returns `None` and sets |
149 | | /// [`ReadWrite::expected_incoming_bytes`] to the required number of bytes. |
150 | | /// |
151 | | /// Must be passed the maximum value that this function can return on success. An error is |
152 | | /// returned if the value sent by the remote is higher than this maximum. This parameter, |
153 | | /// while not strictly necessary, is here for safety, as it is easy to forget to check the |
154 | | /// value against a maximum. |
155 | 343 | pub fn incoming_bytes_take_leb128( |
156 | 343 | &mut self, |
157 | 343 | max_decoded_number: usize, |
158 | 343 | ) -> Result<Option<usize>, IncomingBytesTakeLeb128Error> { |
159 | 343 | match crate::util::leb128::nom_leb128_usize::<nom::error::Error<&[u8]>>( |
160 | 343 | &self.incoming_buffer, |
161 | | ) { |
162 | 164 | Ok((rest, num)) => { |
163 | 164 | if num > max_decoded_number { |
164 | | // TODO: consider detecting earlier if `TooLarge` is reached; for example is max is 20 we know that it can't be more than one byte |
165 | 0 | return Err(IncomingBytesTakeLeb128Error::TooLarge); |
166 | 164 | } |
167 | 164 | |
168 | 164 | let consumed_bytes = self.incoming_buffer.len() - rest.len(); |
169 | 164 | if !rest.is_empty() { |
170 | 83 | self.incoming_buffer.copy_within(consumed_bytes.., 0); |
171 | 83 | self.incoming_buffer |
172 | 83 | .truncate(self.incoming_buffer.len() - consumed_bytes); |
173 | 83 | } else { |
174 | 81 | self.incoming_buffer.clear(); |
175 | 81 | } |
176 | 164 | self.read_bytes += consumed_bytes; |
177 | 164 | if let Some(expected_incoming_bytes161 ) = self.expected_incoming_bytes.as_mut() { |
178 | 161 | *expected_incoming_bytes = |
179 | 161 | expected_incoming_bytes.saturating_sub(consumed_bytes); |
180 | 161 | }3 |
181 | 164 | Ok(Some(num)) |
182 | | } |
183 | 0 | Err(nom::Err::Incomplete(nom::Needed::Size(num))) => { |
184 | 0 | if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() { |
185 | 0 | *expected_incoming_bytes = self.incoming_buffer.len() + num.get(); |
186 | 0 | } else { |
187 | 0 | return Err(IncomingBytesTakeLeb128Error::ReadClosed); |
188 | | } |
189 | 0 | Ok(None) |
190 | | } |
191 | | Err(nom::Err::Incomplete(nom::Needed::Unknown)) => { |
192 | 179 | if let Some(expected_incoming_bytes178 ) = self.expected_incoming_bytes.as_mut() { |
193 | 178 | *expected_incoming_bytes = self.incoming_buffer.len() + 1; |
194 | 178 | } else { |
195 | 1 | return Err(IncomingBytesTakeLeb128Error::ReadClosed); |
196 | | } |
197 | 178 | Ok(None) |
198 | | } |
199 | 0 | Err(_) => Err(IncomingBytesTakeLeb128Error::InvalidLeb128), |
200 | | } |
201 | 343 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE26incoming_bytes_take_leb128B6_ Line | Count | Source | 155 | 300 | pub fn incoming_bytes_take_leb128( | 156 | 300 | &mut self, | 157 | 300 | max_decoded_number: usize, | 158 | 300 | ) -> Result<Option<usize>, IncomingBytesTakeLeb128Error> { | 159 | 300 | match crate::util::leb128::nom_leb128_usize::<nom::error::Error<&[u8]>>( | 160 | 300 | &self.incoming_buffer, | 161 | | ) { | 162 | 140 | Ok((rest, num)) => { | 163 | 140 | if num > max_decoded_number { | 164 | | // TODO: consider detecting earlier if `TooLarge` is reached; for example is max is 20 we know that it can't be more than one byte | 165 | 0 | return Err(IncomingBytesTakeLeb128Error::TooLarge); | 166 | 140 | } | 167 | 140 | | 168 | 140 | let consumed_bytes = self.incoming_buffer.len() - rest.len(); | 169 | 140 | if !rest.is_empty() { | 170 | 63 | self.incoming_buffer.copy_within(consumed_bytes.., 0); | 171 | 63 | self.incoming_buffer | 172 | 63 | .truncate(self.incoming_buffer.len() - consumed_bytes); | 173 | 77 | } else { | 174 | 77 | self.incoming_buffer.clear(); | 175 | 77 | } | 176 | 140 | self.read_bytes += consumed_bytes; | 177 | 140 | if let Some(expected_incoming_bytes137 ) = self.expected_incoming_bytes.as_mut() { | 178 | 137 | *expected_incoming_bytes = | 179 | 137 | expected_incoming_bytes.saturating_sub(consumed_bytes); | 180 | 137 | }3 | 181 | 140 | Ok(Some(num)) | 182 | | } | 183 | 0 | Err(nom::Err::Incomplete(nom::Needed::Size(num))) => { | 184 | 0 | if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() { | 185 | 0 | *expected_incoming_bytes = self.incoming_buffer.len() + num.get(); | 186 | 0 | } else { | 187 | 0 | return Err(IncomingBytesTakeLeb128Error::ReadClosed); | 188 | | } | 189 | 0 | Ok(None) | 190 | | } | 191 | | Err(nom::Err::Incomplete(nom::Needed::Unknown)) => { | 192 | 160 | if let Some(expected_incoming_bytes159 ) = 
self.expected_incoming_bytes.as_mut() { | 193 | 159 | *expected_incoming_bytes = self.incoming_buffer.len() + 1; | 194 | 159 | } else { | 195 | 1 | return Err(IncomingBytesTakeLeb128Error::ReadClosed); | 196 | | } | 197 | 159 | Ok(None) | 198 | | } | 199 | 0 | Err(_) => Err(IncomingBytesTakeLeb128Error::InvalidLeb128), | 200 | | } | 201 | 300 | } |
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE26incoming_bytes_take_leb128B6_ Line | Count | Source | 155 | 43 | pub fn incoming_bytes_take_leb128( | 156 | 43 | &mut self, | 157 | 43 | max_decoded_number: usize, | 158 | 43 | ) -> Result<Option<usize>, IncomingBytesTakeLeb128Error> { | 159 | 43 | match crate::util::leb128::nom_leb128_usize::<nom::error::Error<&[u8]>>( | 160 | 43 | &self.incoming_buffer, | 161 | | ) { | 162 | 24 | Ok((rest, num)) => { | 163 | 24 | if num > max_decoded_number { | 164 | | // TODO: consider detecting earlier if `TooLarge` is reached; for example is max is 20 we know that it can't be more than one byte | 165 | 0 | return Err(IncomingBytesTakeLeb128Error::TooLarge); | 166 | 24 | } | 167 | 24 | | 168 | 24 | let consumed_bytes = self.incoming_buffer.len() - rest.len(); | 169 | 24 | if !rest.is_empty() { | 170 | 20 | self.incoming_buffer.copy_within(consumed_bytes.., 0); | 171 | 20 | self.incoming_buffer | 172 | 20 | .truncate(self.incoming_buffer.len() - consumed_bytes); | 173 | 20 | } else { | 174 | 4 | self.incoming_buffer.clear(); | 175 | 4 | } | 176 | 24 | self.read_bytes += consumed_bytes; | 177 | 24 | if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() { | 178 | 24 | *expected_incoming_bytes = | 179 | 24 | expected_incoming_bytes.saturating_sub(consumed_bytes); | 180 | 24 | }0 | 181 | 24 | Ok(Some(num)) | 182 | | } | 183 | 0 | Err(nom::Err::Incomplete(nom::Needed::Size(num))) => { | 184 | 0 | if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() { | 185 | 0 | *expected_incoming_bytes = self.incoming_buffer.len() + num.get(); | 186 | 0 | } else { | 187 | 0 | return Err(IncomingBytesTakeLeb128Error::ReadClosed); | 188 | | } | 189 | 0 | Ok(None) | 190 | | } | 191 | | Err(nom::Err::Incomplete(nom::Needed::Unknown)) => { | 192 | 19 | if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() { | 193 | 19 | *expected_incoming_bytes = 
self.incoming_buffer.len() + 1; | 194 | 19 | } else { | 195 | 0 | return Err(IncomingBytesTakeLeb128Error::ReadClosed); | 196 | | } | 197 | 19 | Ok(None) | 198 | | } | 199 | 0 | Err(_) => Err(IncomingBytesTakeLeb128Error::InvalidLeb128), | 200 | | } | 201 | 43 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE26incoming_bytes_take_leb128CsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE26incoming_bytes_take_leb128B6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE26incoming_bytes_take_leb128CsiUjFBJteJ7x_17smoldot_full_node |
202 | | |
203 | | /// Copies as much as possible from the content of `data` to [`ReadWrite::write_buffers`] |
204 | | /// and updates [`ReadWrite::write_bytes_queued`] and [`ReadWrite::write_bytes_queueable`]. |
205 | | /// The bytes that have been written are removed from `data`. |
206 | | /// |
207 | | /// This function is recommended only if the `Vec` is small. |
208 | 0 | pub fn write_from_vec(&mut self, data: &mut Vec<u8>) { |
209 | 0 | let Some(queueable) = self.write_bytes_queueable.as_mut() else { |
210 | 0 | return; |
211 | | }; |
212 | | |
213 | 0 | let to_copy = cmp::min(data.len(), *queueable); |
214 | 0 | if to_copy == 0 { |
215 | 0 | return; |
216 | 0 | } |
217 | 0 |
|
218 | 0 | if to_copy == data.len() { |
219 | 0 | self.write_buffers.push(mem::take(data)); |
220 | 0 | } else { |
221 | 0 | self.write_buffers.push(data[..to_copy].to_vec()); |
222 | 0 | data.copy_within(to_copy.., 0); |
223 | 0 | data.truncate(data.len() - to_copy); |
224 | 0 | } |
225 | | |
226 | 0 | self.write_bytes_queued += to_copy; |
227 | 0 | *queueable -= to_copy; |
228 | 0 | } Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE14write_from_vecB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE14write_from_vecCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE14write_from_vecB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE14write_from_vecCsiUjFBJteJ7x_17smoldot_full_node |
229 | | |
230 | | /// Copies as much as possible from the content of `data` to [`ReadWrite::write_buffers`] |
231 | | /// and updates [`ReadWrite::write_bytes_queued`] and [`ReadWrite::write_bytes_queueable`]. |
232 | | /// The bytes that have been written are removed from `data`. |
233 | 849 | pub fn write_from_vec_deque(&mut self, data: &mut VecDeque<u8>) { |
234 | 849 | let (slice1, slice2) = data.as_slices(); |
235 | 849 | |
236 | 849 | let to_copy1 = cmp::min(slice1.len(), self.write_bytes_queueable.unwrap_or(0)); |
237 | 849 | let to_copy2 = if to_copy1 == slice1.len() { |
238 | 830 | cmp::min( |
239 | 830 | slice2.len(), |
240 | 830 | self.write_bytes_queueable.unwrap_or(0) - to_copy1, |
241 | 830 | ) |
242 | | } else { |
243 | 19 | 0 |
244 | | }; |
245 | | |
246 | 849 | let total_tocopy = to_copy1 + to_copy2; |
247 | 849 | |
248 | 849 | if total_tocopy == 0 { |
249 | 670 | return; |
250 | 179 | } |
251 | 179 | |
252 | 179 | self.write_buffers.push(slice1[..to_copy1].to_vec()); |
253 | 179 | self.write_buffers.push(slice2[..to_copy2].to_vec()); |
254 | 179 | |
255 | 179 | self.write_bytes_queued += total_tocopy; |
256 | 179 | *self.write_bytes_queueable.as_mut().unwrap() -= total_tocopy; |
257 | 179 | |
258 | 7.38k | for _ in 0..total_tocopy { |
259 | 7.38k | data.pop_front(); |
260 | 7.38k | } |
261 | 849 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20write_from_vec_dequeB6_ Line | Count | Source | 233 | 701 | pub fn write_from_vec_deque(&mut self, data: &mut VecDeque<u8>) { | 234 | 701 | let (slice1, slice2) = data.as_slices(); | 235 | 701 | | 236 | 701 | let to_copy1 = cmp::min(slice1.len(), self.write_bytes_queueable.unwrap_or(0)); | 237 | 701 | let to_copy2 = if to_copy1 == slice1.len() { | 238 | 701 | cmp::min( | 239 | 701 | slice2.len(), | 240 | 701 | self.write_bytes_queueable.unwrap_or(0) - to_copy1, | 241 | 701 | ) | 242 | | } else { | 243 | 0 | 0 | 244 | | }; | 245 | | | 246 | 701 | let total_tocopy = to_copy1 + to_copy2; | 247 | 701 | | 248 | 701 | if total_tocopy == 0 { | 249 | 571 | return; | 250 | 130 | } | 251 | 130 | | 252 | 130 | self.write_buffers.push(slice1[..to_copy1].to_vec()); | 253 | 130 | self.write_buffers.push(slice2[..to_copy2].to_vec()); | 254 | 130 | | 255 | 130 | self.write_bytes_queued += total_tocopy; | 256 | 130 | *self.write_bytes_queueable.as_mut().unwrap() -= total_tocopy; | 257 | 130 | | 258 | 5.01k | for _ in 0..total_tocopy { | 259 | 5.01k | data.pop_front(); | 260 | 5.01k | } | 261 | 701 | } |
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE20write_from_vec_dequeB6_ Line | Count | Source | 233 | 148 | pub fn write_from_vec_deque(&mut self, data: &mut VecDeque<u8>) { | 234 | 148 | let (slice1, slice2) = data.as_slices(); | 235 | 148 | | 236 | 148 | let to_copy1 = cmp::min(slice1.len(), self.write_bytes_queueable.unwrap_or(0)); | 237 | 148 | let to_copy2 = if to_copy1 == slice1.len() { | 238 | 129 | cmp::min( | 239 | 129 | slice2.len(), | 240 | 129 | self.write_bytes_queueable.unwrap_or(0) - to_copy1, | 241 | 129 | ) | 242 | | } else { | 243 | 19 | 0 | 244 | | }; | 245 | | | 246 | 148 | let total_tocopy = to_copy1 + to_copy2; | 247 | 148 | | 248 | 148 | if total_tocopy == 0 { | 249 | 99 | return; | 250 | 49 | } | 251 | 49 | | 252 | 49 | self.write_buffers.push(slice1[..to_copy1].to_vec()); | 253 | 49 | self.write_buffers.push(slice2[..to_copy2].to_vec()); | 254 | 49 | | 255 | 49 | self.write_bytes_queued += total_tocopy; | 256 | 49 | *self.write_bytes_queueable.as_mut().unwrap() -= total_tocopy; | 257 | 49 | | 258 | 2.37k | for _ in 0..total_tocopy { | 259 | 2.37k | data.pop_front(); | 260 | 2.37k | } | 261 | 148 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20write_from_vec_dequeCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE20write_from_vec_dequeB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE20write_from_vec_dequeCsiUjFBJteJ7x_17smoldot_full_node |
262 | | |
263 | | /// Adds the `data` to [`ReadWrite::write_buffers`], increases |
264 | | /// [`ReadWrite::write_bytes_queued`], and decreases [`ReadWrite::write_bytes_queueable`]. |
265 | | /// |
266 | | /// # Panic |
267 | | /// |
268 | | /// Panics if `data.len() > write_bytes_queueable`. |
269 | | /// Panics if the writing side is closed and `data` isn't empty. |
270 | | /// |
271 | | // TODO: is this function necessary? it seems dangerous due to the panic regarding queuable bytes |
272 | 1 | pub fn write_out(&mut self, data: Vec<u8>) { |
273 | 1 | if data.is_empty() { |
274 | 0 | return; |
275 | 1 | } |
276 | 1 | assert!(data.len() <= self.write_bytes_queueable.unwrap_or(0)); |
277 | 1 | self.write_bytes_queued += data.len(); |
278 | 1 | *self.write_bytes_queueable.as_mut().unwrap() -= data.len(); |
279 | 1 | self.write_buffers.push(data); |
280 | 1 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE9write_outB6_ Line | Count | Source | 272 | 1 | pub fn write_out(&mut self, data: Vec<u8>) { | 273 | 1 | if data.is_empty() { | 274 | 0 | return; | 275 | 1 | } | 276 | 1 | assert!(data.len() <= self.write_bytes_queueable.unwrap_or(0)); | 277 | 1 | self.write_bytes_queued += data.len(); | 278 | 1 | *self.write_bytes_queueable.as_mut().unwrap() -= data.len(); | 279 | 1 | self.write_buffers.push(data); | 280 | 1 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE9write_outB6_ |
281 | | |
282 | | /// Sets [`ReadWrite::wake_up_after`] to `min(wake_up_after, after)`. |
283 | 483 | pub fn wake_up_after(&mut self, after: &TNow) |
284 | 483 | where |
285 | 483 | TNow: Clone + Ord, |
286 | 483 | { |
287 | 350 | match self.wake_up_after { |
288 | 350 | Some(ref mut t) if *t < *after => {}300 |
289 | 50 | Some(ref mut t) => *t = after.clone(), |
290 | 133 | ref mut t @ None => *t = Some(after.clone()), |
291 | | } |
292 | 483 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE13wake_up_afterB6_ Line | Count | Source | 283 | 483 | pub fn wake_up_after(&mut self, after: &TNow) | 284 | 483 | where | 285 | 483 | TNow: Clone + Ord, | 286 | 483 | { | 287 | 350 | match self.wake_up_after { | 288 | 350 | Some(ref mut t) if *t < *after => {}300 | 289 | 50 | Some(ref mut t) => *t = after.clone(), | 290 | 133 | ref mut t @ None => *t = Some(after.clone()), | 291 | | } | 292 | 483 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE13wake_up_afterCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE13wake_up_afterB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE13wake_up_afterCsiUjFBJteJ7x_17smoldot_full_node |
293 | | |
294 | | /// Sets [`ReadWrite::wake_up_after`] to the value in [`ReadWrite::now`]. |
295 | 522 | pub fn wake_up_asap(&mut self) |
296 | 522 | where |
297 | 522 | TNow: Clone, |
298 | 522 | { |
299 | 522 | self.wake_up_after = Some(self.now.clone()); |
300 | 522 | } _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE12wake_up_asapB6_ Line | Count | Source | 295 | 519 | pub fn wake_up_asap(&mut self) | 296 | 519 | where | 297 | 519 | TNow: Clone, | 298 | 519 | { | 299 | 519 | self.wake_up_after = Some(self.now.clone()); | 300 | 519 | } |
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE12wake_up_asapB6_ Line | Count | Source | 295 | 3 | pub fn wake_up_asap(&mut self) | 296 | 3 | where | 297 | 3 | TNow: Clone, | 298 | 3 | { | 299 | 3 | self.wake_up_after = Some(self.now.clone()); | 300 | 3 | } |
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE12wake_up_asapCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE12wake_up_asapB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE12wake_up_asapCsiUjFBJteJ7x_17smoldot_full_node |
301 | | } |
302 | | |
303 | | /// Error potentially returned by [`ReadWrite::incoming_bytes_take`]. |
304 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXs1_NtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeNtB5_22IncomingBytesTakeErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeNtB5_22IncomingBytesTakeErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
305 | | pub enum IncomingBytesTakeError { |
306 | | /// Reading side of the stream is closed. |
307 | | ReadClosed, |
308 | | } |
309 | | |
310 | | /// Error potentially returned by [`ReadWrite::incoming_bytes_take_leb128`]. |
311 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXs4_NtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeNtB5_28IncomingBytesTakeLeb128ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs4_NtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeNtB5_28IncomingBytesTakeLeb128ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
312 | | pub enum IncomingBytesTakeLeb128Error { |
313 | | /// Invalid LEB128 number. |
314 | | InvalidLeb128, |
315 | | /// Reading side of the stream is closed. |
316 | | ReadClosed, |
317 | | /// Number of bytes decoded is larger than expected. |
318 | | TooLarge, |
319 | | } |
320 | | |
#[cfg(test)]
mod tests {
    use super::{IncomingBytesTakeError, ReadWrite};

    /// Fixture for the reading tests: 64 bytes of `0x80` in the incoming buffer, `read_bytes`
    /// already at 2, empty write-side fields, and the given `expected_incoming_bytes`.
    fn with_incoming(expected_incoming_bytes: Option<usize>) -> ReadWrite<i32> {
        ReadWrite {
            now: 0,
            incoming_buffer: vec![0x80; 64],
            expected_incoming_bytes,
            read_bytes: 2,
            write_buffers: Vec::new(),
            write_bytes_queued: 0,
            write_bytes_queueable: None,
            wake_up_after: None,
        }
    }

    /// Fixture for the writing tests: no incoming data, no write buffers yet, and the two
    /// write counters set to the given values.
    fn with_write_counters(
        write_bytes_queued: usize,
        write_bytes_queueable: Option<usize>,
    ) -> ReadWrite<i32> {
        ReadWrite {
            now: 0,
            incoming_buffer: Vec::new(),
            expected_incoming_bytes: None,
            read_bytes: 0,
            write_buffers: Vec::new(),
            write_bytes_queued,
            write_bytes_queueable,
            wake_up_after: None,
        }
    }

    #[test]
    fn take_bytes() {
        let mut state = with_incoming(Some(12));

        // A request that fits in the buffer hands the bytes out and moves the counters.
        let taken = state.incoming_bytes_take(5).unwrap().unwrap();
        assert_eq!(taken, &[0x80; 5]);
        assert_eq!(state.incoming_buffer.len(), 59);
        assert_eq!(state.read_bytes, 7);
        assert_eq!(state.expected_incoming_bytes, Some(7));

        // A request larger than the buffer yields `Ok(None)`, consumes nothing, and is
        // expected to record the requested size.
        assert!(matches!(state.incoming_bytes_take(1000), Ok(None)));
        assert_eq!(state.read_bytes, 7);
        assert_eq!(state.expected_incoming_bytes, Some(1000));

        // Taking 57 of the remaining 59 bytes leaves 2 behind.
        let taken = state.incoming_bytes_take(57).unwrap().unwrap();
        assert_eq!(taken.len(), 57);
        assert_eq!(state.incoming_buffer.len(), 2);
        assert_eq!(state.read_bytes, 64);
        assert_eq!(state.expected_incoming_bytes, Some(1000 - 57));
    }

    #[test]
    fn take_bytes_closed() {
        let mut state = with_incoming(None);

        // With `expected_incoming_bytes` at `None`, asking for more than is buffered is an
        // error rather than `Ok(None)`.
        assert!(matches!(
            state.incoming_bytes_take(1000),
            Err(IncomingBytesTakeError::ReadClosed)
        ));
        assert_eq!(state.expected_incoming_bytes, None);

        // Data that is already buffered can still be taken.
        let taken = state.incoming_bytes_take(5).unwrap().unwrap();
        assert_eq!(taken, &[0x80; 5]);
        assert_eq!(state.incoming_buffer.len(), 59);
        assert_eq!(state.read_bytes, 7);
        assert_eq!(state.expected_incoming_bytes, None);

        assert!(matches!(
            state.incoming_bytes_take(1000),
            Err(IncomingBytesTakeError::ReadClosed)
        ));
        assert_eq!(state.expected_incoming_bytes, None);
    }

    #[test]
    fn write_out() {
        let mut state = with_write_counters(11, Some(10));

        // Five bytes move from the "queueable" budget to the "queued" total.
        state.write_out(b"hello".to_vec());
        assert_eq!(state.write_buffers.len(), 1);
        assert_eq!(state.write_bytes_queued, 16);
        assert_eq!(state.write_bytes_queueable, Some(5));
    }

    #[test]
    fn write_from_vec_deque_smaller() {
        // Four bytes of input against a budget of five: everything is expected to be written.
        let mut pending = [1, 2, 3, 4].iter().copied().collect();
        let mut state = with_write_counters(5, Some(5));

        state.write_from_vec_deque(&mut pending);
        assert!(pending.is_empty());
        assert_eq!(state.write_bytes_queued, 9);
        assert_eq!(state.write_bytes_queueable, Some(1));
    }

    #[test]
    fn write_from_vec_deque_larger() {
        // Six bytes of input against a budget of five: the last byte is expected to stay.
        let mut pending = [1, 2, 3, 4, 5, 6].iter().copied().collect();
        let mut state = with_write_counters(5, Some(5));

        state.write_from_vec_deque(&mut pending);
        let leftover = pending.into_iter().collect::<Vec<_>>();
        assert_eq!(leftover, &[6]);
        assert_eq!(state.write_bytes_queued, 10);
        assert_eq!(state.write_bytes_queueable, Some(0));
    }
}