/__w/smoldot/smoldot/repo/lib/src/libp2p/with_buffers.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | #![cfg(feature = "std")] |
19 | | #![cfg_attr(docsrs, doc(cfg(feature = "std")))] |
20 | | |
21 | | //! Augments an implementation of `AsyncRead` and `AsyncWrite` with a read buffer and a write |
22 | | //! buffer. |
23 | | //! |
24 | | //! While this module is generic, the targeted use-case is TCP connections. |
25 | | |
26 | | // TODO: usage and example |
27 | | |
28 | | use crate::libp2p::read_write; |
29 | | |
30 | | use core::{ |
31 | | fmt, future, mem, ops, |
32 | | pin::{self, Pin}, |
33 | | task::Poll, |
34 | | }; |
35 | | use futures_util::{AsyncRead, AsyncWrite}; |
36 | | use std::io; |
37 | | |
/// Holds an implementation of `AsyncRead` and `AsyncWrite`, together with a read buffer and a
/// write buffer.
///
/// The socket is provided as a future (`TSocketFut`) that yields the actual socket (`TSocket`).
/// `TNow` is the type used to represent instants in time.
#[pin_project::pin_project]
pub struct WithBuffers<TSocketFut, TSocket, TNow> {
    /// Actual socket to read from/write to, or the future that will yield it.
    #[pin]
    socket: Socket<TSocketFut, TSocket>,
    /// Error that has happened on the socket, if any.
    error: Option<io::Error>,
    /// Storage for data read from the socket. The first [`WithBuffers::read_buffer_valid`] bytes
    /// contain actual socket data, while the rest contains garbage data.
    /// The capacity of this buffer is at least equal to the amount of bytes requested by the
    /// inner data consumer.
    read_buffer: Vec<u8>,
    /// Number of bytes of data in [`WithBuffers::read_buffer`] that contain actual data.
    read_buffer_valid: usize,
    /// Capacity with which [`WithBuffers::read_buffer`] is initially allocated. The value is
    /// also handed to every [`ReadWriteAccess`] that gets created.
    // NOTE(review): presumably used to shrink the read buffer back once the consumer is done
    // with it; the code doing so is not in this part of the file — confirm.
    read_buffer_reasonable_capacity: usize,
    /// True if reading from the socket has returned `Ok(0)` earlier, in other words "end of
    /// file".
    read_closed: bool,
    /// Storage for data to write to the socket. Buffers are written in order, starting with the
    /// first one.
    write_buffers: Vec<Vec<u8>>,
    /// True if the consumer has closed the writing side earlier.
    write_closed: bool,
    /// True if the consumer has closed the writing side earlier, and the socket still has to
    /// be closed.
    close_pending: bool,
    /// True if data has been written on the socket and the socket needs to be flushed.
    flush_pending: bool,

