Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/websocket.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! Implementation of a WebSocket client that wraps around an abstract representation of a TCP
19
//! socket through the `AsyncRead` and `AsyncWrite` traits.
20
21
#![cfg(feature = "std")]
22
#![cfg_attr(docsrs, doc(cfg(feature = "std")))]
23
24
use futures_util::{future, AsyncRead, AsyncWrite, Future as _};
25
26
use core::{
27
    cmp, mem,
28
    pin::Pin,
29
    task::{Context, Poll},
30
};
31
32
use std::io;
33
34
/// Parameters consumed by [`websocket_client_handshake`].
pub struct Config<'a, T> {
    /// Underlying socket on which the WebSocket protocol is negotiated.
    pub tcp_socket: T,

    /// Value sent as the `Host` HTTP header, for example `example.com:1234` or
    /// `127.0.0.1:3337`.
    pub host: &'a str,

    /// URL sent to the server as part of the HTTP handshake. Typically `/`.
    pub url: &'a str,
}
46
47
/// Negotiates the WebSocket protocol (including the HTTP-like request) on the given socket, and
48
/// returns an object that translates reads and writes into WebSocket binary frames.
49
0
pub async fn websocket_client_handshake<T: AsyncRead + AsyncWrite + Send + Unpin + 'static>(
50
0
    config: Config<'_, T>,
