Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/with_buffers.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
#![cfg(feature = "std")]
19
#![cfg_attr(docsrs, doc(cfg(feature = "std")))]
20
21
//! Augments an implementation of `AsyncRead` and `AsyncWrite` with a read buffer and a write
22
//! buffer.
23
//!
24
//! While this module is generic, the targeted use-case is TCP connections.
25
26
// TODO: usage and example
27
28
use crate::libp2p::read_write;
29
30
use core::{
31
    fmt, future, mem, ops,
32
    pin::{self, Pin},
33
    task::Poll,
34
};
35
use futures_util::{AsyncRead, AsyncWrite};
36
use std::io;
37
38
/// Holds an implementation of `AsyncRead` and `AsyncWrite`, alongside a read buffer and a
39
/// write buffer.
40
0
#[pin_project::pin_project]
Unexecuted instantiation: _RNvMNvNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffers1__INtB4_11WithBufferspppE7projectB8_
Unexecuted instantiation: _RNvMNvNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffers1__INtB4_11WithBufferspppE11project_refB8_
Unexecuted instantiation: _RINvNvNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffers1__24___assert_not_repr_packedpppEB8_
Unexecuted instantiation: _RNvXININvNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffers1__s3_0pppEINtB7_11WithBufferspppENtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4dropBb_
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffers1__INtB4_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1g_6future6future6Futurep6OutputINtNtB1g_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB4b_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1g_6marker4SendEL_EEB3h_NtNtB5x_4time7InstantE7projectCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffers1__INtB4_11WithBufferspppE7projectB8_
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffers1__INtB4_11WithBufferspppE11project_refB8_
Unexecuted instantiation: _RINvNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffers1__24___assert_not_repr_packedpppEB8_
Unexecuted instantiation: _RNvXININvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffers1__s3_0pppEINtB7_11WithBufferspppENtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4dropBb_
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffers1__INtB4_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE7projectB1j_
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffers1__INtB4_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB3t_EENtNtCsbpXXxgr6u8g_3std4time7InstantE7projectB1j_
41
pub struct WithBuffers<TSocketFut, TSocket, TNow> {
42
    /// Actual socket to read from/write to.
43
    #[pin]
44
    socket: Socket<TSocketFut, TSocket>,
45
    /// Error that has happened on the socket, if any.
46
    error: Option<io::Error>,
47
    /// Storage for data read from the socket. The first [`WithBuffers::read_buffer_valid`] bytes
48
    /// contain actual socket data, while the rest contains garbage data.
49
    /// The capacity of this buffer is at least equal to the number of bytes requested by the
50
    /// inner data consumer.
51
    read_buffer: Vec<u8>,
52
    /// Number of bytes of data in [`WithBuffers::read_buffer`] that contain actual data.
53
    read_buffer_valid: usize,
54
    read_buffer_reasonable_capacity: usize,
55
    /// True if reading from the socket has returned `Ok(0)` earlier, in other words "end of
56
    /// file".
57
    read_closed: bool,
58
    /// Storage for data to write to the socket.
59
    write_buffers: Vec<Vec<u8>>,
60
    /// True if the consumer has closed the writing side earlier.
61
    write_closed: bool,
62
    /// True if the consumer has closed the writing side earlier, and the socket still has to
63
    /// be closed.
64
    close_pending: bool,
65
    /// True if data has been written on the socket and the socket needs to be flushed.
66
    flush_pending: bool,
67
68
    /// Value of [`read_write::ReadWrite::now`] that was fed by the latest call to
69
    /// [`WithBuffers::read_write_access`].
70
    read_write_now: Option<TNow>,
71
    /// Value of [`read_write::ReadWrite::wake_up_after`] produced by the latest call
72
    /// to [`WithBuffers::read_write_access`].
73
    read_write_wake_up_after: Option<TNow>,
74
}
75
76
0
#[pin_project::pin_project(project = SocketProj)]
Unexecuted instantiation: _RNvMNvNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss_1__INtB4_6SocketppE7projectB8_
Unexecuted instantiation: _RNvXININvNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss_1__s3_0ppEINtB7_6SocketppENtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4dropBb_
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss_1__INtB4_6SocketINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1c_6future6future6Futurep6OutputINtNtB1c_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB47_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1c_6marker4SendEL_EEB3d_E7projectCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss_1__INtB4_6SocketppE7projectB8_
Unexecuted instantiation: _RNvXININvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss_1__s3_0ppEINtB7_6SocketppENtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4dropBb_
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss_1__INtB4_6SocketNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamE7projectB1f_
Unexecuted instantiation: _RNvMNvNtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss_1__INtB4_6SocketNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB3p_EEE7projectB1f_
77
enum Socket<TSocketFut, TSocket> {
78
    Pending(#[pin] TSocketFut),
79
    Resolved(#[pin] TSocket),
80
}
81
82
impl<TSocketFut, TSocket, TNow> WithBuffers<TSocketFut, TSocket, TNow>
83
where
84
    TNow: Clone + Ord,
85
{
86
    /// Initializes a new [`WithBuffers`] with the given socket-yielding future.
87
0
    pub fn new(socket: TSocketFut) -> Self {
88
0
        let read_buffer_reasonable_capacity = 65536; // TODO: make configurable?
89
0
90
0
        WithBuffers {
91
0
            socket: Socket::Pending(socket),
92
0
            error: None,
93
0
            read_buffer: Vec::with_capacity(read_buffer_reasonable_capacity),
94
0
            read_buffer_valid: 0,
95
0
            read_buffer_reasonable_capacity,
96
0
            read_closed: false,
97
0
            write_buffers: Vec::with_capacity(64),
98
0
            write_closed: false,
99
0
            close_pending: false,
100
0
            flush_pending: false,
101
0
            read_write_now: None,
102
0
            read_write_wake_up_after: None,
103
0
        }
104
0
    }
Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE3newB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1b_6future6future6Futurep6OutputINtNtB1b_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB46_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1b_6marker4SendEL_EEB3c_NtNtB5s_4time7InstantE3newCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE3newB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE3newB1e_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB3o_EENtNtCsbpXXxgr6u8g_3std4time7InstantE3newB1e_
105
106
    /// Returns an object that implements `Deref<Target = ReadWrite>`. This object can be used
107
    /// to push or pull data to/from the socket.
108
    ///
109
    /// > **Note**: The parameter requires `Self` to be pinned for consistency with
110
    /// >           [`WithBuffers::wait_read_write_again`].
111
0
    pub fn read_write_access(
112
0
        self: Pin<&mut Self>,
113
0
        now: TNow,
114
0
    ) -> Result<ReadWriteAccess<TNow>, &io::Error> {
115
0
        let this = self.project();
116
0
117
0
        debug_assert!(this
118
0
            .read_write_now
119
0
            .as_ref()
120
0
            .map_or(true, |old_now| *old_now <= now));
Unexecuted instantiation: _RNCNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB4_11WithBufferspppE17read_write_access0B8_
Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1d_6future6future6Futurep6OutputINtNtB1d_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB48_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1d_6marker4SendEL_EEB3e_NtNtB5u_4time7InstantE17read_write_access0Csih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBufferspppE17read_write_access0B8_
Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_access0B1g_
Unexecuted instantiation: _RNCNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB4_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB6_9websocket10ConnectionB3q_EENtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_access0B1g_
121
0
        *this.read_write_wake_up_after = None;
122
0
        *this.read_write_now = Some(now.clone());
123
124
0
        if let Some(error) = this.error.as_ref() {
125
0
            return Err(error);
126
0
        }
127
0
128
0
        this.read_buffer.truncate(*this.read_buffer_valid);
129
130
0
        let is_resolved = matches!(*this.socket, Socket::Resolved(_));
131
132
0
        let write_bytes_queued = this.write_buffers.iter().map(Vec::len).sum();
133
0
134
0
        Ok(ReadWriteAccess {
135
0
            read_buffer_len_before: this.read_buffer.len(),
136
0
            write_buffers_len_before: this.write_buffers.len(),
137
0
            read_write: read_write::ReadWrite {
138
0
                now,
139
0
                incoming_buffer: mem::take(this.read_buffer),
140
0
                expected_incoming_bytes: if !*this.read_closed { Some(0) } else { None },
141
                read_bytes: 0,
142
0
                write_bytes_queued,
143
0
                write_buffers: mem::take(this.write_buffers),
144
0
                write_bytes_queueable: if !is_resolved {
145
0
                    Some(0)
146
0
                } else if !*this.write_closed {
147
                    // Limit outgoing buffer size to 128kiB.
148
                    // TODO: make configurable?
149
0
                    Some((128 * 1024usize).saturating_sub(write_bytes_queued))
150
                } else {
151
0
                    None
152
                },
153
0
                wake_up_after: this.read_write_wake_up_after.take(),
154
0
            },
155
0
            read_buffer: this.read_buffer,
156
0
            read_buffer_valid: this.read_buffer_valid,
157
0
            read_buffer_reasonable_capacity: *this.read_buffer_reasonable_capacity,
158
0
            write_buffers: this.write_buffers,
159
0
            write_closed: this.write_closed,
160
0
            close_pending: this.close_pending,
161
0
            read_write_wake_up_after: this.read_write_wake_up_after,
162
        })
163
0
    }
Unexecuted instantiation: _RNvMNtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE17read_write_accessB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1b_6future6future6Futurep6OutputINtNtB1b_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB46_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1b_6marker4SendEL_EEB3c_NtNtB5s_4time7InstantE17read_write_accessCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBufferspppE17read_write_accessB6_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_accessB1e_
Unexecuted instantiation: _RNvMNtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB2_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB4_9websocket10ConnectionB3o_EENtNtCsbpXXxgr6u8g_3std4time7InstantE17read_write_accessB1e_
164
}
165
166
impl<TSocketFut, TSocket, TNow> WithBuffers<TSocketFut, TSocket, TNow>
167
where
168
    TSocket: AsyncRead + AsyncWrite,
169
    TSocketFut: future::Future<Output = Result<TSocket, io::Error>>,
170
    TNow: Clone + Ord,
171
{
172
    /// Waits until [`WithBuffers::read_write_access`] should be called again.
173
    ///
174
    /// Returns immediately if [`WithBuffers::read_write_access`] has never been called, or if the wake-up time from its latest call has already been reached.
175
    ///
176
    /// Returns if an error happens on the socket. If an error happened in the past on the socket,
177
    /// the future never yields.
178
0
    pub async fn wait_read_write_again<F>(
179
0
        self: Pin<&mut Self>,
180
0
        timer_builder: impl FnOnce(TNow) -> F,
181
0
    ) where
182
0
        F: future::Future<Output = ()>,
183
0
    {
Unexecuted instantiation: _RINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB5_11WithBufferspppE21wait_read_write_againppEB9_
Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1e_6future6future6Futurep6OutputINtNtB1e_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB7_9websocket10ConnectionB49_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1e_6marker4SendEL_EEB3f_NtNtB5v_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1K_4sync3ArcNtB7o_15DefaultPlatformENtB7q_11PlatformRef21wait_read_write_again00NCB7j_0EB7s_
Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBufferspppE21wait_read_write_againppEB9_
Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1f_5tasks15connection_taskB2r_B19_E0s_00NCB48_s_0EB1h_
Unexecuted instantiation: _RINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB7_9websocket10ConnectionB3r_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1d_15connection_taskB2x_B19_E0s_00NCB5I_s_0EB1h_
184
0
        let mut this = self.project();
185
0
186
0
        // Return immediately if `read_write_access` was never called or if `wake_up_after <= now`.
187
0
        match (&*this.read_write_wake_up_after, &*this.read_write_now) {
188
0
            (_, None) => return,
189
0
            (Some(when_wake_up), Some(now)) if *when_wake_up <= *now => {
190
0
                return;
191
            }
192
0
            _ => {}
193
0
        }
194
0
195
0
        let mut timer = pin::pin!({
196
0
            let fut = this
197
0
                .read_write_wake_up_after
198
0
                .as_ref()
199
0
                .map(|when| timer_builder(when.clone()));
Unexecuted instantiation: _RNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s_0Bd_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1i_6future6future6Futurep6OutputINtNtB1i_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB4d_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1i_6marker4SendEL_EEB3j_NtNtB5z_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1O_4sync3ArcNtB7s_15DefaultPlatformENtB7u_11PlatformRef21wait_read_write_again00NCB7n_0E0s_0B7w_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s_0Bd_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1j_5tasks15connection_taskB2v_B1d_E0s_00NCB4c_s_0E0s_0B1l_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB3v_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1h_15connection_taskB2B_B1d_E0s_00NCB5M_s_0E0s_0B1l_
200
0
            async {
201
0
                if let Some(fut) = fut {
202
0
                    fut.await;
203
                } else {
204
0
                    future::pending::<()>().await;
205
                }
206
0
            }
Unexecuted instantiation: _RNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s0_0Bd_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1i_6future6future6Futurep6OutputINtNtB1i_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB4d_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1i_6marker4SendEL_EEB3j_NtNtB5z_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1O_4sync3ArcNtB7s_15DefaultPlatformENtB7u_11PlatformRef21wait_read_write_again00NCB7n_0E0s0_0B7w_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE0s0_0Bd_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1j_5tasks15connection_taskB2v_B1d_E0s_00NCB4c_s_0E0s0_0B1l_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB3v_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1h_15connection_taskB2B_B1d_E0s_00NCB5M_s_0E0s0_0B1l_
207
0
        });
208
0
209
0
        // Grow the read buffer in order to make space for potentially more data.
210
0
        this.read_buffer.resize(this.read_buffer.capacity(), 0);
211
0
212
0
        future::poll_fn(move |cx| {
213
0
            if this.error.is_some() {
214
                // Never return.
215
0
                return Poll::Pending;
216
0
            }
217
0
218
0
            // If still `true` at the end of the function, `Poll::Pending` is returned.
219
0
            let mut pending = true;
220
0
221
0
            match future::Future::poll(Pin::new(&mut timer), cx) {
222
0
                Poll::Pending => {}
223
0
                Poll::Ready(()) => {
224
0
                    pending = false;
225
0
                }
226
            }
227
228
0
            match this.socket.as_mut().project() {
229
0
                SocketProj::Pending(future) => match future::Future::poll(future, cx) {
230
0
                    Poll::Pending => {}
231
0
                    Poll::Ready(Ok(socket)) => {
232
0
                        this.socket.set(Socket::Resolved(socket));
233
0
                        pending = false;
234
0
                    }
235
0
                    Poll::Ready(Err(err)) => {
236
0
                        *this.error = Some(err);
237
0
                        return Poll::Ready(());
238
                    }
239
                },
240
0
                SocketProj::Resolved(mut socket) => {
241
0
                    if !*this.read_closed && *this.read_buffer_valid < this.read_buffer.len() {
242
0
                        let read_result = AsyncRead::poll_read(
243
0
                            socket.as_mut(),
244
0
                            cx,
245
0
                            &mut this.read_buffer[*this.read_buffer_valid..],
246
0
                        );
247
248
0
                        match read_result {
249
0
                            Poll::Pending => {}
250
0
                            Poll::Ready(Ok(0)) => {
251
0
                                *this.read_closed = true;
252
0
                                pending = false;
253
0
                            }
254
0
                            Poll::Ready(Ok(n)) => {
255
0
                                *this.read_buffer_valid += n;
256
0
                                // TODO: consider waking up only if the expected bytes of the consumer are exceeded
257
0
                                pending = false;
258
0
                            }
259
0
                            Poll::Ready(Err(err)) => {
260
0
                                *this.error = Some(err);
261
0
                                return Poll::Ready(());
262
                            }
263
                        };
264
0
                    }
265
266
0
                    loop {
267
0
                        if this.write_buffers.iter().any(|b| !b.is_empty()) {
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE000Bf_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1k_6future6future6Futurep6OutputINtNtB1k_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB4f_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1k_6marker4SendEL_EEB3l_NtNtB5B_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1Q_4sync3ArcNtB7u_15DefaultPlatformENtB7w_11PlatformRef21wait_read_write_again00NCB7p_0E000B7y_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE000Bf_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1l_5tasks15connection_taskB2x_B1f_E0s_00NCB4e_s_0E000B1n_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB3x_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1j_15connection_taskB2D_B1f_E0s_00NCB5O_s_0E000B1n_
268
0
                            let write_result = {
269
0
                                let buffers = this
270
0
                                    .write_buffers
271
0
                                    .iter()
272
0
                                    .map(|buf| io::IoSlice::new(buf))
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE00s_0Bf_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1k_6future6future6Futurep6OutputINtNtB1k_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB4f_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1k_6marker4SendEL_EEB3l_NtNtB5B_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1Q_4sync3ArcNtB7u_15DefaultPlatformENtB7w_11PlatformRef21wait_read_write_again00NCB7p_0E00s_0B7y_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBufferspppE21wait_read_write_againppE00s_0Bf_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1l_5tasks15connection_taskB2x_B1f_E0s_00NCB4e_s_0E00s_0B1n_
Unexecuted instantiation: _RNCNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtBb_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBd_9websocket10ConnectionB3x_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1j_15connection_taskB2D_B1f_E0s_00NCB5O_s_0E00s_0B1n_
273
0
                                    .collect::<Vec<_>>();
274
0
                                AsyncWrite::poll_write_vectored(socket.as_mut(), cx, &buffers)
275
                            };
276
277
0
                            match write_result {
278
                                Poll::Ready(Ok(0)) => {
279
                                    // It is not legal for `poll_write` to return 0 bytes written.
280
0
                                    unreachable!();
281
                                }
282
0
                                Poll::Ready(Ok(mut n)) => {
283
0
                                    *this.flush_pending = true;
284
0
                                    while n > 0 {
285
0
                                        let first_buf = this.write_buffers.first_mut().unwrap();
286
0
                                        if first_buf.len() <= n {
287
0
                                            n -= first_buf.len();
288
0
                                            this.write_buffers.remove(0);
289
0
                                        } else {
290
                                            // TODO: consider keeping the buffer as is but starting the next write at a later offset
291
0
                                            first_buf.copy_within(n.., 0);
292
0
                                            first_buf.truncate(first_buf.len() - n);
293
0
                                            break;
294
                                        }
295
                                    }
296
                                    // Wake up if the write buffers switch from non-empty to empty.
297
0
                                    if this.write_buffers.is_empty() {
298
0
                                        pending = false;
299
0
                                    }
300
                                }
301
0
                                Poll::Ready(Err(err)) => {
302
0
                                    *this.error = Some(err);
303
0
                                    return Poll::Ready(());
304
                                }
305
0
                                Poll::Pending => break,
306
                            };
307
0
                        } else if *this.flush_pending {
308
0
                            match AsyncWrite::poll_flush(socket.as_mut(), cx) {
309
0
                                Poll::Ready(Ok(())) => {
310
0
                                    *this.flush_pending = false;
311
0
                                }
312
0
                                Poll::Ready(Err(err)) => {
313
0
                                    *this.error = Some(err);
314
0
                                    return Poll::Ready(());
315
                                }
316
0
                                Poll::Pending => break,
317
                            }
318
0
                        } else if *this.close_pending {
319
0
                            match AsyncWrite::poll_close(socket.as_mut(), cx) {
320
                                Poll::Ready(Ok(())) => {
321
0
                                    *this.close_pending = false;
322
0
                                    pending = false;
323
0
                                    break;
324
                                }
325
0
                                Poll::Ready(Err(err)) => {
326
0
                                    *this.error = Some(err);
327
0
                                    return Poll::Ready(());
328
                                }
329
0
                                Poll::Pending => break,
330
                            }
331
                        } else {
332
0
                            break;
333
                        }
334
                    }
335
                }
336
            };
337
338
0
            if !pending {
339
0
                Poll::Ready(())
340
            } else {
341
0
                Poll::Pending
342
            }
343
0
        })
Unexecuted instantiation: _RNCNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE00Bd_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1i_6future6future6Futurep6OutputINtNtB1i_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB4d_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1i_6marker4SendEL_EEB3j_NtNtB5z_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1O_4sync3ArcNtB7s_15DefaultPlatformENtB7u_11PlatformRef21wait_read_write_again00NCB7n_0E00B7w_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBufferspppE21wait_read_write_againppE00Bd_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1j_5tasks15connection_taskB2v_B1d_E0s_00NCB4c_s_0E00B1l_
Unexecuted instantiation: _RNCNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB9_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtBb_9websocket10ConnectionB3v_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1h_15connection_taskB2B_B1d_E0s_00NCB5M_s_0E00B1l_
344
0
        .await;
345
0
    }
Unexecuted instantiation: _RNCINvMs_NtNtCsN16ciHI6Qf_7smoldot6libp2p12with_buffersINtB7_11WithBufferspppE21wait_read_write_againppE0Bb_
Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBuffersINtNtCsaYZPK01V26L_4core3pin3PinINtNtCsdZExvAaxgia_5alloc5boxed3BoxDNtNtNtB1g_6future6future6Futurep6OutputINtNtB1g_6result6ResultINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB9_9websocket10ConnectionB4b_EENtNtNtCsbpXXxgr6u8g_3std2io5error5ErrorENtNtB1g_6marker4SendEL_EEB3h_NtNtB5x_4time7InstantE21wait_read_write_againNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtB1M_4sync3ArcNtB7q_15DefaultPlatformENtB7s_11PlatformRef21wait_read_write_again00NCB7l_0E0B7u_
Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBufferspppE21wait_read_write_againppE0Bb_
Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBuffersNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0NtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvNtB1h_5tasks15connection_taskB2t_B1b_E0s_00NCB4a_s_0E0B1j_
Unexecuted instantiation: _RNCINvMs_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_11WithBuffersNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0INtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtB9_9websocket10ConnectionB3t_EENtNtCsbpXXxgr6u8g_3std4time7InstantE21wait_read_write_againNCNCNCINvB1f_15connection_taskB2z_B1b_E0s_00NCB5K_s_0E0B1j_
346
}
347
348
impl<TSocketFut, TSocket: fmt::Debug, TNow> fmt::Debug for WithBuffers<TSocketFut, TSocket, TNow> {
349
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
350
0
        let mut t = f.debug_tuple("WithBuffers");
351
0
        if let Socket::Resolved(socket) = &self.socket {
352
0
            t.field(socket);
353
0
        } else {
354
0
            t.field(&"<pending>");
355
0
        }
356
0
        t.finish()
357
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss0_0pppEINtB5_11WithBufferspppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtB9_
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss0_0pppEINtB5_11WithBufferspppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtB9_
358
}
359
360
/// See [`WithBuffers::read_write_access`].
361
pub struct ReadWriteAccess<'a, TNow: Clone> {
362
    read_write: read_write::ReadWrite<TNow>,
363
364
    read_buffer_len_before: usize,
365
    write_buffers_len_before: usize,
366
367
    // Fields below as references from the content of the `WithBuffers`.
368
    read_buffer: &'a mut Vec<u8>,
369
    read_buffer_valid: &'a mut usize,
370
    read_buffer_reasonable_capacity: usize,
371
    write_buffers: &'a mut Vec<Vec<u8>>,
372
    write_closed: &'a mut bool,
373
    close_pending: &'a mut bool,
374
    read_write_wake_up_after: &'a mut Option<TNow>,
375
}
376
377
impl<'a, TNow: Clone> ops::Deref for ReadWriteAccess<'a, TNow> {
378
    type Target = read_write::ReadWrite<TNow>;
379
380
0
    fn deref(&self) -> &Self::Target {
381
0
        &self.read_write
382
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss1_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefB9_
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss1_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefB9_
Unexecuted instantiation: _RNvXs1_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops5deref5Deref5derefCsiUjFBJteJ7x_17smoldot_full_node
383
}
384
385
impl<'a, TNow: Clone> ops::DerefMut for ReadWriteAccess<'a, TNow> {
386
0
    fn deref_mut(&mut self) -> &mut Self::Target {
387
0
        &mut self.read_write
388
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss2_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutB9_
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss2_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutB9_
Unexecuted instantiation: _RNvXs2_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops5deref8DerefMut9deref_mutCsiUjFBJteJ7x_17smoldot_full_node
389
}
390
391
impl<'a, TNow: Clone> Drop for ReadWriteAccess<'a, TNow> {
392
0
    fn drop(&mut self) {
393
0
        *self.read_buffer = mem::take(&mut self.read_write.incoming_buffer);
394
0
        *self.read_buffer_valid = self.read_buffer.len();
395
396
        // Adjust `read_buffer` to the number of bytes requested by the consumer.
397
0
        if let Some(expected_incoming_bytes) = self.read_write.expected_incoming_bytes {
398
0
            if expected_incoming_bytes < self.read_buffer_reasonable_capacity
399
0
                && self.read_buffer.is_empty()
400
0
            {
401
0
                // We use `shrink_to(0)` then `reserve(cap)` rather than just `shrink_to(cap)`
402
0
                // so that the `Vec` doesn't try to preserve the data in the read buffer.
403
0
                self.read_buffer.shrink_to(0);
404
0
                self.read_buffer
405
0
                    .reserve(self.read_buffer_reasonable_capacity);
406
0
            } else if expected_incoming_bytes > self.read_buffer.len() {
407
0
                self.read_buffer
408
0
                    .reserve(expected_incoming_bytes - self.read_buffer.len());
409
0
            }
410
0
            debug_assert!(self.read_buffer.capacity() >= expected_incoming_bytes);
411
0
        }
412
413
0
        *self.write_buffers = mem::take(&mut self.read_write.write_buffers);
414
0
415
0
        if self.read_write.write_bytes_queueable.is_none() && !*self.write_closed {
416
0
            *self.write_closed = true;
417
0
            *self.close_pending = true;
418
0
        }
419
420
0
        *self.read_write_wake_up_after = self.read_write.wake_up_after.take();
421
0
422
0
        // If the consumer has advanced its reading or writing sides, we make the next call to
423
0
        // `read_write_access` return immediately by setting `wake_up_after`.
424
0
        if (self.read_buffer_len_before != self.read_buffer.len()
425
0
            && self
426
0
                .read_write
427
0
                .expected_incoming_bytes
428
0
                .map_or(false, |b| b <= self.read_buffer.len()))
Unexecuted instantiation: _RNCNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss3_0pEINtB7_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop0Bb_
Unexecuted instantiation: _RNCNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss3_0pEINtB7_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop0Bb_
Unexecuted instantiation: _RNCNvXs3_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB7_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop0CsiUjFBJteJ7x_17smoldot_full_node
429
0
            || (self.write_buffers_len_before != self.write_buffers.len() && !*self.write_closed)
430
0
        {
431
0
            *self.read_write_wake_up_after = Some(self.read_write.now.clone());
432
0
        }
433
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p12with_bufferss3_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropB9_
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p12with_bufferss3_0pEINtB5_15ReadWriteAccesspENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropB9_
Unexecuted instantiation: _RNvXs3_NtNtCseuYC0Zibziv_7smoldot6libp2p12with_buffersINtB5_15ReadWriteAccessNtNtCsbpXXxgr6u8g_3std4time7InstantENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropCsiUjFBJteJ7x_17smoldot_full_node
434
}
435
436
// TODO: tests