/__w/smoldot/smoldot/repo/light-base/src/network_service/tasks.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use crate::{ |
19 | | log, |
20 | | platform::{PlatformRef, SubstreamDirection}, |
21 | | }; |
22 | | |
23 | | use alloc::{boxed::Box, string::String}; |
24 | | use core::{pin, time::Duration}; |
25 | | use futures_lite::FutureExt as _; |
26 | | use futures_util::{future, stream::FuturesUnordered, StreamExt as _}; |
27 | | use smoldot::{libp2p::collection::SubstreamFate, network::service}; |
28 | | |
29 | | /// Asynchronous task managing a specific single-stream connection. |
30 | 0 | pub(super) async fn single_stream_connection_task<TPlat: PlatformRef>( |
31 | 0 | mut connection: TPlat::Stream, |
32 | 0 | address_string: String, |
33 | 0 | platform: TPlat, |
34 | 0 | connection_id: service::ConnectionId, |
35 | 0 | connection_task: service::SingleStreamConnectionTask<TPlat::Instant>, |
36 | 0 | coordinator_to_connection: async_channel::Receiver<service::CoordinatorToConnection>, |
37 | 0 | connection_to_coordinator: async_channel::Sender<( |
38 | 0 | service::ConnectionId, |
39 | 0 | service::ConnectionToCoordinator, |
40 | 0 | )>, |
41 | 0 | ) { Unexecuted instantiation: _RINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpEB6_ Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefEB1v_ Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpEB6_ |
42 | 0 | // We need to pin the receiver, as the type doesn't implement `Unpin`. |
43 | 0 | let mut coordinator_to_connection = pin::pin!(coordinator_to_connection); |
44 | 0 | // We also need to pin the socket, as we don't know whether it implements `Unpin`. |
45 | 0 | let mut socket = pin::pin!(connection); |
46 | 0 |
|
47 | 0 | // Future that sends a message to the coordinator. Only one message is sent to the coordinator |
48 | 0 | // at a time. `None` if no message is being sent. |
49 | 0 | let mut message_sending = pin::pin!(None); |
50 | 0 |
|
51 | 0 | // Wrap `connection_task` within an `Option`. It will become `None` if the connection task |
52 | 0 | // wants to self-destruct. |
53 | 0 | let mut connection_task = Some(connection_task); |
54 | | |
55 | | loop { |
56 | | // Yield at every loop in order to provide better tasks granularity. |
57 | 0 | futures_lite::future::yield_now().await; |
58 | | |
59 | | // Because only one message should be sent to the coordinator at a time, and that |
60 | | // processing the socket might generate a message, we only process the socket if no |
61 | | // message is currently being sent. |
62 | 0 | if message_sending.is_none() && connection_task.is_some() { |
63 | 0 | let mut task = connection_task.take().unwrap(); |
64 | 0 |
|
65 | 0 | match platform.read_write_access(socket.as_mut()) { |
66 | 0 | Ok(mut socket_read_write) => { |
67 | 0 | // The code in this block is a bit cumbersome due to the logging. |
68 | 0 | let read_bytes_before = socket_read_write.read_bytes; |
69 | 0 | let written_bytes_before = socket_read_write.write_bytes_queued; |
70 | 0 | let write_closed = socket_read_write.write_bytes_queueable.is_none(); |
71 | 0 |
|
72 | 0 | task.read_write(&mut *socket_read_write); |
73 | 0 |
|
74 | 0 | if socket_read_write.read_bytes != read_bytes_before |
75 | 0 | || socket_read_write.write_bytes_queued != written_bytes_before |
76 | 0 | || (!write_closed && socket_read_write.write_bytes_queueable.is_none()) |
77 | 0 | { |
78 | 0 | log!( |
79 | 0 | &platform, |
80 | 0 | Trace, |
81 | 0 | "connections", |
82 | 0 | "connection-activity", |
83 | 0 | address = address_string, |
84 | 0 | read = socket_read_write.read_bytes - read_bytes_before, |
85 | 0 | written = socket_read_write.write_bytes_queued - written_bytes_before, |
86 | 0 | wake_up_after = ?socket_read_write.wake_up_after.as_ref().map(|w| { |
87 | 0 | if *w > socket_read_write.now { |
88 | 0 | w.clone() - socket_read_write.now.clone() |
89 | | } else { |
90 | 0 | Duration::new(0, 0) |
91 | | } |
92 | 0 | }), Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s2_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s2_0B1z_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s2_0Ba_ |
93 | 0 | write_closed = socket_read_write.write_bytes_queueable.is_none(), |
94 | 0 | ); |
95 | 0 | } |
96 | | } |
97 | 0 | Err(err) => { |
98 | 0 | // Error on the socket. |
99 | 0 | if !task.is_reset_called() { |
100 | 0 | log!( |
101 | 0 | &platform, |
102 | 0 | Trace, |
103 | 0 | "connections", |
104 | 0 | "reset", |
105 | 0 | address = address_string, |
106 | 0 | reason = ?err |
107 | 0 | ); |
108 | 0 | task.reset(); |
109 | 0 | } |
110 | | } |
111 | | } |
112 | | |
113 | | // Try pull message to send to the coordinator. |
114 | | |
115 | | // Calling this method takes ownership of the task and returns that task if it has |
116 | | // more work to do. If `None` is returned, then the entire task is gone and the |
117 | | // connection must be abruptly closed, which is what happens when we return from |
118 | | // this function. |
119 | 0 | let (task_update, message) = task.pull_message_to_coordinator(); |
120 | 0 | connection_task = task_update; |
121 | 0 |
|
122 | 0 | debug_assert!(message_sending.is_none()); |
123 | 0 | if let Some(message) = message { |
124 | 0 | message_sending.set(Some( |
125 | 0 | connection_to_coordinator.send((connection_id, message)), |
126 | 0 | )); |
127 | 0 | } |
128 | 0 | } |
129 | | |
130 | | // Now wait for something interesting to happen before looping again. |
131 | | |
132 | | enum WakeUpReason { |
133 | | CoordinatorMessage(service::CoordinatorToConnection), |
134 | | CoordinatorDead, |
135 | | SocketEvent, |
136 | | MessageSent, |
137 | | } |
138 | | |
139 | 0 | let wake_up_reason: WakeUpReason = { |
140 | | // If the connection task has self-destructed and that no message is being sent, stop |
141 | | // the task altogether as nothing will happen. |
142 | 0 | if connection_task.is_none() && message_sending.is_none() { |
143 | 0 | log!( |
144 | 0 | &platform, |
145 | 0 | Trace, |
146 | 0 | "connections", |
147 | 0 | "shutdown", |
148 | 0 | address = address_string |
149 | 0 | ); |
150 | 0 | return; |
151 | 0 | } |
152 | 0 |
|
153 | 0 | let coordinator_message = async { |
154 | 0 | match coordinator_to_connection.next().await { |
155 | 0 | Some(msg) => WakeUpReason::CoordinatorMessage(msg), |
156 | 0 | None => WakeUpReason::CoordinatorDead, |
157 | | } |
158 | 0 | }; Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE00Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE00B1z_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE00Ba_ |
159 | | |
160 | 0 | let socket_event = { |
161 | 0 | // The future returned by `wait_read_write_again` yields when `read_write_access` |
162 | 0 | // must be called. Because we only call `read_write_access` when `message_sending` |
163 | 0 | // is `None`, we also call `wait_read_write_again` only when `message_sending` is |
164 | 0 | // `None`. |
165 | 0 | let fut = if message_sending.as_ref().as_pin_ref().is_none() { |
166 | 0 | Some(platform.wait_read_write_again(socket.as_mut())) |
167 | | } else { |
168 | 0 | None |
169 | | }; |
170 | 0 | async { |
171 | 0 | if let Some(fut) = fut { |
172 | 0 | fut.await; |
173 | 0 | WakeUpReason::SocketEvent |
174 | | } else { |
175 | 0 | future::pending().await |
176 | | } |
177 | 0 | } Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s_0B1z_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s_0Ba_ |
178 | | }; |
179 | | |
180 | 0 | let message_sent = async { |
181 | 0 | let result = if let Some(message_sending) = message_sending.as_mut().as_pin_mut() { |
182 | 0 | message_sending.await |
183 | | } else { |
184 | 0 | future::pending().await |
185 | | }; |
186 | 0 | message_sending.set(None); |
187 | 0 | if result.is_ok() { |
188 | 0 | WakeUpReason::MessageSent |
189 | | } else { |
190 | 0 | WakeUpReason::CoordinatorDead |
191 | | } |
192 | 0 | }; Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s0_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s0_0B1z_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s0_0Ba_ |
193 | | |
194 | 0 | coordinator_message.or(socket_event).or(message_sent).await |
195 | | }; |
196 | | |
197 | 0 | match wake_up_reason { |
198 | 0 | WakeUpReason::CoordinatorMessage(message) => { |
199 | 0 | // The coordinator normally guarantees that no message is sent after the task |
200 | 0 | // is destroyed. |
201 | 0 | let connection_task = connection_task.as_mut().unwrap_or_else(|| unreachable!()); Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s1_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s1_0B1z_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s1_0Ba_ |
202 | 0 | connection_task.inject_coordinator_message(&platform.now(), message); |
203 | 0 | } |
204 | | WakeUpReason::CoordinatorDead => { |
205 | 0 | log!( |
206 | 0 | &platform, |
207 | 0 | Trace, |
208 | 0 | "connections", |
209 | 0 | "shutdown", |
210 | 0 | address = address_string |
211 | 0 | ); |
212 | 0 | return; |
213 | | } |
214 | 0 | WakeUpReason::SocketEvent => {} |
215 | 0 | WakeUpReason::MessageSent => {} |
216 | | } |
217 | | } |
218 | 0 | } Unexecuted instantiation: _RNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0B8_ Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0B1x_ Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0B8_ |
219 | | |
220 | | /// Asynchronous task managing a specific multi-stream connection. |
221 | | /// |
222 | | /// > **Note**: This function is specific to WebRTC in the sense that it checks whether the reading |
223 | | /// > and writing sides of substreams never close, and adjusts the size of the write |
224 | | /// > buffer to not go over the frame size limit of WebRTC. It can easily be made more |
225 | | /// > general-purpose. |
226 | 0 | pub(super) async fn webrtc_multi_stream_connection_task<TPlat: PlatformRef>( |
227 | 0 | mut connection: TPlat::MultiStream, |
228 | 0 | address_string: String, |
229 | 0 | platform: TPlat, |
230 | 0 | connection_id: service::ConnectionId, |
231 | 0 | mut connection_task: service::MultiStreamConnectionTask<TPlat::Instant, usize>, |
232 | 0 | mut coordinator_to_connection: async_channel::Receiver<service::CoordinatorToConnection>, |
233 | 0 | connection_to_coordinator: async_channel::Sender<( |
234 | 0 | service::ConnectionId, |
235 | 0 | service::ConnectionToCoordinator, |
236 | 0 | )>, |
237 | 0 | ) { Unexecuted instantiation: _RINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpEB6_ Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefEB1B_ Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpEB6_ |
238 | 0 | // Future that sends a message to the coordinator. Only one message is sent to the coordinator |
239 | 0 | // at a time. `None` if no message is being sent. |
240 | 0 | let mut message_sending = pin::pin!(None); |
241 | 0 | // Number of substreams that are currently being opened by the `PlatformRef` implementation |
242 | 0 | // and that the `connection_task` state machine isn't aware of yet. |
243 | 0 | let mut pending_opening_out_substreams = 0; |
244 | 0 | // Stream that yields an item whenever a substream is ready to be read-written. |
245 | 0 | // TODO: we box the future because of the type checker being annoying |
246 | 0 | let mut when_substreams_rw_ready = FuturesUnordered::< |
247 | 0 | pin::Pin<Box<dyn future::Future<Output = (pin::Pin<Box<TPlat::Stream>>, usize)> + Send>>, |
248 | 0 | >::new(); |
249 | 0 | // Identifier to assign to the next substream. |
250 | 0 | // TODO: weird API |
251 | 0 | let mut next_substream_id = 0; |
252 | 0 | // We need to pin the receiver, as the type doesn't implement `Unpin`. |
253 | 0 | let mut coordinator_to_connection = pin::pin!(coordinator_to_connection); |
254 | | |
255 | | loop { |
256 | | // Start opening new outbound substreams, if needed. |
257 | 0 | for _ in 0..connection_task |
258 | 0 | .desired_outbound_substreams() |
259 | 0 | .saturating_sub(pending_opening_out_substreams) |
260 | 0 | { |
261 | 0 | log!( |
262 | 0 | &platform, |
263 | 0 | Trace, |
264 | 0 | "connections", |
265 | 0 | "substream-open-start", |
266 | 0 | address = address_string |
267 | 0 | ); |
268 | 0 | platform.open_out_substream(&mut connection); |
269 | 0 | pending_opening_out_substreams += 1; |
270 | 0 | } |
271 | | |
272 | | // Now wait for something interesting to happen before looping again. |
273 | | |
274 | | enum WakeUpReason<TPlat: PlatformRef> { |
275 | | CoordinatorMessage(service::CoordinatorToConnection), |
276 | | CoordinatorDead, |
277 | | SocketEvent(pin::Pin<Box<TPlat::Stream>>, usize), |
278 | | MessageSent, |
279 | | NewSubstream(TPlat::Stream, SubstreamDirection), |
280 | | ConnectionReset, |
281 | | } |
282 | | |
283 | 0 | let wake_up_reason: WakeUpReason<TPlat> = { |
284 | 0 | let coordinator_message = async { |
285 | 0 | match coordinator_to_connection.next().await { |
286 | 0 | Some(msg) => WakeUpReason::CoordinatorMessage(msg), |
287 | 0 | None => WakeUpReason::CoordinatorDead, |
288 | | } |
289 | 0 | }; Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE00Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE00B1F_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE00Ba_ |
290 | | |
291 | 0 | let socket_event = { |
292 | 0 | // The future returned by `wait_read_write_again` yields when `read_write_access` |
293 | 0 | // must be called. Because we only call `read_write_access` when `message_sending` |
294 | 0 | // is `None`, we also call `wait_read_write_again` only when `message_sending` is |
295 | 0 | // `None`. |
296 | 0 | let fut = if message_sending.as_ref().as_pin_ref().is_none() |
297 | 0 | && !when_substreams_rw_ready.is_empty() |
298 | | { |
299 | 0 | Some(when_substreams_rw_ready.select_next_some()) |
300 | | } else { |
301 | 0 | None |
302 | | }; |
303 | 0 | async move { |
304 | 0 | if let Some(fut) = fut { |
305 | 0 | let (stream, substream_id) = fut.await; |
306 | 0 | WakeUpReason::SocketEvent(stream, substream_id) |
307 | | } else { |
308 | 0 | future::pending().await |
309 | | } |
310 | 0 | } Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s_0B1F_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s_0Ba_ |
311 | | }; |
312 | | |
313 | 0 | let message_sent = async { |
314 | 0 | let result: Result<(), _> = |
315 | 0 | if let Some(message_sending) = message_sending.as_mut().as_pin_mut() { |
316 | 0 | message_sending.await |
317 | | } else { |
318 | 0 | future::pending().await |
319 | | }; |
320 | 0 | message_sending.set(None); |
321 | 0 | if result.is_ok() { |
322 | 0 | WakeUpReason::MessageSent |
323 | | } else { |
324 | 0 | WakeUpReason::CoordinatorDead |
325 | | } |
326 | 0 | }; Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s0_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s0_0B1F_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s0_0Ba_ |
327 | | |
328 | | // Future that is woken up when a new substream is available. |
329 | 0 | let next_substream = async { |
330 | 0 | if connection_task.is_reset_called() { |
331 | 0 | future::pending().await |
332 | | } else { |
333 | 0 | match platform.next_substream(&mut connection).await { |
334 | 0 | Some((stream, direction)) => WakeUpReason::NewSubstream(stream, direction), |
335 | 0 | None => WakeUpReason::ConnectionReset, |
336 | | } |
337 | | } |
338 | 0 | }; Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s1_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s1_0B1F_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s1_0Ba_ |
339 | | |
340 | 0 | coordinator_message |
341 | 0 | .or(socket_event) |
342 | 0 | .or(message_sent) |
343 | 0 | .or(next_substream) |
344 | 0 | .await |
345 | | }; |
346 | | |
347 | 0 | match wake_up_reason { |
348 | 0 | WakeUpReason::CoordinatorMessage(message) => { |
349 | 0 | connection_task.inject_coordinator_message(&platform.now(), message); |
350 | 0 | } |
351 | | WakeUpReason::CoordinatorDead => { |
352 | 0 | log!( |
353 | 0 | &platform, |
354 | 0 | Trace, |
355 | 0 | "connections", |
356 | 0 | "shutdown", |
357 | 0 | address = address_string |
358 | 0 | ); |
359 | 0 | return; |
360 | | } |
361 | 0 | WakeUpReason::SocketEvent(mut socket, substream_id) => { |
362 | 0 | debug_assert!(message_sending.is_none()); |
363 | | |
364 | 0 | let substream_fate = match platform.read_write_access(socket.as_mut()) { |
365 | 0 | Ok(mut socket_read_write) => { |
366 | 0 | // The code in this block is a bit cumbersome due to the logging. |
367 | 0 | let read_bytes_before = socket_read_write.read_bytes; |
368 | 0 | let written_bytes_before = socket_read_write.write_bytes_queued; |
369 | 0 | let write_closed = socket_read_write.write_bytes_queueable.is_none(); |
370 | 0 |
|
371 | 0 | let substream_fate = connection_task |
372 | 0 | .substream_read_write(&substream_id, &mut *socket_read_write); |
373 | 0 |
|
374 | 0 | if socket_read_write.read_bytes != read_bytes_before |
375 | 0 | || socket_read_write.write_bytes_queued != written_bytes_before |
376 | 0 | || (!write_closed && socket_read_write.write_bytes_queueable.is_none()) |
377 | 0 | { |
378 | 0 | log!( |
379 | 0 | &platform, |
380 | 0 | Trace, |
381 | 0 | "connections", |
382 | 0 | "connection-activity", |
383 | 0 | address = address_string, |
384 | 0 | read = socket_read_write.read_bytes - read_bytes_before, |
385 | 0 | written = socket_read_write.write_bytes_queued - written_bytes_before, |
386 | 0 | wake_up_after = ?socket_read_write.wake_up_after.as_ref().map(|w| { |
387 | 0 | if *w > socket_read_write.now { |
388 | 0 | w.clone() - socket_read_write.now.clone() |
389 | | } else { |
390 | 0 | Duration::new(0, 0) |
391 | | } |
392 | 0 | }), Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s4_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s4_0B1F_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s4_0Ba_ |
393 | 0 | write_close = ?socket_read_write.write_bytes_queueable.is_none(), |
394 | 0 | ); |
395 | 0 | } |
396 | | |
397 | 0 | if let SubstreamFate::Reset = substream_fate { |
398 | 0 | log!( |
399 | 0 | &platform, |
400 | 0 | Trace, |
401 | 0 | "connections", |
402 | 0 | "reset-substream", |
403 | 0 | address = address_string, |
404 | 0 | substream_id |
405 | 0 | ); |
406 | 0 | } |
407 | | |
408 | 0 | substream_fate |
409 | | } |
410 | 0 | Err(err) => { |
411 | 0 | // Error on the substream. |
412 | 0 | log!( |
413 | 0 | &platform, |
414 | 0 | Trace, |
415 | 0 | "connections", |
416 | 0 | "substream-reset-by-remote", |
417 | 0 | address = address_string, |
418 | 0 | substream_id, |
419 | 0 | error = ?err |
420 | 0 | ); |
421 | 0 | connection_task.reset_substream(&substream_id); |
422 | 0 | SubstreamFate::Reset |
423 | | } |
424 | | }; |
425 | | |
426 | | // Try pull message to send to the coordinator. |
427 | | |
428 | | // Calling this method takes ownership of the task and returns that task if it has |
429 | | // more work to do. If `None` is returned, then the entire task is gone and the |
430 | | // connection must be abruptly closed, which is what happens when we return from |
431 | | // this function. |
432 | 0 | let (task_update, message) = connection_task.pull_message_to_coordinator(); |
433 | 0 | if let Some(task_update) = task_update { |
434 | 0 | connection_task = task_update; |
435 | 0 | debug_assert!(message_sending.is_none()); |
436 | 0 | if let Some(message) = message { |
437 | 0 | message_sending.set(Some( |
438 | 0 | connection_to_coordinator.send((connection_id, message)), |
439 | 0 | )); |
440 | 0 | } |
441 | | } else { |
442 | 0 | log!( |
443 | 0 | &platform, |
444 | 0 | Trace, |
445 | 0 | "connections", |
446 | 0 | "shutdown", |
447 | 0 | address = address_string |
448 | 0 | ); |
449 | 0 | return; |
450 | | } |
451 | | |
452 | | // Put back the stream in `when_substreams_rw_ready`. |
453 | 0 | if let SubstreamFate::Continue = substream_fate { |
454 | 0 | when_substreams_rw_ready.push({ |
455 | 0 | let platform = platform.clone(); |
456 | 0 | Box::pin(async move { |
457 | 0 | platform.wait_read_write_again(socket.as_mut()).await; |
458 | 0 | (socket, substream_id) |
459 | 0 | }) Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s2_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s2_0B1F_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s2_0Ba_ |
460 | 0 | }); |
461 | 0 | } |
462 | | } |
463 | 0 | WakeUpReason::MessageSent => {} |
464 | | WakeUpReason::ConnectionReset => { |
465 | 0 | debug_assert!(!connection_task.is_reset_called()); |
466 | 0 | log!( |
467 | 0 | &platform, |
468 | 0 | Trace, |
469 | 0 | "connections", |
470 | 0 | "reset", |
471 | 0 | address = address_string |
472 | 0 | ); |
473 | 0 | connection_task.reset(); |
474 | | } |
475 | 0 | WakeUpReason::NewSubstream(substream, direction) => { |
476 | 0 | let outbound = match direction { |
477 | 0 | SubstreamDirection::Outbound => true, |
478 | 0 | SubstreamDirection::Inbound => false, |
479 | | }; |
480 | 0 | let substream_id = next_substream_id; |
481 | 0 | next_substream_id += 1; |
482 | 0 | log!( |
483 | 0 | &platform, |
484 | 0 | Trace, |
485 | 0 | "connections", |
486 | 0 | "substream-opened", |
487 | 0 | address = address_string, |
488 | 0 | substream_id, |
489 | 0 | ?direction |
490 | 0 | ); |
491 | 0 | connection_task.add_substream(substream_id, outbound); |
492 | 0 | if outbound { |
493 | 0 | pending_opening_out_substreams -= 1; |
494 | 0 | } |
495 | | |
496 | 0 | when_substreams_rw_ready |
497 | 0 | .push(Box::pin(async move { (Box::pin(substream), substream_id) })); Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s3_0Ba_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s3_0B1F_ Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s3_0Ba_ |
498 | | } |
499 | | } |
500 | | } |
501 | 0 | } Unexecuted instantiation: _RNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0B8_ Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0B1D_ Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0B8_ |