51
0
) -> Result<Connection<T>, io::Error> {
Unexecuted instantiation: _RINvNtNtCsN16ciHI6Qf_7smoldot6libp2p9websocket26websocket_client_handshakepEB6_
Unexecuted instantiation: _RINvNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket26websocket_client_handshakeNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamECsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RINvNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket26websocket_client_handshakepEB6_
Unexecuted instantiation: _RINvNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket26websocket_client_handshakeNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamECsiUjFBJteJ7x_17smoldot_full_node
52
0
    let mut client = soketto::handshake::Client::new(config.tcp_socket, config.host, config.url);
53
54
0
    let (sender, receiver) = match client.handshake().await {
55
0
        Ok(soketto::handshake::ServerResponse::Accepted { .. }) => client.into_builder().finish(),
56
        Ok(soketto::handshake::ServerResponse::Redirect { .. }) => {
57
            // TODO: implement?
58
0
            return Err(io::Error::new(
59
0
                io::ErrorKind::ConnectionRefused,
60
0
                "Redirections not implemented",
61
0
            ));
62
        }
63
0
        Ok(soketto::handshake::ServerResponse::Rejected { status_code }) => {
64
0
            return Err(io::Error::new(
65
0
                io::ErrorKind::ConnectionRefused,
66
0
                format!("Status code {status_code}"),
67
0
            ))
68
        }
69
0
        Err(err) => return Err(io::Error::new(io::ErrorKind::Other, err)),
70
    };
71
72
0
    Ok(Connection {
73
0
        sender: Write::Idle(sender),
74
0
        receiver: Read::Idle(receiver, Vec::with_capacity(1024), 0),
75
0
    })
76
0
}
Unexecuted instantiation: _RNCINvNtNtCsN16ciHI6Qf_7smoldot6libp2p9websocket26websocket_client_handshakepE0B8_
Unexecuted instantiation: _RNCINvNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket26websocket_client_handshakeNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamE0Csih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNCINvNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket26websocket_client_handshakepE0B8_
Unexecuted instantiation: _RNCINvNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket26websocket_client_handshakeNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamE0CsiUjFBJteJ7x_17smoldot_full_node
77
78
/// Negotiated WebSocket connection.
79
///
80
/// Implements the `AsyncRead` and `AsyncWrite` traits.
81
pub struct Connection<T> {
82
    sender: Write<T>,
83
    receiver: Read<T>,
84
}
85
86
enum Read<T> {
87
    Idle(soketto::connection::Receiver<T>, Vec<u8>, usize),
88
    Error(soketto::connection::Error),
89
    InProgress(future::BoxFuture<'static, Result<ReadOutcome<T>, soketto::connection::Error>>),
90
    Poisoned,
91
}
92
93
struct ReadOutcome<T> {
94
    socket: soketto::connection::Receiver<T>,
95
    buffer: Vec<u8>,
96
}
97
98
enum Write<T> {
99
    Idle(soketto::connection::Sender<T>),
100
    Writing(
101
        future::BoxFuture<
102
            'static,
103
            Result<soketto::connection::Sender<T>, soketto::connection::Error>,
104
        >,
105
    ),
106
    Flushing(
107
        future::BoxFuture<
108
            'static,
109
            Result<soketto::connection::Sender<T>, soketto::connection::Error>,
110
        >,
111
    ),
112
    Closing(future::BoxFuture<'static, Result<(), soketto::connection::Error>>),
113
    Closed,
114
    Error(soketto::connection::Error),
115
    Poisoned,
116
}
117
118
impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> AsyncRead for Connection<T> {
119
0
    fn poll_read(
120
0
        mut self: Pin<&mut Self>,
121
0
        cx: &mut Context<'_>,
122
0
        out_buf: &mut [u8],
123
0
    ) -> Poll<io::Result<usize>> {
124
0
        assert_ne!(out_buf.len(), 0);
125
126
        loop {
127
0
            match mem::replace(&mut self.receiver, Read::Poisoned) {
128
0
                Read::Idle(socket, pending, pending_pos) if pending_pos < pending.len() => {
129
0
                    let to_copy = cmp::min(out_buf.len(), pending.len() - pending_pos);
130
0
                    debug_assert_ne!(to_copy, 0);
131
0
                    out_buf[..to_copy].copy_from_slice(&pending[pending_pos..][..to_copy]);
132
0
                    self.receiver = Read::Idle(socket, pending, pending_pos + to_copy);
133
0
                    return Poll::Ready(Ok(to_copy));
134
                }
135
0
                Read::Idle(mut socket, mut buffer, _) => {
136
0
                    buffer.clear();
137
0
                    self.receiver = Read::InProgress(Box::pin(async move {
138
0
                        socket.receive_data(&mut buffer).await?;
139
0
                        Ok(ReadOutcome { socket, buffer })
140
0
                    }));
Unexecuted instantiation: _RNCNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websocket0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_read0Bb_
Unexecuted instantiation: _RNCNvXNtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_read0Csih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNCNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websocket0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_read0Bb_
Unexecuted instantiation: _RNCNvXNtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_read0CsiUjFBJteJ7x_17smoldot_full_node
141
0
                }
142
0
                Read::InProgress(mut future) => match Pin::new(&mut future).poll(cx) {
143
                    Poll::Pending => {
144
0
                        self.receiver = Read::InProgress(future);
145
0
                        return Poll::Pending;
146
                    }
147
0
                    Poll::Ready(Ok(ReadOutcome { socket, buffer })) => {
148
0
                        self.receiver = Read::Idle(socket, buffer, 0);
149
0
                    }
150
0
                    Poll::Ready(Err(err)) => {
151
0
                        self.receiver = Read::Error(err);
152
0
                    }
153
                },
154
0
                Read::Error(err) => {
155
0
                    let out_err = convert_err(&err);
156
0
                    self.receiver = Read::Error(err);
157
0
                    return Poll::Ready(Err(out_err));
158
                }
159
0
                Read::Poisoned => unreachable!(),
160
            }
161
        }
162
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websocket0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_readB9_
Unexecuted instantiation: _RNvXNtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB2_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_readCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websocket0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_readB9_
Unexecuted instantiation: _RNvXNtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB2_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std9AsyncRead9poll_readCsiUjFBJteJ7x_17smoldot_full_node
163
}
164
165
impl<T: AsyncRead + AsyncWrite + Send + Unpin + 'static> AsyncWrite for Connection<T> {
166
0
    fn poll_write(
167
0
        mut self: Pin<&mut Self>,
168
0
        cx: &mut Context<'_>,
169
0
        buf: &[u8],
170
0
    ) -> Poll<io::Result<usize>> {
171
0
        loop {
172
0
            match mem::replace(&mut self.sender, Write::Poisoned) {
173
0
                Write::Idle(mut socket) => {
174
0
                    let len = buf.len();
175
0
                    let buf = buf.to_vec();
176
0
                    self.sender = Write::Writing(Box::pin(async move {
177
0
                        socket.send_binary_mut(buf).await?;
178
0
                        Ok(socket)
179
0
                    }));
Unexecuted instantiation: _RNCNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websockets_0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_write0Bb_
Unexecuted instantiation: _RNCNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB6_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_write0Csih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNCNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websockets_0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_write0Bb_
Unexecuted instantiation: _RNCNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB6_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_write0CsiUjFBJteJ7x_17smoldot_full_node
180
0
                    return Poll::Ready(Ok(len));
181
                }
182
0
                Write::Flushing(mut future) => match Pin::new(&mut future).poll(cx) {
183
                    Poll::Pending => {
184
0
                        self.sender = Write::Flushing(future);
185
0
                        return Poll::Pending;
186
                    }
187
0
                    Poll::Ready(Ok(socket)) => {
188
0
                        self.sender = Write::Idle(socket);
189
0
                    }
190
0
                    Poll::Ready(Err(err)) => {
191
0
                        self.sender = Write::Error(err);
192
0
                    }
193
                },
194
0
                Write::Writing(mut future) => match Pin::new(&mut future).poll(cx) {
195
                    Poll::Pending => {
196
0
                        self.sender = Write::Writing(future);
197
0
                        return Poll::Pending;
198
                    }
199
0
                    Poll::Ready(Ok(socket)) => {
200
0
                        self.sender = Write::Idle(socket);
201
0
                    }
202
0
                    Poll::Ready(Err(err)) => {
203
0
                        self.sender = Write::Error(err);
204
0
                    }
205
                },
206
0
                Write::Closing(mut future) => match Pin::new(&mut future).poll(cx) {
207
                    Poll::Pending => {
208
0
                        self.sender = Write::Closing(future);
209
0
                        return Poll::Pending;
210
                    }
211
0
                    Poll::Ready(Ok(())) => {
212
0
                        self.sender = Write::Closed;
213
0
                    }
214
0
                    Poll::Ready(Err(err)) => {
215
0
                        self.sender = Write::Error(err);
216
0
                    }
217
                },
218
                Write::Closed => {
219
                    // It is unclear what to do in the situation where the user tries to write
220
                    // to the connection after having closed it. An error seems the most
221
                    // appropriate course of action.
222
0
                    return Poll::Ready(Err(io::Error::new(
223
0
                        io::ErrorKind::BrokenPipe,
224
0
                        "called poll_write after poll_close has succeeded",
225
0
                    )));
226
                }
227
0
                Write::Error(err) => {
228
0
                    let out_err = convert_err(&err);
229
0
                    self.sender = Write::Error(err);
230
0
                    return Poll::Ready(Err(out_err));
231
                }
232
0
                Write::Poisoned => unreachable!(),
233
            }
234
        }
235
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websockets_0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_writeB9_
Unexecuted instantiation: _RNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_writeCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websockets_0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_writeB9_
Unexecuted instantiation: _RNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_writeCsiUjFBJteJ7x_17smoldot_full_node
236
237
0
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
238
0
        loop {
239
0
            match mem::replace(&mut self.sender, Write::Poisoned) {
240
0
                Write::Idle(mut socket) => {
241
0
                    self.sender = Write::Flushing(Box::pin(async move {
242
0
                        socket.flush().await?;
243
0
                        Ok(socket)
244
0
                    }));
Unexecuted instantiation: _RNCNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websockets_0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flush0Bb_
Unexecuted instantiation: _RNCNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB6_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flush0Csih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNCNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websockets_0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flush0Bb_
Unexecuted instantiation: _RNCNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB6_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flush0CsiUjFBJteJ7x_17smoldot_full_node
245
0
                }
246
0
                Write::Flushing(mut future) => match Pin::new(&mut future).poll(cx) {
247
                    Poll::Pending => {
248
0
                        self.sender = Write::Flushing(future);
249
0
                        return Poll::Pending;
250
                    }
251
0
                    Poll::Ready(Ok(socket)) => {
252
0
                        self.sender = Write::Idle(socket);
253
0
                        return Poll::Ready(Ok(()));
254
                    }
255
0
                    Poll::Ready(Err(err)) => {
256
0
                        self.sender = Write::Error(err);
257
0
                    }
258
                },
259
0
                Write::Writing(mut future) => match Pin::new(&mut future).poll(cx) {
260
                    Poll::Pending => {
261
0
                        self.sender = Write::Writing(future);
262
0
                        return Poll::Pending;
263
                    }
264
0
                    Poll::Ready(Ok(socket)) => {
265
0
                        self.sender = Write::Idle(socket);
266
0
                    }
267
0
                    Poll::Ready(Err(err)) => {
268
0
                        self.sender = Write::Error(err);
269
0
                    }
270
                },
271
0
                Write::Closing(mut future) => match Pin::new(&mut future).poll(cx) {
272
                    Poll::Pending => {
273
0
                        self.sender = Write::Closing(future);
274
0
                        return Poll::Pending;
275
                    }
276
0
                    Poll::Ready(Ok(())) => {
277
0
                        self.sender = Write::Closed;
278
0
                    }
279
0
                    Poll::Ready(Err(err)) => {
280
0
                        self.sender = Write::Error(err);
281
0
                    }
282
                },
283
0
                Write::Closed => return Poll::Ready(Ok(())),
284
0
                Write::Error(err) => {
285
0
                    let out_err = convert_err(&err);
286
0
                    self.sender = Write::Error(err);
287
0
                    return Poll::Ready(Err(out_err));
288
                }
289
0
                Write::Poisoned => unreachable!(),
290
            }
291
        }
292
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websockets_0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flushB9_
Unexecuted instantiation: _RNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flushCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websockets_0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flushB9_
Unexecuted instantiation: _RNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_flushCsiUjFBJteJ7x_17smoldot_full_node
293
294
0
    fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
295
0
        loop {
296
0
            match mem::replace(&mut self.sender, Write::Poisoned) {
297
0
                Write::Idle(mut socket) => {
298
0
                    self.sender = Write::Closing(Box::pin(async move {
299
0
                        socket.close().await?;
300
0
                        Ok(())
301
0
                    }));
Unexecuted instantiation: _RNCNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websockets_0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_close0Bb_
Unexecuted instantiation: _RNCNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB6_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_close0Csih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNCNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websockets_0pEINtB7_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_close0Bb_
Unexecuted instantiation: _RNCNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB6_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_close0CsiUjFBJteJ7x_17smoldot_full_node
302
0
                }
303
0
                Write::Flushing(mut future) => match Pin::new(&mut future).poll(cx) {
304
                    Poll::Pending => {
305
0
                        self.sender = Write::Flushing(future);
306
0
                        return Poll::Pending;
307
                    }
308
0
                    Poll::Ready(Ok(socket)) => {
309
0
                        self.sender = Write::Idle(socket);
310
0
                    }
311
0
                    Poll::Ready(Err(err)) => {
312
0
                        self.sender = Write::Error(err);
313
0
                    }
314
                },
315
0
                Write::Writing(mut future) => match Pin::new(&mut future).poll(cx) {
316
                    Poll::Pending => {
317
0
                        self.sender = Write::Writing(future);
318
0
                        return Poll::Pending;
319
                    }
320
0
                    Poll::Ready(Ok(socket)) => {
321
0
                        self.sender = Write::Idle(socket);
322
0
                    }
323
0
                    Poll::Ready(Err(err)) => {
324
0
                        self.sender = Write::Error(err);
325
0
                    }
326
                },
327
0
                Write::Closing(mut future) => match Pin::new(&mut future).poll(cx) {
328
                    Poll::Pending => {
329
0
                        self.sender = Write::Closing(future);
330
0
                        return Poll::Pending;
331
                    }
332
                    Poll::Ready(Ok(())) => {
333
0
                        self.sender = Write::Closed;
334
0
                        return Poll::Ready(Ok(()));
335
                    }
336
0
                    Poll::Ready(Err(err)) => {
337
0
                        self.sender = Write::Error(err);
338
0
                    }
339
                },
340
0
                Write::Closed => return Poll::Ready(Ok(())),
341
0
                Write::Error(err) => {
342
0
                    let out_err = convert_err(&err);
343
0
                    self.sender = Write::Error(err);
344
0
                    return Poll::Ready(Err(out_err));
345
                }
346
0
                Write::Poisoned => unreachable!(),
347
            }
348
        }
349
0
    }
Unexecuted instantiation: _RNvXININtNtCsN16ciHI6Qf_7smoldot6libp2p9websockets_0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_closeB9_
Unexecuted instantiation: _RNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_closeCsih6EgvAwZF2_13smoldot_light
Unexecuted instantiation: _RNvXININtNtCseuYC0Zibziv_7smoldot6libp2p9websockets_0pEINtB5_10ConnectionpENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_closeB9_
Unexecuted instantiation: _RNvXs_NtNtCseuYC0Zibziv_7smoldot6libp2p9websocketINtB4_10ConnectionNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamENtNtCsfLEmgqq87PL_10futures_io6if_std10AsyncWrite10poll_closeCsiUjFBJteJ7x_17smoldot_full_node
350
}
351
352
0
fn convert_err(err: &soketto::connection::Error) -> io::Error {
353
0
    match err {
354
0
        soketto::connection::Error::Io(err) => io::Error::new(err.kind(), err.to_string()),
355
0
        soketto::connection::Error::Codec(err) => {
356
0
            io::Error::new(io::ErrorKind::InvalidData, err.to_string())
357
        }
358
0
        soketto::connection::Error::Extension(err) => {
359
0
            io::Error::new(io::ErrorKind::InvalidData, err.to_string())
360
        }
361
0
        soketto::connection::Error::UnexpectedOpCode(err) => {
362
0
            io::Error::new(io::ErrorKind::InvalidData, err.to_string())
363
        }
364
0
        soketto::connection::Error::Utf8(err) => {
365
0
            io::Error::new(io::ErrorKind::InvalidData, err.to_string())
366
        }
367
        soketto::connection::Error::MessageTooLarge { .. } => {
368
0
            io::Error::from(io::ErrorKind::InvalidData)
369
        }
370
0
        soketto::connection::Error::Closed => io::Error::from(io::ErrorKind::ConnectionAborted),
371
0
        _ => io::Error::from(io::ErrorKind::Other),
372
    }
373
0
}
Unexecuted instantiation: _RNvNtNtCsN16ciHI6Qf_7smoldot6libp2p9websocket11convert_err
Unexecuted instantiation: _RNvNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket11convert_err
374
375
#[cfg(test)]
mod tests {
    use futures_util::{AsyncRead, AsyncWrite};

    #[test]
    fn is_send() {
        // Makes sure at compile time that `Connection` implements `Send`.
        fn assert_send<S: Send>() {}

        // Never called: type-checking its body is what performs the verification.
        #[allow(unused)]
        fn connection_is_send<T: AsyncRead + AsyncWrite + Send + Unpin + 'static>() {
            assert_send::<super::Connection<T>>()
        }
    }
}