    /// Value of [`read_write::ReadWrite::now`] that was fed by the latest call to
    /// [`WithBuffers::read_write_access`]. `None` if that function has never been called.
    read_write_now: Option<TNow>,
    /// Value of [`read_write::ReadWrite::wake_up_after`] produced by the latest call
    /// to [`WithBuffers::read_write_access`].
    read_write_wake_up_after: Option<TNow>,
}
75 | | |
/// State of the socket wrapped by a [`WithBuffers`].
///
/// Both variants are structurally pinned, as neither the future nor the socket is required to
/// be `Unpin`.
#[pin_project::pin_project(project = SocketProj)]
enum Socket<TSocketFut, TSocket> {
    /// The future that yields the socket hasn't finished yet.
    Pending(#[pin] TSocketFut),
    /// The future has successfully yielded this socket.
    Resolved(#[pin] TSocket),
}
81 | | |
82 | | impl<TSocketFut, TSocket, TNow> WithBuffers<TSocketFut, TSocket, TNow> |
83 | | where |
84 | | TNow: Clone + Ord, |
85 | | { |
86 | | /// Initializes a new [`WithBuffers`] with the given socket-yielding future. |
87 | 0 | pub fn new(socket: TSocketFut) -> Self { |
88 | 0 | let read_buffer_reasonable_capacity = 65536; // TODO: make configurable? |
89 | 0 |
|
90 | 0 | WithBuffers { |
91 | 0 | socket: Socket::Pending(socket), |
92 | 0 | error: None, |
93 | 0 | read_buffer: Vec::with_capacity(read_buffer_reasonable_capacity), |
94 | 0 | read_buffer_valid: 0, |
95 | 0 | read_buffer_reasonable_capacity, |
96 | 0 | read_closed: false, |
97 | 0 | write_buffers: Vec::with_capacity(64), |
98 | 0 | write_closed: false, |
99 | 0 | close_pending: false, |
100 | 0 | flush_pending: false, |
101 | 0 | read_write_now: None, |
102 | 0 | read_write_wake_up_after: None, |
103 | 0 | } |
104 | 0 | } Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE3newB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1b_6future6future6Futurep6OutputINtNtB1b_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB46_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1b_6marker4SendEL_EEB3c_NtNtB5s_4time7InstantE3newCsih6EgvAwZF2_13smoldot_light Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE3newB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE3newB1e_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB3o_EENtNtCsbpXXxgr6u8g_3std4time7InstantE3newB1e_ |
105 | | |
106 | | /// Returns an object that implements `Deref<Target = ReadWrite>`. This object can be used |
107 | | /// to push or pull data to/from the socket. |
108 | | /// |
109 | | /// > **Note**: The parameter requires `Self` to be pinned for consistency with |
110 | | /// > [`WithBuffers::wait_read_write_again`]. |
111 | 0 | pub fn read_write_access( |
112 | 0 | self: Pin<&mut Self>, |
113 | 0 | now: TNow, |
114 | 0 | ) -> Result<ReadWriteAccess<TNow>, &io::Error> { |
115 | 0 | let this = self.project(); |
116 | 0 |
|
117 | 0 | debug_assert!(this |
118 | 0 | .read_write_now |
119 | 0 | .as_ref() |
120 | 0 | .map_or(true, |old_now| *old_now <= now)); Unexecuted instantiation: _RNCNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB4_11WithBufferspppE17read_write_access0B8_ Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1d_6future6future6Futurep6OutputINtNtB1d_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB48_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1d_6marker4SendEL_EEB3e_NtNtB5u_4time7InstantE17read_write_access0Csih6EgvAwZF2_13smoldot_light Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBufferspppE17read_write_access0B8_ Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_access0B1g_ Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB3q_EENtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_access0B1g_ |
121 | 0 | *this.read_write_wake_up_after = None; |
122 | 0 | *this.read_write_now = Some(now.clone()); |
123 | | |
124 | 0 | if let Some(error) = this.error.as_ref() { |
125 | 0 | return Err(error); |
126 | 0 | } |
127 | 0 |
|
128 | 0 | this.read_buffer.truncate(*this.read_buffer_valid); |
129 | | |
130 | 0 | let is_resolved = matches!(*this.socket, Socket::Resolved(_)); |
131 | | |
132 | 0 | let write_bytes_queued = this.write_buffers.iter().map(Vec::len).sum(); |
133 | 0 |
|
134 | 0 | Ok(ReadWriteAccess { |
135 | 0 | read_buffer_len_before: this.read_buffer.len(), |
136 | 0 | write_buffers_len_before: this.write_buffers.len(), |
137 | 0 | read_write: read_write::ReadWrite { |
138 | 0 | now, |
139 | 0 | incoming_buffer: mem::take(this.read_buffer), |
140 | 0 | expected_incoming_bytes: if !*this.read_closed { Some(0) } else { None }, |
141 | | read_bytes: 0, |
142 | 0 | write_bytes_queued, |
143 | 0 | write_buffers: mem::take(this.write_buffers), |
144 | 0 | write_bytes_queueable: if !is_resolved { |
145 | 0 | Some(0) |
146 | 0 | } else if !*this.write_closed { |
147 | | // Limit outgoing buffer size to 128kiB. |
148 | | // TODO: make configurable? |
149 | 0 | Some((128 * 1024usize).saturating_sub(write_bytes_queued)) |
150 | | } else { |
151 | 0 | None |
152 | | }, |
153 | 0 | wake_up_after: this.read_write_wake_up_after.take(), |
154 | 0 | }, |
155 | 0 | read_buffer: this.read_buffer, |
156 | 0 | read_buffer_valid: this.read_buffer_valid, |
157 | 0 | read_buffer_reasonable_capacity: *this.read_buffer_reasonable_capacity, |
158 | 0 | write_buffers: this.write_buffers, |
159 | 0 | write_closed: this.write_closed, |
160 | 0 | close_pending: this.close_pending, |
161 | 0 | read_write_wake_up_after: this.read_write_wake_up_after, |
162 | | }) |
163 | 0 | } Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE17read_write_accessB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1b_6future6future6Futurep6OutputINtNtB1b_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB46_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1b_6marker4SendEL_EEB3c_NtNtB5s_4time7InstantE17read_write_accessCsih6EgvAwZF2_13smoldot_light Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE17read_write_accessB6_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_accessB1e_ Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB3o_EENtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_accessB1e_ |
164 | | } |
165 | | |
166 | | impl<TSocketFut, TSocket, TNow> WithBuffers<TSocketFut, TSocket, TNow> |
167 | | where |
168 | | TSocket: AsyncRead + AsyncWrite, |
169 | | TSocketFut: future::Future<Output = Result<TSocket, io::Error>>, |
170 | | TNow: Clone + Ord, |
171 | | { |
172 | | /// Waits until [`WithBuffers::read_write_access`] should be called again. |
173 | | /// |
174 | | /// Returns immediately if [`WithBuffers::read_write_access`] has never been called. |
175 | | /// |
176 | | /// Returns if an error happens on the socket. If an error happened in the past on the socket, |
177 | | /// the future never yields. |
178 | 0 | pub async fn wait_read_write_again<F>( |
179 | 0 | self: Pin<&mut Self>, |
180 | 0 | timer_builder: impl FnOnce(TNow) -> F, |
181 | 0 | ) where |
182 | 0 | F: future::Future<Output = ()>, |
183 | 0 | { Unexecuted instantiation: _RINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB5_11WithBufferspppE21wait_read_write_againppEB9_ Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1e_6future6future6Futurep6OutputINtNtB1e_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB7_9websocket10ConnectionB49_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1e_6marker4SendEL_EEB3f_NtNtB5v_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1K_4sync3ArcNtB7o_15DefaultPlatformENtB7q_11PlatformRef21wait_read_write_again00NCB7j_0EB7s_ Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBufferspppE21wait_read_write_againppEB9_ Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1f_5tasks15connection_taskB2r_B19_E0s_00NCB48_s_0EB1h_ Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB7_9websocket10ConnectionB3r_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1d_15connection_taskB2x_B19_E0s_00NCB5I_s_0EB1h_ |
184 | 0 | let mut this = self.project(); |
185 | 0 |
|
186 | 0 | // Return immediately if `read_write_access` was never called or if `wake_up_after <= now`. |
187 | 0 | match (&*this.read_write_wake_up_after, &*this.read_write_now) { |
188 | 0 | (_, None) => return, |
189 | 0 | (Some(when_wake_up), Some(now)) if *when_wake_up <= *now => { |
190 | 0 | return; |
191 | | } |
192 | 0 | _ => {} |
193 | 0 | } |
194 | 0 |
|
195 | 0 | let mut timer = pin::pin!({ |
196 | 0 | let fut = this |
197 | 0 | .read_write_wake_up_after |
198 | 0 | .as_ref() |
199 | 0 | .map(|when| timer_builder(when.clone())); Unexecuted instantiation: _RNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s_0Bd_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1i_6future6future6Futurep6OutputINtNtB1i_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB4d_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1i_6marker4SendEL_EEB3j_NtNtB5z_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1O_4sync3ArcNtB7s_15DefaultPlatformENtB7u_11PlatformRef21wait_read_write_again00NCB7n_0E0s_0B7w_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s_0Bd_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1j_5tasks15connection_taskB2v_B1d_E0s_00NCB4c_s_0E0s_0B1l_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB3v_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1h_15connection_taskB2B_B1d_E0s_00NCB5M_s_0E0s_0B1l_ |
200 | 0 | async { |
201 | 0 | if let Some(fut) = fut { |
202 | 0 | fut.await; |
203 | | } else { |
204 | 0 | future::pending::<()>().await; |
205 | | } |
206 | 0 | } Unexecuted instantiation: _RNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s0_0Bd_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1i_6future6future6Futurep6OutputINtNtB1i_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB4d_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1i_6marker4SendEL_EEB3j_NtNtB5z_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1O_4sync3ArcNtB7s_15DefaultPlatformENtB7u_11PlatformRef21wait_read_write_again00NCB7n_0E0s0_0B7w_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s0_0Bd_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1j_5tasks15connection_taskB2v_B1d_E0s_00NCB4c_s_0E0s0_0B1l_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB3v_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1h_15connection_taskB2B_B1d_E0s_00NCB5M_s_0E0s0_0B1l_ |
207 | 0 | }); |
208 | 0 |
|
209 | 0 | // Grow the read buffer in order to make space for potentially more data. |
210 | 0 | this.read_buffer.resize(this.read_buffer.capacity(), 0); |
211 | 0 |
|
212 | 0 | future::poll_fn(move |cx| { |
213 | 0 | if this.error.is_some() { |
214 | | // Never return. |
215 | 0 | return Poll::Pending; |
216 | 0 | } |
217 | 0 |
|
218 | 0 | // If still `true` at the end of the function, `Poll::Pending` is returned. |
219 | 0 | let mut pending = true; |
220 | 0 |
|
221 | 0 | match future::Future::poll(Pin::new(&mut timer), cx) { |
222 | 0 | Poll::Pending => {} |
223 | 0 | Poll::Ready(()) => { |
224 | 0 | pending = false; |
225 | 0 | } |
226 | | } |
227 | | |
228 | 0 | match this.socket.as_mut().project() { |
229 | 0 | SocketProj::Pending(future) => match future::Future::poll(future, cx) { |
230 | 0 | Poll::Pending => {} |
231 | 0 | Poll::Ready(Ok(socket)) => { |
232 | 0 | this.socket.set(Socket::Resolved(socket)); |
233 | 0 | pending = false; |
234 | 0 | } |
235 | 0 | Poll::Ready(Err(err)) => { |
236 | 0 | *this.error = Some(err); |
237 | 0 | return Poll::Ready(()); |
238 | | } |
239 | | }, |
240 | 0 | SocketProj::Resolved(mut socket) => { |
241 | 0 | if !*this.read_closed && *this.read_buffer_valid < this.read_buffer.len() { |
242 | 0 | let read_result = AsyncRead::poll_read( |
243 | 0 | socket.as_mut(), |
244 | 0 | cx, |
245 | 0 | &mut this.read_buffer[*this.read_buffer_valid..], |
246 | 0 | ); |
247 | | |
248 | 0 | match read_result { |
249 | 0 | Poll::Pending => {} |
250 | 0 | Poll::Ready(Ok(0)) => { |
251 | 0 | *this.read_closed = true; |
252 | 0 | pending = false; |
253 | 0 | } |
254 | 0 | Poll::Ready(Ok(n)) => { |
255 | 0 | *this.read_buffer_valid += n; |
256 | 0 | // TODO: consider waking up only if the expected bytes of the consumer are exceeded |
257 | 0 | pending = false; |
258 | 0 | } |
259 | 0 | Poll::Ready(Err(err)) => { |
260 | 0 | *this.error = Some(err); |
261 | 0 | return Poll::Ready(()); |
262 | | } |
263 | | }; |
264 | 0 | } |
265 | | |
266 | 0 | loop { |
267 | 0 | if this.write_buffers.iter().any(|b| !b.is_empty()) { Unexecuted instantiation: _RNCNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE000Bf_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1k_6future6future6Futurep6OutputINtNtB1k_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB4f_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1k_6marker4SendEL_EEB3l_NtNtB5B_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1Q_4sync3ArcNtB7u_15DefaultPlatformENtB7w_11PlatformRef21wait_read_write_again00NCB7p_0E000B7y_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE000Bf_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1l_5tasks15connection_taskB2x_B1f_E0s_00NCB4e_s_0E000B1n_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB3x_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1j_15connection_taskB2D_B1f_E0s_00NCB5O_s_0E000B1n_ |
268 | 0 | let write_result = { |
269 | 0 | let buffers = this |
270 | 0 | .write_buffers |
271 | 0 | .iter() |
272 | 0 | .map(|buf| io::IoSlice::new(buf)) Unexecuted instantiation: _RNCNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE00s_0Bf_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1k_6future6future6Futurep6OutputINtNtB1k_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB4f_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1k_6marker4SendEL_EEB3l_NtNtB5B_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1Q_4sync3ArcNtB7u_15DefaultPlatformENtB7w_11PlatformRef21wait_read_write_again00NCB7p_0E00s_0B7y_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE00s_0Bf_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1l_5tasks15connection_taskB2x_B1f_E0s_00NCB4e_s_0E00s_0B1n_ Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB3x_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1j_15connection_taskB2D_B1f_E0s_00NCB5O_s_0E00s_0B1n_ |
273 | 0 | .collect::<Vec<_>>(); |
274 | 0 | AsyncWrite::poll_write_vectored(socket.as_mut(), cx, &buffers) |
275 | | }; |
276 | | |
277 | 0 | match write_result { |
278 | | Poll::Ready(Ok(0)) => { |
279 | | // It is not legal for `poll_write` to return 0 bytes written. |
280 | 0 | unreachable!(); |
281 | | } |
282 | 0 | Poll::Ready(Ok(mut n)) => { |
283 | 0 | *this.flush_pending = true; |
284 | 0 | while n > 0 { |
285 | 0 | let first_buf = this.write_buffers.first_mut().unwrap(); |
286 | 0 | if first_buf.len() <= n { |
287 | 0 | n -= first_buf.len(); |
288 | 0 | this.write_buffers.remove(0); |
289 | 0 | } else { |
290 | | // TODO: consider keeping the buffer as is but starting the next write at a later offset |
291 | 0 | first_buf.copy_within(n.., 0); |
292 | 0 | first_buf.truncate(first_buf.len() - n); |
293 | 0 | break; |
294 | | } |
295 | | } |
296 | | // Wake up if the write buffers switch from non-empty to empty. |
297 | 0 | if this.write_buffers.is_empty() { |
298 | 0 | pending = false; |
299 | 0 | } |
300 | | } |
301 | 0 | Poll::Ready(Err(err)) => { |
302 | 0 | *this.error = Some(err); |
303 | 0 | return Poll::Ready(()); |
304 | | } |
305 | 0 | Poll::Pending => break, |
306 | | }; |
307 | 0 | } else if *this.flush_pending { |
308 | 0 | match AsyncWrite::poll_flush(socket.as_mut(), cx) { |
309 | 0 | Poll::Ready(Ok(())) => { |
310 | 0 | *this.flush_pending = false; |
311 | 0 | } |
312 | 0 | Poll::Ready(Err(err)) => { |
313 | 0 | *this.error = Some(err); |
314 | 0 | return Poll::Ready(()); |
315 | | } |
316 | 0 | Poll::Pending => break, |
317 | | } |
318 | 0 | } else if *this.close_pending { |
319 | 0 | match AsyncWrite::poll_close(socket.as_mut(), cx) { |
320 | | Poll::Ready(Ok(())) => { |
321 | 0 | *this.close_pending = false; |
322 | 0 | pending = false; |
323 | 0 | break; |
324 | | } |
325 | 0 | Poll::Ready(Err(err)) => { |
326 | 0 | *this.error = Some(err); |
327 | 0 | return Poll::Ready(()); |
328 | | } |
329 | 0 | Poll::Pending => break, |
330 | | } |
331 | | } else { |
332 | 0 | break; |
333 | | } |
334 | | } |
335 | | } |
336 | | }; |
337 | | |
338 | 0 | if !pending { |
339 | 0 | Poll::Ready(()) |
340 | | } else { |
341 | 0 | Poll::Pending |
342 | | } |
343 | 0 | }) Unexecuted instantiation: _RNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE00Bd_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1i_6future6future6Futurep6OutputINtNtB1i_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB4d_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1i_6marker4SendEL_EEB3j_NtNtB5z_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1O_4sync3ArcNtB7s_15DefaultPlatformENtB7u_11PlatformRef21wait_read_write_again00NCB7n_0E00B7w_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE00Bd_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1j_5tasks15connection_taskB2v_B1d_E0s_00NCB4c_s_0E00B1l_ Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB3v_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1h_15connection_taskB2B_B1d_E0s_00NCB5M_s_0E00B1l_ |
344 | 0 | .await; |
345 | 0 | } Unexecuted instantiation: _RNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB7_11WithBufferspppE21wait_read_write_againppE0Bb_ Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1g_6future6future6Futurep6OutputINtNtB1g_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB9_9websocket10ConnectionB4b_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1g_6marker4SendEL_EEB3h_NtNtB5x_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1M_4sync3ArcNtB7q_15DefaultPlatformENtB7s_11PlatformRef21wait_read_write_again00NCB7l_0E0B7u_ Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBufferspppE21wait_read_write_againppE0Bb_ Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1h_5tasks15connection_taskB2t_B1b_E0s_00NCB4a_s_0E0B1j_ Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB9_9websocket10ConnectionB3t_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1f_15connection_taskB2z_B1b_E0s_00NCB5K_s_0E0B1j_ |
346 | | } |
347 | | |
348 | | impl<TSocketFut, TSocket: fmt::Debug, TNow> fmt::Debug for WithBuffers<TSocketFut, TSocket, TNow> { |
349 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
350 | 0 | let mut t = f.debug_tuple("WithBuffers"); |
351 | 0 | if let Socket::Resolved(socket) = &self.socket { |
352 | 0 | t.field(socket); |
353 | 0 | } else { |
354 | 0 | t.field(&"<pending>"); |
355 | 0 | } |
356 | 0 | t.finish() |
357 | 0 | } Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss0_0pppEINtB5_11WithBufferspppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtB9_ Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss0_0pppEINtB5_11WithBufferspppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtB9_ |
358 | | } |
359 | | |
360 | | /// See [`WithBuffers::read_write_access`]. |
361 | | pub struct ReadWriteAccess<'a, TNow: Clone> { |
362 | | read_write: read_write::ReadWrite<TNow>, |
363 | | |
364 | | read_buffer_len_before: usize, |
365 | | write_buffers_len_before: usize, |
366 | | |
367 | | // Fields below as references from the content of the `WithBuffers`. |
368 | | read_buffer: &'a mut Vec<u8>, |
369 | | read_buffer_valid: &'a mut usize, |
370 | | read_buffer_reasonable_capacity: usize, |
371 | | write_buffers: &'a mut Vec<Vec<u8>>, |
372 | | write_closed: &'a mut bool, |
373 | | close_pending: &'a mut bool, |
374 | | read_write_wake_up_after: &'a mut Option<TNow>, |
375 | | } |
376 | | |
377 | | impl<'a, TNow: Clone> ops::Deref for ReadWriteAccess<'a, TNow> { |
378 | | type Target = read_write::ReadWrite<TNow>; |
379 | | |
380 | 0 | fn deref(&self) -> &Self::Target { |
381 | 0 | &self.read_write |
382 | 0 | } Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss1_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefB9_ Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss1_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefB9_ Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefCsiUjFBJteJ7x_17smoldot_full_node |
383 | | } |
384 | | |
385 | | impl<'a, TNow: Clone> ops::DerefMut for ReadWriteAccess<'a, TNow> { |
386 | 0 | fn deref_mut(&mut self) -> &mut Self::Target { |
387 | 0 | &mut self.read_write |
388 | 0 | } Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss2_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutB9_ Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss2_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutB9_ Unexecuted instantiation: _RNvXs2_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutCsiUjFBJteJ7x_17smoldot_full_node |
389 | | } |
390 | | |
391 | | impl<'a, TNow: Clone> Drop for ReadWriteAccess<'a, TNow> { |
392 | 0 | fn drop(&mut self) { |
393 | 0 | *self.read_buffer = mem::take(&mut self.read_write.incoming_buffer); |
394 | 0 | *self.read_buffer_valid = self.read_buffer.len(); |
395 | | |
396 | | // Adjust `read_buffer` to the number of bytes requested by the consumer. |
397 | 0 | if let Some(expected_incoming_bytes) = self.read_write.expected_incoming_bytes { |
398 | 0 | if expected_incoming_bytes < self.read_buffer_reasonable_capacity |
399 | 0 | && self.read_buffer.is_empty() |
400 | 0 | { |
401 | 0 | // We use `shrink_to(0)` then `reserve(cap)` rather than just `shrink_to(cap)` |
402 | 0 | // so that the `Vec` doesn't try to preserve the data in the read buffer. |
403 | 0 | self.read_buffer.shrink_to(0); |
404 | 0 | self.read_buffer |
405 | 0 | .reserve(self.read_buffer_reasonable_capacity); |
406 | 0 | } else if expected_incoming_bytes > self.read_buffer.len() { |
407 | 0 | self.read_buffer |
408 | 0 | .reserve(expected_incoming_bytes - self.read_buffer.len()); |
409 | 0 | } |
410 | 0 | debug_assert!(self.read_buffer.capacity() >= expected_incoming_bytes); |
411 | 0 | } |
412 | | |
413 | 0 | *self.write_buffers = mem::take(&mut self.read_write.write_buffers); |
414 | 0 |
|
415 | 0 | if self.read_write.write_bytes_queueable.is_none() && !*self.write_closed { |
416 | 0 | *self.write_closed = true; |
417 | 0 | *self.close_pending = true; |
418 | 0 | } |
419 | | |
420 | 0 | *self.read_write_wake_up_after = self.read_write.wake_up_after.take(); |
421 | 0 |
|
422 | 0 | // If the consumer has advanced its reading or writing sides, we make the next call to |
423 | 0 | // `read_write_access` return immediately by setting `wake_up_after`. |
424 | 0 | if (self.read_buffer_len_before != self.read_buffer.len() |
425 | 0 | && self |
426 | 0 | .read_write |
427 | 0 | .expected_incoming_bytes |
428 | 0 | .map_or(false, |b| b <= self.read_buffer.len())) Unexecuted instantiation: _RNCNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss3_0pEINtB7_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop0Bb_ Unexecuted instantiation: _RNCNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss3_0pEINtB7_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop0Bb_ Unexecuted instantiation: _RNCNvXs3_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop0CsiUjFBJteJ7x_17smoldot_full_node |
429 | 0 | || (self.write_buffers_len_before != self.write_buffers.len() && !*self.write_closed) |
430 | 0 | { |
431 | 0 | *self.read_write_wake_up_after = Some(self.read_write.now.clone()); |
432 | 0 | } |
433 | 0 | } Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss3_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropB9_ Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss3_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropB9_ Unexecuted instantiation: _RNvXs3_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropCsiUjFBJteJ7x_17smoldot_full_node |
434 | | } |
435 | | |
436 | | // TODO: tests |