Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/light-base/src/network_service/tasks.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use crate::{
19
    log,
20
    platform::{PlatformRef, SubstreamDirection},
21
};
22
23
use alloc::{boxed::Box, string::String};
24
use core::{pin, time::Duration};
25
use futures_lite::FutureExt as _;
26
use futures_util::{future, stream::FuturesUnordered, StreamExt as _};
27
use smoldot::{libp2p::collection::SubstreamFate, network::service};
28
29
/// Asynchronous task managing a specific single-stream connection.
30
0
pub(super) async fn single_stream_connection_task<TPlat: PlatformRef>(
31
0
    mut connection: TPlat::Stream,
32
0
    address_string: String,
33
0
    platform: TPlat,
34
0
    connection_id: service::ConnectionId,
35
0
    connection_task: service::SingleStreamConnectionTask<TPlat::Instant>,
36
0
    coordinator_to_connection: async_channel::Receiver<service::CoordinatorToConnection>,
37
0
    connection_to_coordinator: async_channel::Sender<(
38
0
        service::ConnectionId,
39
0
        service::ConnectionToCoordinator,
40
0
    )>,
41
0
) {
Unexecuted instantiation: _RINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpEB6_
Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefEB1v_
Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpEB6_
42
0
    // We need to pin the receiver, as the type doesn't implement `Unpin`.
43
0
    let mut coordinator_to_connection = pin::pin!(coordinator_to_connection);
44
0
    // We also need to pin the socket, as we don't know whether it implements `Unpin`.
45
0
    let mut socket = pin::pin!(connection);
46
0
47
0
    // Future that sends a message to the coordinator. Only one message is sent to the coordinator
48
0
    // at a time. `None` if no message is being sent.
49
0
    let mut message_sending = pin::pin!(None);
50
0
51
0
    // Wrap `connection_task` within an `Option`. It will become `None` if the connection task
52
0
    // wants to self-destruct.
53
0
    let mut connection_task = Some(connection_task);
54
55
    loop {
56
        // Yield at every loop in order to provide better tasks granularity.
57
0
        futures_lite::future::yield_now().await;
58
59
        // Because only one message should be sent to the coordinator at a time, and that
60
        // processing the socket might generate a message, we only process the socket if no
61
        // message is currently being sent.
62
0
        if message_sending.is_none() && connection_task.is_some() {
63
0
            let mut task = connection_task.take().unwrap();
64
0
65
0
            match platform.read_write_access(socket.as_mut()) {
66
0
                Ok(mut socket_read_write) => {
67
0
                    // The code in this block is a bit cumbersome due to the logging.
68
0
                    let read_bytes_before = socket_read_write.read_bytes;
69
0
                    let written_bytes_before = socket_read_write.write_bytes_queued;
70
0
                    let write_closed = socket_read_write.write_bytes_queueable.is_none();
71
0
72
0
                    task.read_write(&mut *socket_read_write);
73
0
74
0
                    if socket_read_write.read_bytes != read_bytes_before
75
0
                        || socket_read_write.write_bytes_queued != written_bytes_before
76
0
                        || (!write_closed && socket_read_write.write_bytes_queueable.is_none())
77
0
                    {
78
0
                        log!(
79
0
                            &platform,
80
0
                            Trace,
81
0
                            "connections",
82
0
                            "connection-activity",
83
0
                            address = address_string,
84
0
                            read = socket_read_write.read_bytes - read_bytes_before,
85
0
                            written = socket_read_write.write_bytes_queued - written_bytes_before,
86
0
                            wake_up_after = ?socket_read_write.wake_up_after.as_ref().map(|w| {
87
0
                                if *w > socket_read_write.now {
88
0
                                    w.clone() - socket_read_write.now.clone()
89
                                } else {
90
0
                                    Duration::new(0, 0)
91
                                }
92
0
                            }),
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s2_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s2_0B1z_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s2_0Ba_
93
0
                            write_closed = socket_read_write.write_bytes_queueable.is_none(),
94
0
                        );
95
0
                    }
96
                }
97
0
                Err(err) => {
98
0
                    // Error on the socket.
99
0
                    if !task.is_reset_called() {
100
0
                        log!(
101
0
                            &platform,
102
0
                            Trace,
103
0
                            "connections",
104
0
                            "reset",
105
0
                            address = address_string,
106
0
                            reason = ?err
107
0
                        );
108
0
                        task.reset();
109
0
                    }
110
                }
111
            }
112
113
            // Try pull message to send to the coordinator.
114
115
            // Calling this method takes ownership of the task and returns that task if it has
116
            // more work to do. If `None` is returned, then the entire task is gone and the
117
            // connection must be abruptly closed, which is what happens when we return from
118
            // this function.
119
0
            let (task_update, message) = task.pull_message_to_coordinator();
120
0
            connection_task = task_update;
121
0
122
0
            debug_assert!(message_sending.is_none());
123
0
            if let Some(message) = message {
124
0
                message_sending.set(Some(
125
0
                    connection_to_coordinator.send((connection_id, message)),
126
0
                ));
127
0
            }
128
0
        }
129
130
        // Now wait for something interesting to happen before looping again.
131
132
        enum WakeUpReason {
133
            CoordinatorMessage(service::CoordinatorToConnection),
134
            CoordinatorDead,
135
            SocketEvent,
136
            MessageSent,
137
        }
138
139
0
        let wake_up_reason: WakeUpReason = {
140
            // If the connection task has self-destructed and that no message is being sent, stop
141
            // the task altogether as nothing will happen.
142
0
            if connection_task.is_none() && message_sending.is_none() {
143
0
                log!(
144
0
                    &platform,
145
0
                    Trace,
146
0
                    "connections",
147
0
                    "shutdown",
148
0
                    address = address_string
149
0
                );
150
0
                return;
151
0
            }
152
0
153
0
            let coordinator_message = async {
154
0
                match coordinator_to_connection.next().await {
155
0
                    Some(msg) => WakeUpReason::CoordinatorMessage(msg),
156
0
                    None => WakeUpReason::CoordinatorDead,
157
                }
158
0
            };
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE00Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE00B1z_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE00Ba_
159
160
0
            let socket_event = {
161
0
                // The future returned by `wait_read_write_again` yields when `read_write_access`
162
0
                // must be called. Because we only call `read_write_access` when `message_sending`
163
0
                // is `None`, we also call `wait_read_write_again` only when `message_sending` is
164
0
                // `None`.
165
0
                let fut = if message_sending.as_ref().as_pin_ref().is_none() {
166
0
                    Some(platform.wait_read_write_again(socket.as_mut()))
167
                } else {
168
0
                    None
169
                };
170
0
                async {
171
0
                    if let Some(fut) = fut {
172
0
                        fut.await;
173
0
                        WakeUpReason::SocketEvent
174
                    } else {
175
0
                        future::pending().await
176
                    }
177
0
                }
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s_0B1z_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s_0Ba_
178
            };
179
180
0
            let message_sent = async {
181
0
                let result = if let Some(message_sending) = message_sending.as_mut().as_pin_mut() {
182
0
                    message_sending.await
183
                } else {
184
0
                    future::pending().await
185
                };
186
0
                message_sending.set(None);
187
0
                if result.is_ok() {
188
0
                    WakeUpReason::MessageSent
189
                } else {
190
0
                    WakeUpReason::CoordinatorDead
191
                }
192
0
            };
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s0_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s0_0B1z_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s0_0Ba_
193
194
0
            coordinator_message.or(socket_event).or(message_sent).await
195
        };
196
197
0
        match wake_up_reason {
198
0
            WakeUpReason::CoordinatorMessage(message) => {
199
0
                // The coordinator normally guarantees that no message is sent after the task
200
0
                // is destroyed.
201
0
                let connection_task = connection_task.as_mut().unwrap_or_else(|| unreachable!());
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s1_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s1_0B1z_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0s1_0Ba_
202
0
                connection_task.inject_coordinator_message(&platform.now(), message);
203
0
            }
204
            WakeUpReason::CoordinatorDead => {
205
0
                log!(
206
0
                    &platform,
207
0
                    Trace,
208
0
                    "connections",
209
0
                    "shutdown",
210
0
                    address = address_string
211
0
                );
212
0
                return;
213
            }
214
0
            WakeUpReason::SocketEvent => {}
215
0
            WakeUpReason::MessageSent => {}
216
        }
217
    }
218
0
}
Unexecuted instantiation: _RNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0B8_
Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0B1x_
Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks29single_stream_connection_taskpE0B8_
219
220
/// Asynchronous task managing a specific multi-stream connection.
221
///
222
/// > **Note**: This function is specific to WebRTC in the sense that it checks whether the reading
223
/// >           and writing sides of substreams never close, and adjusts the size of the write
224
/// >           buffer to not go over the frame size limit of WebRTC. It can easily be made more
225
/// >           general-purpose.
226
0
pub(super) async fn webrtc_multi_stream_connection_task<TPlat: PlatformRef>(
227
0
    mut connection: TPlat::MultiStream,
228
0
    address_string: String,
229
0
    platform: TPlat,
230
0
    connection_id: service::ConnectionId,
231
0
    mut connection_task: service::MultiStreamConnectionTask<TPlat::Instant, usize>,
232
0
    mut coordinator_to_connection: async_channel::Receiver<service::CoordinatorToConnection>,
233
0
    connection_to_coordinator: async_channel::Sender<(
234
0
        service::ConnectionId,
235
0
        service::ConnectionToCoordinator,
236
0
    )>,
237
0
) {
Unexecuted instantiation: _RINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpEB6_
Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefEB1B_
Unexecuted instantiation: _RINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpEB6_
238
0
    // Future that sends a message to the coordinator. Only one message is sent to the coordinator
239
0
    // at a time. `None` if no message is being sent.
240
0
    let mut message_sending = pin::pin!(None);
241
0
    // Number of substreams that are currently being opened by the `PlatformRef` implementation
242
0
    // and that the `connection_task` state machine isn't aware of yet.
243
0
    let mut pending_opening_out_substreams = 0;
244
0
    // Stream that yields an item whenever a substream is ready to be read-written.
245
0
    // TODO: we box the future because of the type checker being annoying
246
0
    let mut when_substreams_rw_ready = FuturesUnordered::<
247
0
        pin::Pin<Box<dyn future::Future<Output = (pin::Pin<Box<TPlat::Stream>>, usize)> + Send>>,
248
0
    >::new();
249
0
    // Identifier to assign to the next substream.
250
0
    // TODO: weird API
251
0
    let mut next_substream_id = 0;
252
0
    // We need to pin the receiver, as the type doesn't implement `Unpin`.
253
0
    let mut coordinator_to_connection = pin::pin!(coordinator_to_connection);
254
255
    loop {
256
        // Start opening new outbound substreams, if needed.
257
0
        for _ in 0..connection_task
258
0
            .desired_outbound_substreams()
259
0
            .saturating_sub(pending_opening_out_substreams)
260
0
        {
261
0
            log!(
262
0
                &platform,
263
0
                Trace,
264
0
                "connections",
265
0
                "substream-open-start",
266
0
                address = address_string
267
0
            );
268
0
            platform.open_out_substream(&mut connection);
269
0
            pending_opening_out_substreams += 1;
270
0
        }
271
272
        // Now wait for something interesting to happen before looping again.
273
274
        enum WakeUpReason<TPlat: PlatformRef> {
275
            CoordinatorMessage(service::CoordinatorToConnection),
276
            CoordinatorDead,
277
            SocketEvent(pin::Pin<Box<TPlat::Stream>>, usize),
278
            MessageSent,
279
            NewSubstream(TPlat::Stream, SubstreamDirection),
280
            ConnectionReset,
281
        }
282
283
0
        let wake_up_reason: WakeUpReason<TPlat> = {
284
0
            let coordinator_message = async {
285
0
                match coordinator_to_connection.next().await {
286
0
                    Some(msg) => WakeUpReason::CoordinatorMessage(msg),
287
0
                    None => WakeUpReason::CoordinatorDead,
288
                }
289
0
            };
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE00Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE00B1F_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE00Ba_
290
291
0
            let socket_event = {
292
0
                // The future returned by `wait_read_write_again` yields when `read_write_access`
293
0
                // must be called. Because we only call `read_write_access` when `message_sending`
294
0
                // is `None`, we also call `wait_read_write_again` only when `message_sending` is
295
0
                // `None`.
296
0
                let fut = if message_sending.as_ref().as_pin_ref().is_none()
297
0
                    && !when_substreams_rw_ready.is_empty()
298
                {
299
0
                    Some(when_substreams_rw_ready.select_next_some())
300
                } else {
301
0
                    None
302
                };
303
0
                async move {
304
0
                    if let Some(fut) = fut {
305
0
                        let (stream, substream_id) = fut.await;
306
0
                        WakeUpReason::SocketEvent(stream, substream_id)
307
                    } else {
308
0
                        future::pending().await
309
                    }
310
0
                }
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s_0B1F_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s_0Ba_
311
            };
312
313
0
            let message_sent = async {
314
0
                let result: Result<(), _> =
315
0
                    if let Some(message_sending) = message_sending.as_mut().as_pin_mut() {
316
0
                        message_sending.await
317
                    } else {
318
0
                        future::pending().await
319
                    };
320
0
                message_sending.set(None);
321
0
                if result.is_ok() {
322
0
                    WakeUpReason::MessageSent
323
                } else {
324
0
                    WakeUpReason::CoordinatorDead
325
                }
326
0
            };
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s0_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s0_0B1F_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s0_0Ba_
327
328
            // Future that is woken up when a new substream is available.
329
0
            let next_substream = async {
330
0
                if connection_task.is_reset_called() {
331
0
                    future::pending().await
332
                } else {
333
0
                    match platform.next_substream(&mut connection).await {
334
0
                        Some((stream, direction)) => WakeUpReason::NewSubstream(stream, direction),
335
0
                        None => WakeUpReason::ConnectionReset,
336
                    }
337
                }
338
0
            };
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s1_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s1_0B1F_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s1_0Ba_
339
340
0
            coordinator_message
341
0
                .or(socket_event)
342
0
                .or(message_sent)
343
0
                .or(next_substream)
344
0
                .await
345
        };
346
347
0
        match wake_up_reason {
348
0
            WakeUpReason::CoordinatorMessage(message) => {
349
0
                connection_task.inject_coordinator_message(&platform.now(), message);
350
0
            }
351
            WakeUpReason::CoordinatorDead => {
352
0
                log!(
353
0
                    &platform,
354
0
                    Trace,
355
0
                    "connections",
356
0
                    "shutdown",
357
0
                    address = address_string
358
0
                );
359
0
                return;
360
            }
361
0
            WakeUpReason::SocketEvent(mut socket, substream_id) => {
362
0
                debug_assert!(message_sending.is_none());
363
364
0
                let substream_fate = match platform.read_write_access(socket.as_mut()) {
365
0
                    Ok(mut socket_read_write) => {
366
0
                        // The code in this block is a bit cumbersome due to the logging.
367
0
                        let read_bytes_before = socket_read_write.read_bytes;
368
0
                        let written_bytes_before = socket_read_write.write_bytes_queued;
369
0
                        let write_closed = socket_read_write.write_bytes_queueable.is_none();
370
0
371
0
                        let substream_fate = connection_task
372
0
                            .substream_read_write(&substream_id, &mut *socket_read_write);
373
0
374
0
                        if socket_read_write.read_bytes != read_bytes_before
375
0
                            || socket_read_write.write_bytes_queued != written_bytes_before
376
0
                            || (!write_closed && socket_read_write.write_bytes_queueable.is_none())
377
0
                        {
378
0
                            log!(
379
0
                                &platform,
380
0
                                Trace,
381
0
                                "connections",
382
0
                                "connection-activity",
383
0
                                address = address_string,
384
0
                                read = socket_read_write.read_bytes - read_bytes_before,
385
0
                                written = socket_read_write.write_bytes_queued - written_bytes_before,
386
0
                                wake_up_after = ?socket_read_write.wake_up_after.as_ref().map(|w| {
387
0
                                    if *w > socket_read_write.now {
388
0
                                        w.clone() - socket_read_write.now.clone()
389
                                    } else {
390
0
                                        Duration::new(0, 0)
391
                                    }
392
0
                                }),
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s4_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s4_0B1F_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s4_0Ba_
393
0
                                write_close = ?socket_read_write.write_bytes_queueable.is_none(),
394
0
                            );
395
0
                        }
396
397
0
                        if let SubstreamFate::Reset = substream_fate {
398
0
                            log!(
399
0
                                &platform,
400
0
                                Trace,
401
0
                                "connections",
402
0
                                "reset-substream",
403
0
                                address = address_string,
404
0
                                substream_id
405
0
                            );
406
0
                        }
407
408
0
                        substream_fate
409
                    }
410
0
                    Err(err) => {
411
0
                        // Error on the substream.
412
0
                        log!(
413
0
                            &platform,
414
0
                            Trace,
415
0
                            "connections",
416
0
                            "substream-reset-by-remote",
417
0
                            address = address_string,
418
0
                            substream_id,
419
0
                            error = ?err
420
0
                        );
421
0
                        connection_task.reset_substream(&substream_id);
422
0
                        SubstreamFate::Reset
423
                    }
424
                };
425
426
                // Try pull message to send to the coordinator.
427
428
                // Calling this method takes ownership of the task and returns that task if it has
429
                // more work to do. If `None` is returned, then the entire task is gone and the
430
                // connection must be abruptly closed, which is what happens when we return from
431
                // this function.
432
0
                let (task_update, message) = connection_task.pull_message_to_coordinator();
433
0
                if let Some(task_update) = task_update {
434
0
                    connection_task = task_update;
435
0
                    debug_assert!(message_sending.is_none());
436
0
                    if let Some(message) = message {
437
0
                        message_sending.set(Some(
438
0
                            connection_to_coordinator.send((connection_id, message)),
439
0
                        ));
440
0
                    }
441
                } else {
442
0
                    log!(
443
0
                        &platform,
444
0
                        Trace,
445
0
                        "connections",
446
0
                        "shutdown",
447
0
                        address = address_string
448
0
                    );
449
0
                    return;
450
                }
451
452
                // Put back the stream in `when_substreams_rw_ready`.
453
0
                if let SubstreamFate::Continue = substream_fate {
454
0
                    when_substreams_rw_ready.push({
455
0
                        let platform = platform.clone();
456
0
                        Box::pin(async move {
457
0
                            platform.wait_read_write_again(socket.as_mut()).await;
458
0
                            (socket, substream_id)
459
0
                        })
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s2_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s2_0B1F_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s2_0Ba_
460
0
                    });
461
0
                }
462
            }
463
0
            WakeUpReason::MessageSent => {}
464
            WakeUpReason::ConnectionReset => {
465
0
                debug_assert!(!connection_task.is_reset_called());
466
0
                log!(
467
0
                    &platform,
468
0
                    Trace,
469
0
                    "connections",
470
0
                    "reset",
471
0
                    address = address_string
472
0
                );
473
0
                connection_task.reset();
474
            }
475
0
            WakeUpReason::NewSubstream(substream, direction) => {
476
0
                let outbound = match direction {
477
0
                    SubstreamDirection::Outbound => true,
478
0
                    SubstreamDirection::Inbound => false,
479
                };
480
0
                let substream_id = next_substream_id;
481
0
                next_substream_id += 1;
482
0
                log!(
483
0
                    &platform,
484
0
                    Trace,
485
0
                    "connections",
486
0
                    "substream-opened",
487
0
                    address = address_string,
488
0
                    substream_id,
489
0
                    ?direction
490
0
                );
491
0
                connection_task.add_substream(substream_id, outbound);
492
0
                if outbound {
493
0
                    pending_opening_out_substreams -= 1;
494
0
                }
495
496
0
                when_substreams_rw_ready
497
0
                    .push(Box::pin(async move { (Box::pin(substream), substream_id) }));
Unexecuted instantiation: _RNCNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s3_0Ba_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s3_0B1F_
Unexecuted instantiation: _RNCNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0s3_0Ba_
498
            }
499
        }
500
    }
501
0
}
Unexecuted instantiation: _RNCINvNtNtCsiGub1lfKphe_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0B8_
Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0B1D_
Unexecuted instantiation: _RNCINvNtNtCsih6EgvAwZF2_13smoldot_light15network_service5tasks35webrtc_multi_stream_connection_taskpE0B8_