Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/read_write.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use core::{cmp, mem};
19
20
use alloc::{collections::VecDeque, vec::Vec};
21
22
// TODO: documentation
23
24
#[must_use]
25
pub struct ReadWrite<TNow> {
26
    pub now: TNow,
27
28
    /// Buffer of socket data ready to be processed.
29
    pub incoming_buffer: Vec<u8>,
30
31
    /// Number of bytes that [`ReadWrite::incoming_buffer`] should contain. `None` if the remote
32
    /// has closed their reading side of the socket.
33
    pub expected_incoming_bytes: Option<usize>,
34
35
    /// Total number of bytes that have been read from [`ReadWrite::incoming_buffer`].
36
    ///
37
    /// [`ReadWrite::incoming_buffer`] must have been advanced after these bytes.
38
    // TODO: is this field actually useful?
39
    pub read_bytes: usize,
40
41
    /// List of buffers containing data to be written out. The consumer of the [`ReadWrite`] is
42
    /// expected to add buffers.
43
    // TODO: consider changing the inner `Vec` to `Box<dyn AsRef<[u8]>>`
44
    pub write_buffers: Vec<Vec<u8>>,
45
46
    /// Amount of data already queued, both outside and including [`ReadWrite::write_buffers`].
47
    // TODO: is this field actually useful?
48
    pub write_bytes_queued: usize,
49
50
    /// Number of additional bytes that are allowed to be pushed to [`ReadWrite::write_buffers`].
51
    /// `None` if the writing side of the stream is closed.
52
    pub write_bytes_queueable: Option<usize>,
53
54
    /// If `Some`, the socket must be woken up after the given `TNow` is reached.
55
    pub wake_up_after: Option<TNow>,
56
}
57
58
impl<TNow> ReadWrite<TNow> {
59
    /// Returns true if the connection should be considered dead. That is, both
60
    /// [`ReadWrite::expected_incoming_bytes`] is `None` and [`ReadWrite::write_bytes_queueable`]
61
    /// is `None`.
62
3
    pub fn is_dead(&self) -> bool {
63
3
        self.expected_incoming_bytes.is_none() && 
self.write_bytes_queueable.is_none()1
64
3
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE7is_deadB6_
Line
Count
Source
62
3
    pub fn is_dead(&self) -> bool {
63
3
        self.expected_incoming_bytes.is_none() && 
self.write_bytes_queueable.is_none()1
64
3
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE7is_deadCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE7is_deadB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE7is_deadCsiUjFBJteJ7x_17smoldot_full_node
65
66
    /// Sets the writing side of the connection to closed.
67
    ///
68
    /// This is simply a shortcut for setting [`ReadWrite::write_bytes_queueable`] to `None`.
69
15
    pub fn close_write(&mut self) {
70
15
        self.write_bytes_queueable = None;
71
15
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE11close_writeB6_
Line
Count
Source
69
15
    pub fn close_write(&mut self) {
70
15
        self.write_bytes_queueable = None;
71
15
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE11close_writeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE11close_writeB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE11close_writeCsiUjFBJteJ7x_17smoldot_full_node
72
73
    /// Returns the size of the data available in the incoming buffer.
74
0
    pub fn incoming_buffer_available(&self) -> usize {
75
0
        self.incoming_buffer.len()
76
0
    }
Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritepE25incoming_buffer_availableB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE25incoming_buffer_availableB6_
77
78
    /// Discards all the incoming data. Updates [`ReadWrite::read_bytes`] and decreases
79
    /// [`ReadWrite::expected_incoming_bytes`] by the number of consumed bytes.
80
6
    pub fn discard_all_incoming(&mut self) {
81
6
        self.read_bytes += self.incoming_buffer.len();
82
6
        if let Some(
expected_incoming_bytes5
) = &mut self.expected_incoming_bytes {
83
5
            *expected_incoming_bytes =
84
5
                expected_incoming_bytes.saturating_sub(self.incoming_buffer.len());
85
5
        }
1
86
6
        self.incoming_buffer.clear();
87
6
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20discard_all_incomingB6_
Line
Count
Source
80
6
    pub fn discard_all_incoming(&mut self) {
81
6
        self.read_bytes += self.incoming_buffer.len();
82
6
        if let Some(
expected_incoming_bytes5
) = &mut self.expected_incoming_bytes {
83
5
            *expected_incoming_bytes =
84
5
                expected_incoming_bytes.saturating_sub(self.incoming_buffer.len());
85
5
        }
1
86
6
        self.incoming_buffer.clear();
87
6
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20discard_all_incomingCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE20discard_all_incomingB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE20discard_all_incomingCsiUjFBJteJ7x_17smoldot_full_node
88
89
    /// Extract a certain number of bytes from the read buffer.
90
    ///
91
    /// On success, updates [`ReadWrite::read_bytes`] and decreases
92
    /// [`ReadWrite::expected_incoming_bytes`] by the number of consumed bytes.
93
    ///
94
    /// If not enough bytes are available, returns `None` and sets
95
    /// [`ReadWrite::expected_incoming_bytes`] to the requested number of bytes.
96
1.19k
    pub fn incoming_bytes_take(
97
1.19k
        &mut self,
98
1.19k
        num: usize,
99
1.19k
    ) -> Result<Option<Vec<u8>>, IncomingBytesTakeError> {
100
1.19k
        if self.incoming_buffer.len() < num {
101
481
            if let Some(
expected_incoming_bytes478
) = self.expected_incoming_bytes.as_mut() {
102
478
                *expected_incoming_bytes = num;
103
478
                return Ok(None);
104
            } else {
105
3
                return Err(IncomingBytesTakeError::ReadClosed);
106
            }
107
712
        }
108
712
109
712
        self.read_bytes += num;
110
111
712
        if let Some(
expected_incoming_bytes709
) = self.expected_incoming_bytes.as_mut() {
112
709
            *expected_incoming_bytes = expected_incoming_bytes.saturating_sub(num);
113
709
        }
3
114
115
712
        if self.incoming_buffer.len() == num {
116
314
            Ok(Some(mem::take(&mut self.incoming_buffer)))
117
398
        } else if self.incoming_buffer.len() - num < num.saturating_mul(2) {
118
108
            let remains = self.incoming_buffer.split_at(num).1.to_vec();
119
108
            self.incoming_buffer.truncate(num);
120
108
            Ok(Some(mem::replace(&mut self.incoming_buffer, remains)))
121
        } else {
122
290
            let to_ret = self.incoming_buffer.split_at(num).0.to_vec();
123
290
            self.incoming_buffer.copy_within(num.., 0);
124
290
            self.incoming_buffer
125
290
                .truncate(self.incoming_buffer.len() - num);
126
290
            Ok(Some(to_ret))
127
        }
128
1.19k
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE19incoming_bytes_takeB6_
Line
Count
Source
96
1.10k
    pub fn incoming_bytes_take(
97
1.10k
        &mut self,
98
1.10k
        num: usize,
99
1.10k
    ) -> Result<Option<Vec<u8>>, IncomingBytesTakeError> {
100
1.10k
        if self.incoming_buffer.len() < num {
101
452
            if let Some(
expected_incoming_bytes451
) = self.expected_incoming_bytes.as_mut() {
102
451
                *expected_incoming_bytes = num;
103
451
                return Ok(None);
104
            } else {
105
1
                return Err(IncomingBytesTakeError::ReadClosed);
106
            }
107
649
        }
108
649
109
649
        self.read_bytes += num;
110
111
649
        if let Some(
expected_incoming_bytes647
) = self.expected_incoming_bytes.as_mut() {
112
647
            *expected_incoming_bytes = expected_incoming_bytes.saturating_sub(num);
113
647
        }
2
114
115
649
        if self.incoming_buffer.len() == num {
116
275
            Ok(Some(mem::take(&mut self.incoming_buffer)))
117
374
        } else if self.incoming_buffer.len() - num < num.saturating_mul(2) {
118
100
            let remains = self.incoming_buffer.split_at(num).1.to_vec();
119
100
            self.incoming_buffer.truncate(num);
120
100
            Ok(Some(mem::replace(&mut self.incoming_buffer, remains)))
121
        } else {
122
274
            let to_ret = self.incoming_buffer.split_at(num).0.to_vec();
123
274
            self.incoming_buffer.copy_within(num.., 0);
124
274
            self.incoming_buffer
125
274
                .truncate(self.incoming_buffer.len() - num);
126
274
            Ok(Some(to_ret))
127
        }
128
1.10k
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE19incoming_bytes_takeB6_
Line
Count
Source
96
92
    pub fn incoming_bytes_take(
97
92
        &mut self,
98
92
        num: usize,
99
92
    ) -> Result<Option<Vec<u8>>, IncomingBytesTakeError> {
100
92
        if self.incoming_buffer.len() < num {
101
29
            if let Some(
expected_incoming_bytes27
) = self.expected_incoming_bytes.as_mut() {
102
27
                *expected_incoming_bytes = num;
103
27
                return Ok(None);
104
            } else {
105
2
                return Err(IncomingBytesTakeError::ReadClosed);
106
            }
107
63
        }
108
63
109
63
        self.read_bytes += num;
110
111
63
        if let Some(
expected_incoming_bytes62
) = self.expected_incoming_bytes.as_mut() {
112
62
            *expected_incoming_bytes = expected_incoming_bytes.saturating_sub(num);
113
62
        }
1
114
115
63
        if self.incoming_buffer.len() == num {
116
39
            Ok(Some(mem::take(&mut self.incoming_buffer)))
117
24
        } else if self.incoming_buffer.len() - num < num.saturating_mul(2) {
118
8
            let remains = self.incoming_buffer.split_at(num).1.to_vec();
119
8
            self.incoming_buffer.truncate(num);
120
8
            Ok(Some(mem::replace(&mut self.incoming_buffer, remains)))
121
        } else {
122
16
            let to_ret = self.incoming_buffer.split_at(num).0.to_vec();
123
16
            self.incoming_buffer.copy_within(num.., 0);
124
16
            self.incoming_buffer
125
16
                .truncate(self.incoming_buffer.len() - num);
126
16
            Ok(Some(to_ret))
127
        }
128
92
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE19incoming_bytes_takeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE19incoming_bytes_takeB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE19incoming_bytes_takeCsiUjFBJteJ7x_17smoldot_full_node
129
130
    /// Same as [`ReadWrite::incoming_bytes_take`], but reads a number of bytes known as a
131
    /// compile-time constant.
132
468
    pub fn incoming_bytes_take_array<const N: usize>(
133
468
        &mut self,
134
468
    ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> {
135
468
        let Some(
vec158
) = self.incoming_bytes_take(N)
?0
else {
136
310
            return Ok(None);
137
        };
138
139
158
        let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap();
140
158
        Ok(Some(*bytes))
141
468
    }
_RINvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKj2_EB7_
Line
Count
Source
132
244
    pub fn incoming_bytes_take_array<const N: usize>(
133
244
        &mut self,
134
244
    ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> {
135
244
        let Some(
vec88
) = self.incoming_bytes_take(N)
?0
else {
136
156
            return Ok(None);
137
        };
138
139
88
        let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap();
140
88
        Ok(Some(*bytes))
141
244
    }
_RINvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKjc_EB7_
Line
Count
Source
132
219
    pub fn incoming_bytes_take_array<const N: usize>(
133
219
        &mut self,
134
219
    ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> {
135
219
        let Some(
vec67
) = self.incoming_bytes_take(N)
?0
else {
136
152
            return Ok(None);
137
        };
138
139
67
        let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap();
140
67
        Ok(Some(*bytes))
141
219
    }
_RINvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB3_9ReadWritelE25incoming_bytes_take_arrayKj2_EB7_
Line
Count
Source
132
5
    pub fn incoming_bytes_take_array<const N: usize>(
133
5
        &mut self,
134
5
    ) -> Result<Option<[u8; N]>, IncomingBytesTakeError> {
135
5
        let Some(
vec3
) = self.incoming_bytes_take(N)
?0
else {
136
2
            return Ok(None);
137
        };
138
139
3
        let bytes = <&[u8; N]>::try_from(&vec[..]).unwrap();
140
3
        Ok(Some(*bytes))
141
5
    }
Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKj2_ECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE25incoming_bytes_take_arrayKjc_ECsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWritepE25incoming_bytes_take_arrayKpEB7_
Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE25incoming_bytes_take_arrayKj2_ECsiUjFBJteJ7x_17smoldot_full_node
Unexecuted instantiation: _RINvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB3_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE25incoming_bytes_take_arrayKjc_ECsiUjFBJteJ7x_17smoldot_full_node
142
143
    /// Extract an LEB128-encoded number from the start of the read buffer.
144
    ///
145
    /// On success, updates [`ReadWrite::read_bytes`] and decreases
146
    /// [`ReadWrite::expected_incoming_bytes`] by the number of consumed bytes.
147
    ///
148
    /// If not enough bytes are available, returns `None` and sets
149
    /// [`ReadWrite::expected_incoming_bytes`] to the required number of bytes.
150
    ///
151
    /// Must be passed the maximum value that this function can return on success. An error is
152
    /// returned if the value sent by the remote is higher than this maximum. This parameter,
153
    /// while not strictly necessary, is here for safety, as it is easy to forget to check the
154
    /// value against a maximum.
155
343
    pub fn incoming_bytes_take_leb128(
156
343
        &mut self,
157
343
        max_decoded_number: usize,
158
343
    ) -> Result<Option<usize>, IncomingBytesTakeLeb128Error> {
159
343
        match crate::util::leb128::nom_leb128_usize::<nom::error::Error<&[u8]>>(
160
343
            &self.incoming_buffer,
161
        ) {
162
164
            Ok((rest, num)) => {
163
164
                if num > max_decoded_number {
164
                    // TODO: consider detecting earlier if `TooLarge` is reached; for example if max is 20 we know that it can't be more than one byte
165
0
                    return Err(IncomingBytesTakeLeb128Error::TooLarge);
166
164
                }
167
164
168
164
                let consumed_bytes = self.incoming_buffer.len() - rest.len();
169
164
                if !rest.is_empty() {
170
83
                    self.incoming_buffer.copy_within(consumed_bytes.., 0);
171
83
                    self.incoming_buffer
172
83
                        .truncate(self.incoming_buffer.len() - consumed_bytes);
173
83
                } else {
174
81
                    self.incoming_buffer.clear();
175
81
                }
176
164
                self.read_bytes += consumed_bytes;
177
164
                if let Some(
expected_incoming_bytes161
) = self.expected_incoming_bytes.as_mut() {
178
161
                    *expected_incoming_bytes =
179
161
                        expected_incoming_bytes.saturating_sub(consumed_bytes);
180
161
                }
3
181
164
                Ok(Some(num))
182
            }
183
0
            Err(nom::Err::Incomplete(nom::Needed::Size(num))) => {
184
0
                if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() {
185
0
                    *expected_incoming_bytes = self.incoming_buffer.len() + num.get();
186
0
                } else {
187
0
                    return Err(IncomingBytesTakeLeb128Error::ReadClosed);
188
                }
189
0
                Ok(None)
190
            }
191
            Err(nom::Err::Incomplete(nom::Needed::Unknown)) => {
192
179
                if let Some(
expected_incoming_bytes178
) = self.expected_incoming_bytes.as_mut() {
193
178
                    *expected_incoming_bytes = self.incoming_buffer.len() + 1;
194
178
                } else {
195
1
                    return Err(IncomingBytesTakeLeb128Error::ReadClosed);
196
                }
197
178
                Ok(None)
198
            }
199
0
            Err(_) => Err(IncomingBytesTakeLeb128Error::InvalidLeb128),
200
        }
201
343
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE26incoming_bytes_take_leb128B6_
Line
Count
Source
155
300
    pub fn incoming_bytes_take_leb128(
156
300
        &mut self,
157
300
        max_decoded_number: usize,
158
300
    ) -> Result<Option<usize>, IncomingBytesTakeLeb128Error> {
159
300
        match crate::util::leb128::nom_leb128_usize::<nom::error::Error<&[u8]>>(
160
300
            &self.incoming_buffer,
161
        ) {
162
140
            Ok((rest, num)) => {
163
140
                if num > max_decoded_number {
164
                    // TODO: consider detecting earlier if `TooLarge` is reached; for example if max is 20 we know that it can't be more than one byte
165
0
                    return Err(IncomingBytesTakeLeb128Error::TooLarge);
166
140
                }
167
140
168
140
                let consumed_bytes = self.incoming_buffer.len() - rest.len();
169
140
                if !rest.is_empty() {
170
63
                    self.incoming_buffer.copy_within(consumed_bytes.., 0);
171
63
                    self.incoming_buffer
172
63
                        .truncate(self.incoming_buffer.len() - consumed_bytes);
173
77
                } else {
174
77
                    self.incoming_buffer.clear();
175
77
                }
176
140
                self.read_bytes += consumed_bytes;
177
140
                if let Some(
expected_incoming_bytes137
) = self.expected_incoming_bytes.as_mut() {
178
137
                    *expected_incoming_bytes =
179
137
                        expected_incoming_bytes.saturating_sub(consumed_bytes);
180
137
                }
3
181
140
                Ok(Some(num))
182
            }
183
0
            Err(nom::Err::Incomplete(nom::Needed::Size(num))) => {
184
0
                if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() {
185
0
                    *expected_incoming_bytes = self.incoming_buffer.len() + num.get();
186
0
                } else {
187
0
                    return Err(IncomingBytesTakeLeb128Error::ReadClosed);
188
                }
189
0
                Ok(None)
190
            }
191
            Err(nom::Err::Incomplete(nom::Needed::Unknown)) => {
192
160
                if let Some(
expected_incoming_bytes159
) = self.expected_incoming_bytes.as_mut() {
193
159
                    *expected_incoming_bytes = self.incoming_buffer.len() + 1;
194
159
                } else {
195
1
                    return Err(IncomingBytesTakeLeb128Error::ReadClosed);
196
                }
197
159
                Ok(None)
198
            }
199
0
            Err(_) => Err(IncomingBytesTakeLeb128Error::InvalidLeb128),
200
        }
201
300
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE26incoming_bytes_take_leb128B6_
Line
Count
Source
155
43
    pub fn incoming_bytes_take_leb128(
156
43
        &mut self,
157
43
        max_decoded_number: usize,
158
43
    ) -> Result<Option<usize>, IncomingBytesTakeLeb128Error> {
159
43
        match crate::util::leb128::nom_leb128_usize::<nom::error::Error<&[u8]>>(
160
43
            &self.incoming_buffer,
161
        ) {
162
24
            Ok((rest, num)) => {
163
24
                if num > max_decoded_number {
164
                    // TODO: consider detecting earlier if `TooLarge` is reached; for example if max is 20 we know that it can't be more than one byte
165
0
                    return Err(IncomingBytesTakeLeb128Error::TooLarge);
166
24
                }
167
24
168
24
                let consumed_bytes = self.incoming_buffer.len() - rest.len();
169
24
                if !rest.is_empty() {
170
20
                    self.incoming_buffer.copy_within(consumed_bytes.., 0);
171
20
                    self.incoming_buffer
172
20
                        .truncate(self.incoming_buffer.len() - consumed_bytes);
173
20
                } else {
174
4
                    self.incoming_buffer.clear();
175
4
                }
176
24
                self.read_bytes += consumed_bytes;
177
24
                if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() {
178
24
                    *expected_incoming_bytes =
179
24
                        expected_incoming_bytes.saturating_sub(consumed_bytes);
180
24
                }
0
181
24
                Ok(Some(num))
182
            }
183
0
            Err(nom::Err::Incomplete(nom::Needed::Size(num))) => {
184
0
                if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() {
185
0
                    *expected_incoming_bytes = self.incoming_buffer.len() + num.get();
186
0
                } else {
187
0
                    return Err(IncomingBytesTakeLeb128Error::ReadClosed);
188
                }
189
0
                Ok(None)
190
            }
191
            Err(nom::Err::Incomplete(nom::Needed::Unknown)) => {
192
19
                if let Some(expected_incoming_bytes) = self.expected_incoming_bytes.as_mut() {
193
19
                    *expected_incoming_bytes = self.incoming_buffer.len() + 1;
194
19
                } else {
195
0
                    return Err(IncomingBytesTakeLeb128Error::ReadClosed);
196
                }
197
19
                Ok(None)
198
            }
199
0
            Err(_) => Err(IncomingBytesTakeLeb128Error::InvalidLeb128),
200
        }
201
43
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE26incoming_bytes_take_leb128CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE26incoming_bytes_take_leb128B6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE26incoming_bytes_take_leb128CsiUjFBJteJ7x_17smoldot_full_node
202
203
    /// Copies as much as possible from the content of `data` to [`ReadWrite::write_buffers`]
204
    /// and updates [`ReadWrite::write_bytes_queued`] and [`ReadWrite::write_bytes_queueable`].
205
    /// The bytes that have been written are removed from `data`.
206
    ///
207
    /// This function is recommended only if the `Vec` is small.
208
0
    pub fn write_from_vec(&mut self, data: &mut Vec<u8>) {
209
0
        let Some(queueable) = self.write_bytes_queueable.as_mut() else {
210
0
            return;
211
        };
212
213
0
        let to_copy = cmp::min(data.len(), *queueable);
214
0
        if to_copy == 0 {
215
0
            return;
216
0
        }
217
0
218
0
        if to_copy == data.len() {
219
0
            self.write_buffers.push(mem::take(data));
220
0
        } else {
221
0
            self.write_buffers.push(data[..to_copy].to_vec());
222
0
            data.copy_within(to_copy.., 0);
223
0
            data.truncate(data.len() - to_copy);
224
0
        }
225
226
0
        self.write_bytes_queued += to_copy;
227
0
        *queueable -= to_copy;
228
0
    }
Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE14write_from_vecB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE14write_from_vecCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE14write_from_vecB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE14write_from_vecCsiUjFBJteJ7x_17smoldot_full_node
229
230
    /// Copies as much as possible from the content of `data` to [`ReadWrite::write_buffers`]
231
    /// and updates [`ReadWrite::write_bytes_queued`] and [`ReadWrite::write_bytes_queueable`].
232
    /// The bytes that have been written are removed from `data`.
233
849
    pub fn write_from_vec_deque(&mut self, data: &mut VecDeque<u8>) {
234
849
        let (slice1, slice2) = data.as_slices();
235
849
236
849
        let to_copy1 = cmp::min(slice1.len(), self.write_bytes_queueable.unwrap_or(0));
237
849
        let to_copy2 = if to_copy1 == slice1.len() {
238
830
            cmp::min(
239
830
                slice2.len(),
240
830
                self.write_bytes_queueable.unwrap_or(0) - to_copy1,
241
830
            )
242
        } else {
243
19
            0
244
        };
245
246
849
        let total_tocopy = to_copy1 + to_copy2;
247
849
248
849
        if total_tocopy == 0 {
249
670
            return;
250
179
        }
251
179
252
179
        self.write_buffers.push(slice1[..to_copy1].to_vec());
253
179
        self.write_buffers.push(slice2[..to_copy2].to_vec());
254
179
255
179
        self.write_bytes_queued += total_tocopy;
256
179
        *self.write_bytes_queueable.as_mut().unwrap() -= total_tocopy;
257
179
258
7.38k
        for _ in 0..total_tocopy {
259
7.38k
            data.pop_front();
260
7.38k
        }
261
849
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20write_from_vec_dequeB6_
Line
Count
Source
233
701
    pub fn write_from_vec_deque(&mut self, data: &mut VecDeque<u8>) {
234
701
        let (slice1, slice2) = data.as_slices();
235
701
236
701
        let to_copy1 = cmp::min(slice1.len(), self.write_bytes_queueable.unwrap_or(0));
237
701
        let to_copy2 = if to_copy1 == slice1.len() {
238
701
            cmp::min(
239
701
                slice2.len(),
240
701
                self.write_bytes_queueable.unwrap_or(0) - to_copy1,
241
701
            )
242
        } else {
243
0
            0
244
        };
245
246
701
        let total_tocopy = to_copy1 + to_copy2;
247
701
248
701
        if total_tocopy == 0 {
249
571
            return;
250
130
        }
251
130
252
130
        self.write_buffers.push(slice1[..to_copy1].to_vec());
253
130
        self.write_buffers.push(slice2[..to_copy2].to_vec());
254
130
255
130
        self.write_bytes_queued += total_tocopy;
256
130
        *self.write_bytes_queueable.as_mut().unwrap() -= total_tocopy;
257
130
258
5.01k
        for _ in 0..total_tocopy {
259
5.01k
            data.pop_front();
260
5.01k
        }
261
701
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE20write_from_vec_dequeB6_
Line
Count
Source
233
148
    pub fn write_from_vec_deque(&mut self, data: &mut VecDeque<u8>) {
234
148
        let (slice1, slice2) = data.as_slices();
235
148
236
148
        let to_copy1 = cmp::min(slice1.len(), self.write_bytes_queueable.unwrap_or(0));
237
148
        let to_copy2 = if to_copy1 == slice1.len() {
238
129
            cmp::min(
239
129
                slice2.len(),
240
129
                self.write_bytes_queueable.unwrap_or(0) - to_copy1,
241
129
            )
242
        } else {
243
19
            0
244
        };
245
246
148
        let total_tocopy = to_copy1 + to_copy2;
247
148
248
148
        if total_tocopy == 0 {
249
99
            return;
250
49
        }
251
49
252
49
        self.write_buffers.push(slice1[..to_copy1].to_vec());
253
49
        self.write_buffers.push(slice2[..to_copy2].to_vec());
254
49
255
49
        self.write_bytes_queued += total_tocopy;
256
49
        *self.write_bytes_queueable.as_mut().unwrap() -= total_tocopy;
257
49
258
2.37k
        for _ in 0..total_tocopy {
259
2.37k
            data.pop_front();
260
2.37k
        }
261
148
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE20write_from_vec_dequeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE20write_from_vec_dequeB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE20write_from_vec_dequeCsiUjFBJteJ7x_17smoldot_full_node
262
263
    /// Adds the `data` to [`ReadWrite::write_buffers`], increases
264
    /// [`ReadWrite::write_bytes_queued`], and decreases [`ReadWrite::write_bytes_queueable`].
265
    ///
266
    /// # Panic
267
    ///
268
    /// Panics if `data.len() > write_bytes_queueable`.
269
    /// Panics if the writing side is closed and `data` isn't empty.
270
    ///
271
    // TODO: is this function necessary? it seems dangerous due to the panic regarding queueable bytes
272
1
    pub fn write_out(&mut self, data: Vec<u8>) {
273
1
        if data.is_empty() {
274
0
            return;
275
1
        }
276
1
        assert!(data.len() <= self.write_bytes_queueable.unwrap_or(0));
277
1
        self.write_bytes_queued += data.len();
278
1
        *self.write_bytes_queueable.as_mut().unwrap() -= data.len();
279
1
        self.write_buffers.push(data);
280
1
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE9write_outB6_
Line
Count
Source
272
1
    pub fn write_out(&mut self, data: Vec<u8>) {
273
1
        if data.is_empty() {
274
0
            return;
275
1
        }
276
1
        assert!(data.len() <= self.write_bytes_queueable.unwrap_or(0));
277
1
        self.write_bytes_queued += data.len();
278
1
        *self.write_bytes_queueable.as_mut().unwrap() -= data.len();
279
1
        self.write_buffers.push(data);
280
1
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE9write_outB6_
281
282
    /// Sets [`ReadWrite::wake_up_after`] to `min(wake_up_after, after)`.
283
483
    pub fn wake_up_after(&mut self, after: &TNow)
284
483
    where
285
483
        TNow: Clone + Ord,
286
483
    {
287
350
        match self.wake_up_after {
288
350
            Some(ref mut t) if *t < *afte
r => {}300
289
50
            Some(ref mut t) => *t = after.clone(),
290
133
            ref mut t @ None => *t = Some(after.clone()),
291
        }
292
483
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE13wake_up_afterB6_
Line
Count
Source
283
483
    pub fn wake_up_after(&mut self, after: &TNow)
284
483
    where
285
483
        TNow: Clone + Ord,
286
483
    {
287
350
        match self.wake_up_after {
288
350
            Some(ref mut t) if *t < *afte
r => {}300
289
50
            Some(ref mut t) => *t = after.clone(),
290
133
            ref mut t @ None => *t = Some(after.clone()),
291
        }
292
483
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE13wake_up_afterCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE13wake_up_afterB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE13wake_up_afterCsiUjFBJteJ7x_17smoldot_full_node
293
294
    /// Sets [`ReadWrite::wake_up_after`] to the value in [`ReadWrite::now`].
295
522
    pub fn wake_up_asap(&mut self)
296
522
    where
297
522
        TNow: Clone,
298
522
    {
299
522
        self.wake_up_after = Some(self.now.clone());
300
522
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE12wake_up_asapB6_
Line
Count
Source
295
519
    pub fn wake_up_asap(&mut self)
296
519
    where
297
519
        TNow: Clone,
298
519
    {
299
519
        self.wake_up_after = Some(self.now.clone());
300
519
    }
_RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeINtB2_9ReadWritelE12wake_up_asapB6_
Line
Count
Source
295
3
    pub fn wake_up_asap(&mut self)
296
3
    where
297
3
        TNow: Clone,
298
3
    {
299
3
        self.wake_up_after = Some(self.now.clone());
300
3
    }
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsaYZPK01V26L_4core4time8DurationE12wake_up_asapCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWritepE12wake_up_asapB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeINtB2_9ReadWriteNtNtCsbpXXxgr6u8g_3std4time7InstantE12wake_up_asapCsiUjFBJteJ7x_17smoldot_full_node
301
}
302
303
/// Error potentially returned by [`ReadWrite::incoming_bytes_take`].
304
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXs1_NtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeNtB5_22IncomingBytesTakeErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeNtB5_22IncomingBytesTakeErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
305
pub enum IncomingBytesTakeError {
306
    /// Reading side of the stream is closed.
307
    ReadClosed,
308
}
309
310
/// Error potentially returned by [`ReadWrite::incoming_bytes_take_leb128`].
311
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXs4_NtNtCsN16ciHI6Qf_7smoldot6libp2p10read_writeNtB5_28IncomingBytesTakeLeb128ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs4_NtNtCseuYC0Zibziv_7smoldot6libp2p10read_writeNtB5_28IncomingBytesTakeLeb128ErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
312
pub enum IncomingBytesTakeLeb128Error {
313
    /// Invalid LEB128 number.
314
    InvalidLeb128,
315
    /// Reading side of the stream is closed.
316
    ReadClosed,
317
    /// Number of bytes decoded is larger than expected.
318
    TooLarge,
319
}
320
321
#[cfg(test)]
322
mod tests {
323
    use super::{IncomingBytesTakeError, ReadWrite};
324
325
    #[test]
326
1
    fn take_bytes() {
327
1
        let mut rw = ReadWrite {
328
1
            now: 0,
329
1
            incoming_buffer: vec![0x80; 64],
330
1
            expected_incoming_bytes: Some(12),
331
1
            read_bytes: 2,
332
1
            write_buffers: Vec::new(),
333
1
            write_bytes_queued: 0,
334
1
            write_bytes_queueable: None,
335
1
            wake_up_after: None,
336
1
        };
337
1
338
1
        let buffer = rw.incoming_bytes_take(5).unwrap().unwrap();
339
1
        assert_eq!(buffer, &[0x80, 0x80, 0x80, 0x80, 0x80]);
340
1
        assert_eq!(rw.incoming_buffer.len(), 59);
341
1
        assert_eq!(rw.read_bytes, 7);
342
1
        assert_eq!(rw.expected_incoming_bytes, Some(7));
343
344
1
        assert!(
matches!0
(rw.incoming_bytes_take(1000), Ok(None)));
345
1
        assert_eq!(rw.read_bytes, 7);
346
1
        assert_eq!(rw.expected_incoming_bytes, Some(1000));
347
348
1
        let buffer = rw.incoming_bytes_take(57).unwrap().unwrap();
349
1
        assert_eq!(buffer.len(), 57);
350
1
        assert_eq!(rw.incoming_buffer.len(), 2);
351
1
        assert_eq!(rw.read_bytes, 64);
352
1
        assert_eq!(rw.expected_incoming_bytes, Some(1000 - 57));
353
1
    }
354
355
    #[test]
356
1
    fn take_bytes_closed() {
357
1
        let mut rw = ReadWrite {
358
1
            now: 0,
359
1
            incoming_buffer: vec![0x80; 64],
360
1
            expected_incoming_bytes: None,
361
1
            read_bytes: 2,
362
1
            write_buffers: Vec::new(),
363
1
            write_bytes_queued: 0,
364
1
            write_bytes_queueable: None,
365
1
            wake_up_after: None,
366
1
        };
367
1
368
1
        assert!(
matches!0
(
369
1
            rw.incoming_bytes_take(1000),
370
            Err(IncomingBytesTakeError::ReadClosed)
371
        ));
372
1
        assert_eq!(rw.expected_incoming_bytes, None);
373
374
1
        let buffer = rw.incoming_bytes_take(5).unwrap().unwrap();
375
1
        assert_eq!(buffer, &[0x80, 0x80, 0x80, 0x80, 0x80]);
376
1
        assert_eq!(rw.incoming_buffer.len(), 59);
377
1
        assert_eq!(rw.read_bytes, 7);
378
1
        assert_eq!(rw.expected_incoming_bytes, None);
379
380
1
        assert!(
matches!0
(
381
1
            rw.incoming_bytes_take(1000),
382
            Err(IncomingBytesTakeError::ReadClosed)
383
        ));
384
1
        assert_eq!(rw.expected_incoming_bytes, None);
385
1
    }
386
387
    #[test]
388
1
    fn write_out() {
389
1
        let mut rw = ReadWrite {
390
1
            now: 0,
391
1
            incoming_buffer: Vec::new(),
392
1
            expected_incoming_bytes: None,
393
1
            read_bytes: 0,
394
1
            write_buffers: Vec::new(),
395
1
            write_bytes_queued: 11,
396
1
            write_bytes_queueable: Some(10),
397
1
            wake_up_after: None,
398
1
        };
399
1
400
1
        rw.write_out(b"hello".to_vec());
401
1
        assert_eq!(rw.write_buffers.len(), 1);
402
1
        assert_eq!(rw.write_bytes_queued, 16);
403
1
        assert_eq!(rw.write_bytes_queueable, Some(5));
404
1
    }
405
406
    #[test]
407
1
    fn write_from_vec_deque_smaller() {
408
1
        let mut input = [1, 2, 3, 4].iter().cloned().collect();
409
1
410
1
        let mut rw = ReadWrite {
411
1
            now: 0,
412
1
            incoming_buffer: Vec::new(),
413
1
            expected_incoming_bytes: None,
414
1
            read_bytes: 0,
415
1
            write_buffers: Vec::new(),
416
1
            write_bytes_queueable: Some(5),
417
1
            write_bytes_queued: 5,
418
1
            wake_up_after: None,
419
1
        };
420
1
421
1
        rw.write_from_vec_deque(&mut input);
422
1
        assert!(input.is_empty());
423
1
        assert_eq!(rw.write_bytes_queued, 9);
424
1
        assert_eq!(rw.write_bytes_queueable, Some(1));
425
1
    }
426
427
    #[test]
428
1
    fn write_from_vec_deque_larger() {
429
1
        let mut input = [1, 2, 3, 4, 5, 6].iter().cloned().collect();
430
1
431
1
        let mut rw = ReadWrite {
432
1
            now: 0,
433
1
            incoming_buffer: Vec::new(),
434
1
            expected_incoming_bytes: None,
435
1
            read_bytes: 0,
436
1
            write_buffers: Vec::new(),
437
1
            write_bytes_queueable: Some(5),
438
1
            write_bytes_queued: 5,
439
1
            wake_up_after: None,
440
1
        };
441
1
442
1
        rw.write_from_vec_deque(&mut input);
443
1
        assert_eq!(input.into_iter().collect::<Vec<_>>(), &[6]);
444
1
        assert_eq!(rw.write_bytes_queued, 10);
445
1
        assert_eq!(rw.write_bytes_queueable, Some(0));
446
1
    }
447
}