/__w/smoldot/smoldot/repo/full-node/src/network_service/tasks.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use crate::{LogCallback, LogLevel}; |
19 | | use core::future::Future; |
20 | | use futures_lite::future; |
21 | | use futures_util::StreamExt as _; |
22 | | use smol::{ |
23 | | channel, |
24 | | future::FutureExt as _, |
25 | | io::{AsyncRead, AsyncWrite}, |
26 | | }; |
27 | | use smoldot::{ |
28 | | libp2p::{ |
29 | | multiaddr::{Multiaddr, Protocol}, |
30 | | websocket, with_buffers, |
31 | | }, |
32 | | network::service::{self, CoordinatorToConnection}, |
33 | | }; |
34 | | use std::{ |
35 | | io, |
36 | | net::{IpAddr, SocketAddr}, |
37 | | pin, |
38 | | sync::Arc, |
39 | | time::{Duration, Instant}, |
40 | | }; |
41 | | |
42 | | pub(super) trait AsyncReadWrite: AsyncRead + AsyncWrite {} |
43 | | impl<T> AsyncReadWrite for T where T: AsyncRead + AsyncWrite {} |
44 | | |
45 | | /// Asynchronous task managing a specific connection. |
46 | 0 | pub(super) async fn connection_task( |
47 | 0 | log_callback: Arc<dyn LogCallback + Send + Sync>, |
48 | 0 | address: String, |
49 | 0 | socket: impl Future<Output = Result<impl AsyncReadWrite, io::Error>>, |
50 | 0 | connection_id: service::ConnectionId, |
51 | 0 | mut connection_task: service::SingleStreamConnectionTask<Instant>, |
52 | 0 | coordinator_to_connection: channel::Receiver<service::CoordinatorToConnection>, |
53 | 0 | connection_to_coordinator: channel::Sender<( |
54 | 0 | service::ConnectionId, |
55 | 0 | Option<service::ConnectionToCoordinator>, |
56 | 0 | )>, |
57 | 0 | ) { Unexecuted instantiation: _RINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2b_EENCNvB2_19multiaddr_to_socket0EB6_ Unexecuted instantiation: _RINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB4_15background_task0s4_0EB6_ Unexecuted instantiation: _RINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppEB6_ |
58 | 0 | // The socket future is wrapped around an object containing a read buffer and a write buffer |
59 | 0 | // and allowing easier usage. |
60 | 0 | let mut socket = pin::pin!(with_buffers::WithBuffers::new(socket)); |
61 | 0 |
|
62 | 0 | // Future that sends a message to the coordinator. Only one message is sent to the coordinator |
63 | 0 | // at a time. `None` if no message is being sent. |
64 | 0 | let mut message_sending = pin::pin!(None); |
65 | 0 |
|
66 | 0 | // Channel receivers need to be pinned. |
67 | 0 | let mut coordinator_to_connection = pin::pin!(coordinator_to_connection); |
68 | | |
69 | 0 | loop { |
70 | 0 | // Because only one message should be sent to the coordinator at a time, and that |
71 | 0 | // processing the socket might generate a message, we only process the socket if no |
72 | 0 | // message is currently being sent. |
73 | 0 | if message_sending.is_none() { |
74 | 0 | if let Ok(mut socket_read_write) = socket.as_mut().read_write_access(Instant::now()) { |
75 | 0 | let read_bytes_before = socket_read_write.read_bytes; |
76 | 0 | let written_bytes_before = socket_read_write.write_bytes_queued; |
77 | 0 | let write_closed = socket_read_write.write_bytes_queueable.is_none(); |
78 | 0 |
|
79 | 0 | connection_task.read_write(&mut *socket_read_write); |
80 | 0 |
|
81 | 0 | if socket_read_write.read_bytes != read_bytes_before |
82 | 0 | || socket_read_write.write_bytes_queued != written_bytes_before |
83 | 0 | || (!write_closed && socket_read_write.write_bytes_queueable.is_none()) |
84 | 0 | { |
85 | 0 | log_callback.log( |
86 | 0 | LogLevel::Trace, |
87 | 0 | format!( |
88 | 0 | "connection-activity; address={address}; read={}; written={}; wake_up_after={:?}; write_close={:?}", |
89 | 0 | socket_read_write.read_bytes - read_bytes_before, |
90 | 0 | socket_read_write.write_bytes_queued - written_bytes_before, |
91 | 0 | socket_read_write.wake_up_after.map(|w| w |
92 | 0 | .checked_duration_since(socket_read_write.now) |
93 | 0 | .unwrap_or(Duration::new(0, 0))), Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s2_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s2_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s2_0Ba_ |
94 | 0 | socket_read_write.write_bytes_queueable.is_none(), |
95 | 0 | ), |
96 | 0 | ); |
97 | 0 | } |
98 | | } else { |
99 | | // Error on the socket. |
100 | 0 | if !connection_task.is_reset_called() { |
101 | 0 | log_callback.log( |
102 | 0 | LogLevel::Trace, |
103 | 0 | format!("connection-activity; address={}; reset", address), |
104 | 0 | ); |
105 | 0 | connection_task.reset(); |
106 | 0 | } |
107 | | } |
108 | | |
109 | | // Try pull message to send to the coordinator. |
110 | | |
111 | | // Calling this method takes ownership of the task and returns that task if it has |
112 | | // more work to do. If `None` is returned, then the entire task is gone and the |
113 | | // connection must be abruptly closed, which is what happens when we return from |
114 | | // this function. |
115 | 0 | let (task_update, opaque_message) = connection_task.pull_message_to_coordinator(); |
116 | 0 | if let Some(task_update) = task_update { |
117 | 0 | connection_task = task_update; |
118 | 0 | debug_assert!(message_sending.is_none()); |
119 | 0 | if let Some(opaque_message) = opaque_message { |
120 | 0 | message_sending.set(Some( |
121 | 0 | connection_to_coordinator.send((connection_id, Some(opaque_message))), |
122 | 0 | )); |
123 | 0 | } |
124 | | } else { |
125 | 0 | let _ = connection_to_coordinator |
126 | 0 | .send((connection_id, opaque_message)) |
127 | 0 | .await; |
128 | 0 | return; |
129 | | } |
130 | 0 | } |
131 | | |
132 | | // Now wait for something interesting to happen before looping again. |
133 | | |
134 | | enum WakeUpReason { |
135 | | CoordinatorMessage(CoordinatorToConnection), |
136 | | CoordinatorDead, |
137 | | SocketEvent, |
138 | | MessageSent, |
139 | | } |
140 | | |
141 | 0 | let wake_up_reason: WakeUpReason = { |
142 | 0 | let coordinator_message = async { |
143 | 0 | match coordinator_to_connection.next().await { |
144 | 0 | Some(msg) => WakeUpReason::CoordinatorMessage(msg), |
145 | 0 | None => WakeUpReason::CoordinatorDead, |
146 | | } |
147 | 0 | }; Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E00Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E00Ba_ Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE00Ba_ |
148 | | |
149 | 0 | let socket_event = { |
150 | 0 | // The future returned by `wait_read_write_again` yields when `read_write_access` |
151 | 0 | // must be called. Because we only call `read_write_access` when `message_sending` |
152 | 0 | // is `None`, we also call `wait_read_write_again` only when `message_sending` is |
153 | 0 | // `None`. |
154 | 0 | let fut = if message_sending.is_none() { |
155 | 0 | Some(socket.as_mut().wait_read_write_again(|when| async move { |
156 | 0 | smol::Timer::at(when).await; |
157 | 0 | })) Unexecuted instantiation: _RNCNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2h_EENCNvB8_19multiaddr_to_socket0E0s_00Bc_ Unexecuted instantiation: _RNCNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvBa_15background_task0s4_0E0s_00Bc_ Unexecuted instantiation: _RNCNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s_00Bc_ Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s_0Ba_ |
158 | | } else { |
159 | 0 | None |
160 | | }; |
161 | 0 | async { |
162 | 0 | if let Some(fut) = fut { |
163 | 0 | fut.await; |
164 | 0 | WakeUpReason::SocketEvent |
165 | | } else { |
166 | 0 | future::pending().await |
167 | | } |
168 | 0 | } Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s0_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s0_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s0_0Ba_ |
169 | | }; |
170 | | |
171 | 0 | let message_sent = async { |
172 | 0 | let result = |
173 | 0 | if let Some(message_sending) = message_sending.as_mut().as_mut().as_pin_mut() { |
174 | 0 | message_sending.await |
175 | | } else { |
176 | 0 | future::pending().await |
177 | | }; |
178 | 0 | message_sending.set(None); |
179 | 0 | if result.is_ok() { |
180 | 0 | WakeUpReason::MessageSent |
181 | | } else { |
182 | 0 | WakeUpReason::CoordinatorDead |
183 | | } |
184 | 0 | }; Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2f_EENCNvB6_19multiaddr_to_socket0E0s1_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB8_15background_task0s4_0E0s1_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0s1_0Ba_ |
185 | | |
186 | 0 | coordinator_message.or(socket_event).or(message_sent).await |
187 | | }; |
188 | | |
189 | 0 | match wake_up_reason { |
190 | 0 | WakeUpReason::CoordinatorMessage(message) => { |
191 | 0 | connection_task.inject_coordinator_message(&Instant::now(), message); |
192 | 0 | } |
193 | 0 | WakeUpReason::CoordinatorDead => return, |
194 | 0 | WakeUpReason::SocketEvent => {} |
195 | 0 | WakeUpReason::MessageSent => {} |
196 | | } |
197 | | } |
198 | 0 | } Unexecuted instantiation: _RNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskINtNtNtCsbAmNCxs6rLz_12futures_util6future6either6EitherNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamINtNtNtCseuYC0Zibziv_7smoldot6libp2p9websocket10ConnectionB2d_EENCNvB4_19multiaddr_to_socket0E0B8_ Unexecuted instantiation: _RNCINvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks15connection_taskNtNtCs3QA9SwIbHM0_9async_net3tcp9TcpStreamNCNCNvB6_15background_task0s4_0E0B8_ Unexecuted instantiation: _RNCINvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks15connection_taskppE0B8_ |
199 | | |
200 | | /// Builds a future that connects to the given multiaddress. Returns an error if the multiaddress |
201 | | /// protocols aren't supported. |
202 | 0 | pub(super) fn multiaddr_to_socket( |
203 | 0 | addr: &Multiaddr, |
204 | 0 | ) -> Result<impl Future<Output = Result<impl AsyncReadWrite, io::Error>>, ()> { |
205 | 0 | let mut iter = addr.iter().fuse(); |
206 | 0 | let proto1 = iter.next().ok_or(())?; |
207 | 0 | let proto2 = iter.next().ok_or(())?; |
208 | 0 | let proto3 = iter.next(); |
209 | 0 |
|
210 | 0 | if iter.next().is_some() { |
211 | 0 | return Err(()); |
212 | 0 | } |
213 | | |
214 | | // TODO: doesn't support WebSocket secure connections |
215 | | |
216 | | // Ensure ahead of time that the multiaddress is supported. |
217 | 0 | let (addr, host_if_websocket) = match (&proto1, &proto2, &proto3) { |
218 | 0 | (Protocol::Ip4(ip), Protocol::Tcp(port), None) => ( |
219 | 0 | either::Left(SocketAddr::new(IpAddr::V4((*ip).into()), *port)), |
220 | 0 | None, |
221 | 0 | ), |
222 | 0 | (Protocol::Ip6(ip), Protocol::Tcp(port), None) => ( |
223 | 0 | either::Left(SocketAddr::new(IpAddr::V6((*ip).into()), *port)), |
224 | 0 | None, |
225 | 0 | ), |
226 | 0 | (Protocol::Ip4(ip), Protocol::Tcp(port), Some(Protocol::Ws)) => { |
227 | 0 | let addr = SocketAddr::new(IpAddr::V4((*ip).into()), *port); |
228 | 0 | (either::Left(addr), Some(addr.to_string())) |
229 | | } |
230 | 0 | (Protocol::Ip6(ip), Protocol::Tcp(port), Some(Protocol::Ws)) => { |
231 | 0 | let addr = SocketAddr::new(IpAddr::V6((*ip).into()), *port); |
232 | 0 | (either::Left(addr), Some(addr.to_string())) |
233 | | } |
234 | | |
235 | | // TODO: we don't care about the differences between Dns, Dns4, and Dns6 |
236 | | ( |
237 | 0 | Protocol::Dns(addr) | Protocol::Dns4(addr) | Protocol::Dns6(addr), |
238 | 0 | Protocol::Tcp(port), |
239 | | None, |
240 | 0 | ) => (either::Right((addr.to_string(), *port)), None), |
241 | | ( |
242 | 0 | Protocol::Dns(addr) | Protocol::Dns4(addr) | Protocol::Dns6(addr), |
243 | 0 | Protocol::Tcp(port), |
244 | | Some(Protocol::Ws), |
245 | 0 | ) => ( |
246 | 0 | either::Right((addr.to_string(), *port)), |
247 | 0 | Some(format!("{}:{}", addr, *port)), |
248 | 0 | ), |
249 | | |
250 | 0 | _ => return Err(()), |
251 | | }; |
252 | | |
253 | 0 | Ok(async move { |
254 | 0 | let tcp_socket = match addr { |
255 | 0 | either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await, |
256 | 0 | either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await, |
257 | | }; |
258 | | |
259 | 0 | if let Ok(tcp_socket) = &tcp_socket { |
260 | 0 | // The Nagle algorithm, implemented in the kernel, consists in buffering the |
261 | 0 | // data to be sent out and waiting a bit before actually sending it out, in |
262 | 0 | // order to potentially merge multiple writes in a row into one packet. In |
263 | 0 | // the implementation below, it is guaranteed that the buffer in `WithBuffers` |
264 | 0 | // is filled with as much data as possible before the operating system gets |
265 | 0 | // involved. As such, we disable the Nagle algorithm, in order to avoid adding |
266 | 0 | // an artificial delay to all sends. |
267 | 0 | let _ = tcp_socket.set_nodelay(true); |
268 | 0 | } |
269 | | |
270 | 0 | match (tcp_socket, host_if_websocket) { |
271 | 0 | (Ok(tcp_socket), Some(host)) => { |
272 | 0 | websocket::websocket_client_handshake(websocket::Config { |
273 | 0 | tcp_socket, |
274 | 0 | host: &host, |
275 | 0 | url: "/", |
276 | 0 | }) |
277 | 0 | .await |
278 | 0 | .map(futures_util::future::Either::Right) |
279 | | } |
280 | 0 | (Ok(tcp_socket), None) => Ok(futures_util::future::Either::Left(tcp_socket)), |
281 | 0 | (Err(err), _) => Err(err), |
282 | | } |
283 | 0 | }) Unexecuted instantiation: _RNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0B7_ Unexecuted instantiation: _RNCNvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks19multiaddr_to_socket0B7_ |
284 | 0 | } Unexecuted instantiation: _RNvNtNtCsiUjFBJteJ7x_17smoldot_full_node15network_service5tasks19multiaddr_to_socket Unexecuted instantiation: _RNvNtNtCshBwayKnNXDT_17smoldot_full_node15network_service5tasks19multiaddr_to_socket |