Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/full-node/src/network_service/tasks.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use crate::{LogCallback, LogLevel};
19
use core::future::Future;
20
use futures_lite::future;
21
use futures_util::StreamExt as _;
22
use smol::{
23
    channel,
24
    future::FutureExt as _,
25
    io::{AsyncRead, AsyncWrite},
26
};
27
use smoldot::{
28
    libp2p::{
29
        multiaddr::{Multiaddr, Protocol},
30
        websocket, with_buffers,
31
    },
32
    network::service::{self, CoordinatorToConnection},
33
};
34
use std::{
35
    io,
36
    net::{IpAddr, SocketAddr},
37
    pin,
38
    sync::Arc,
39
    time::{Duration, Instant},
40
};
41
42
pub(super) trait AsyncReadWrite: AsyncRead + AsyncWrite {}
43
impl<T> AsyncReadWrite for T where T: AsyncRead + AsyncWrite {}
44
45
/// Asynchronous task managing a specific connection.
46
0
pub(super) async fn connection_task(
47
0
    log_callback: Arc<dyn LogCallback + Send + Sync>,
48
0
    address: String,
49
0
    socket: impl Future<Output = Result<impl AsyncReadWrite, io::Error>>,
50
0
    connection_id: service::ConnectionId,
51
0
    mut connection_task: service::SingleStreamConnectionTask<Instant>,
52
0
    coordinator_to_connection: channel::Receiver<service::CoordinatorToConnection>,
53
0
    connection_to_coordinator: channel::Sender<(
54
0
        service::ConnectionId,
55
0
        Option<service::ConnectionToCoordinator>,
56
0
    )>,
57
0
) {
Unexecuted instantiation: _RINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2b_EENCNvB2_19multiaddr_to_socket0EB6_
Unexecuted instantiation: _RINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB4_15background_task0s4_0EB6_
Unexecuted instantiation: _RINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppEB6_
58
0
    // The socket future is wrapped in an object containing a read buffer and a write buffer
59
0
    // and allowing easier usage.
60
0
    let mut socket = pin::pin!(with_buffers::WithBuffers::new(socket));
61
0
62
0
    // Future that sends a message to the coordinator. Only one message is sent to the coordinator
63
0
    // at a time. `None` if no message is being sent.
64
0
    let mut message_sending = pin::pin!(None);
65
0
66
0
    // Channel receivers need to be pinned.
67
0
    let mut coordinator_to_connection = pin::pin!(coordinator_to_connection);
68
69
0
    loop {
70
0
        // Because only one message should be sent to the coordinator at a time, and because
71
0
        // processing the socket might generate a message, we only process the socket if no
72
0
        // message is currently being sent.
73
0
        if message_sending.is_none() {
74
0
            if let Ok(mut socket_read_write) = socket.as_mut().read_write_access(Instant::now()) {
75
0
                let read_bytes_before = socket_read_write.read_bytes;
76
0
                let written_bytes_before = socket_read_write.write_bytes_queued;
77
0
                let write_closed = socket_read_write.write_bytes_queueable.is_none();
78
0
79
0
                connection_task.read_write(&mut *socket_read_write);
80
0
81
0
                if socket_read_write.read_bytes != read_bytes_before
82
0
                    || socket_read_write.write_bytes_queued != written_bytes_before
83
0
                    || (!write_closed && socket_read_write.write_bytes_queueable.is_none())
84
0
                {
85
0
                    log_callback.log(
86
0
                        LogLevel::Trace,
87
0
                        format!(
88
0
                            "connection-activity; address={address}; read={}; written={}; wake_up_after={:?}; write_close={:?}",
89
0
                            socket_read_write.read_bytes - read_bytes_before,
90
0
                            socket_read_write.write_bytes_queued - written_bytes_before,
91
0
                            socket_read_write.wake_up_after.map(|w| w
92
0
                                .checked_duration_since(socket_read_write.now)
93
0
                                .unwrap_or(Duration::new(0, 0))),
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s2_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s2_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s2_0Ba_
94
0
                            socket_read_write.write_bytes_queueable.is_none(),
95
0
                        ),
96
0
                    );
97
0
                }
98
            } else {
99
                // Error on the socket.
100
0
                if !connection_task.is_reset_called() {
101
0
                    log_callback.log(
102
0
                        LogLevel::Trace,
103
0
                        format!("connection-activity; address={}; reset", address),
104
0
                    );
105
0
                    connection_task.reset();
106
0
                }
107
            }
108
109
            // Try to pull a message to send to the coordinator.
110
111
            // Calling this method takes ownership of the task and returns that task if it has
112
            // more work to do. If `None` is returned, then the entire task is gone and the
113
            // connection must be abruptly closed, which is what happens when we return from
114
            // this function.
115
0
            let (task_update, opaque_message) = connection_task.pull_message_to_coordinator();
116
0
            if let Some(task_update) = task_update {
117
0
                connection_task = task_update;
118
0
                debug_assert!(message_sending.is_none());
119
0
                if let Some(opaque_message) = opaque_message {
120
0
                    message_sending.set(Some(
121
0
                        connection_to_coordinator.send((connection_id, Some(opaque_message))),
122
0
                    ));
123
0
                }
124
            } else {
125
0
                let _ = connection_to_coordinator
126
0
                    .send((connection_id, opaque_message))
127
0
                    .await;
128
0
                return;
129
            }
130
0
        }
131
132
        // Now wait for something interesting to happen before looping again.
133
134
        enum WakeUpReason {
135
            CoordinatorMessage(CoordinatorToConnection),
136
            CoordinatorDead,
137
            SocketEvent,
138
            MessageSent,
139
        }
140
141
0
        let wake_up_reason: WakeUpReason = {
142
0
            let coordinator_message = async {
143
0
                match coordinator_to_connection.next().await {
144
0
                    Some(msg) => WakeUpReason::CoordinatorMessage(msg),
145
0
                    None => WakeUpReason::CoordinatorDead,
146
                }
147
0
            };
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E00Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E00Ba_
Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE00Ba_
148
149
0
            let socket_event = {
150
0
                // The future returned by `wait_read_write_again` yields when `read_write_access`
151
0
                // must be called. Because we only call `read_write_access` when `message_sending`
152
0
                // is `None`, we also call `wait_read_write_again` only when `message_sending` is
153
0
                // `None`.
154
0
                let fut = if message_sending.is_none() {
155
0
                    Some(socket.as_mut().wait_read_write_again(|when| async move {
156
0
                        smol::Timer::at(when).await;
157
0
                    }))
Unexecuted instantiation: _RNCNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2h_EENCNvB8_19multiaddr_to_socket0E0s_00Bc_
Unexecuted instantiation: _RNCNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvBa_15background_task0s4_0E0s_00Bc_
Unexecuted instantiation: _RNCNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s_00Bc_
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s_0Ba_
158
                } else {
159
0
                    None
160
                };
161
0
                async {
162
0
                    if let Some(fut) = fut {
163
0
                        fut.await;
164
0
                        WakeUpReason::SocketEvent
165
                    } else {
166
0
                        future::pending().await
167
                    }
168
0
                }
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s0_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s0_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s0_0Ba_
169
            };
170
171
0
            let message_sent = async {
172
0
                let result =
173
0
                    if let Some(message_sending) = message_sending.as_mut().as_mut().as_pin_mut() {
174
0
                        message_sending.await
175
                    } else {
176
0
                        future::pending().await
177
                    };
178
0
                message_sending.set(None);
179
0
                if result.is_ok() {
180
0
                    WakeUpReason::MessageSent
181
                } else {
182
0
                    WakeUpReason::CoordinatorDead
183
                }
184
0
            };
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s1_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s1_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s1_0Ba_
185
186
0
            coordinator_message.or(socket_event).or(message_sent).await
187
        };
188
189
0
        match wake_up_reason {
190
0
            WakeUpReason::CoordinatorMessage(message) => {
191
0
                connection_task.inject_coordinator_message(&Instant::now(), message);
192
0
            }
193
0
            WakeUpReason::CoordinatorDead => return,
194
0
            WakeUpReason::SocketEvent => {}
195
0
            WakeUpReason::MessageSent => {}
196
        }
197
    }
198
0
}
Unexecuted instantiation: _RNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2d_EENCNvB4_19multiaddr_to_socket0E0B8_
Unexecuted instantiation: _RNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB6_15background_task0s4_0E0B8_
Unexecuted instantiation: _RNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0B8_
199
200
/// Builds a future that connects to the given multiaddress. Returns an error if the multiaddress
201
/// protocols aren't supported.
202
0
pub(super) fn multiaddr_to_socket(
203
0
    addr: &Multiaddr,
204
0
) -> Result<impl Future<Output = Result<impl AsyncReadWrite, io::Error>>, ()> {
205
0
    let mut iter = addr.iter().fuse();
206
0
    let proto1 = iter.next().ok_or(())?;
207
0
    let proto2 = iter.next().ok_or(())?;
208
0
    let proto3 = iter.next();
209
0
210
0
    if iter.next().is_some() {
211
0
        return Err(());
212
0
    }
213
214
    // TODO: doesn't support WebSocket secure connections
215
216
    // Ensure ahead of time that the multiaddress is supported.
217
0
    let (addr, host_if_websocket) = match (&proto1, &proto2, &proto3) {
218
0
        (Protocol::Ip4(ip), Protocol::Tcp(port), None) => (
219
0
            either::Left(SocketAddr::new(IpAddr::V4((*ip).into()), *port)),
220
0
            None,
221
0
        ),
222
0
        (Protocol::Ip6(ip), Protocol::Tcp(port), None) => (
223
0
            either::Left(SocketAddr::new(IpAddr::V6((*ip).into()), *port)),
224
0
            None,
225
0
        ),
226
0
        (Protocol::Ip4(ip), Protocol::Tcp(port), Some(Protocol::Ws)) => {
227
0
            let addr = SocketAddr::new(IpAddr::V4((*ip).into()), *port);
228
0
            (either::Left(addr), Some(addr.to_string()))
229
        }
230
0
        (Protocol::Ip6(ip), Protocol::Tcp(port), Some(Protocol::Ws)) => {
231
0
            let addr = SocketAddr::new(IpAddr::V6((*ip).into()), *port);
232
0
            (either::Left(addr), Some(addr.to_string()))
233
        }
234
235
        // TODO: we don't care about the differences between Dns, Dns4, and Dns6
236
        (
237
0
            Protocol::Dns(addr) | Protocol::Dns4(addr) | Protocol::Dns6(addr),
238
0
            Protocol::Tcp(port),
239
            None,
240
0
        ) => (either::Right((addr.to_string(), *port)), None),
241
        (
242
0
            Protocol::Dns(addr) | Protocol::Dns4(addr) | Protocol::Dns6(addr),
243
0
            Protocol::Tcp(port),
244
            Some(Protocol::Ws),
245
0
        ) => (
246
0
            either::Right((addr.to_string(), *port)),
247
0
            Some(format!("{}:{}", addr, *port)),
248
0
        ),
249
250
0
        _ => return Err(()),
251
    };
252
253
0
    Ok(async move {
254
0
        let tcp_socket = match addr {
255
0
            either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await,
256
0
            either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await,
257
        };
258
259
0
        if let Ok(tcp_socket) = &tcp_socket {
260
0
            // The Nagle algorithm, implemented in the kernel, consists in buffering the
261
0
            // data to be sent out and waiting a bit before actually sending it out, in
262
0
            // order to potentially merge multiple writes in a row into one packet. In
263
0
            // the implementation below, it is guaranteed that the buffer in `WithBuffers`
264
0
            // is filled with as much data as possible before the operating system gets
265
0
            // involved. As such, we disable the Nagle algorithm, in order to avoid adding
266
0
            // an artificial delay to all sends.
267
0
            let _ = tcp_socket.set_nodelay(true);
268
0
        }
269
270
0
        match (tcp_socket, host_if_websocket) {
271
0
            (Ok(tcp_socket), Some(host)) => {
272
0
                websocket::websocket_client_handshake(websocket::Config {
273
0
                    tcp_socket,
274
0
                    host: &host,
275
0
                    url: "/",
276
0
                })
277
0
                .await
278
0
                .map(futures_util::future::Either::Right)
279
            }
280
0
            (Ok(tcp_socket), None) => Ok(futures_util::future::Either::Left(tcp_socket)),
281
0
            (Err(err), _) => Err(err),
282
        }
283
0
    })
Unexecuted instantiation: _RNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0B7_
Unexecuted instantiation: _RNCNvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0B7_
284
0
}
Unexecuted instantiation: _RNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket
Unexecuted instantiation: _RNvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks19multiaddr_to_socket