Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/light-base/src/platform/default.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
#![cfg(feature = "std")]
19
#![cfg_attr(docsrs, doc(cfg(feature = "std")))]
20
21
//! Implementation of the [`PlatformRef`] trait that leverages the operating system.
22
//!
23
//! This module contains the [`DefaultPlatform`] struct, which implements [`PlatformRef`].
24
//!
25
//! The [`DefaultPlatform`] delegates the logging to the `log` crate. In order to see log
26
//! messages, you should register as "logger" as documented by the `log` crate.
27
//! See <https://docs.rs/log>.
28
//!
29
//! # Example
30
//!
31
//! ```rust
32
//! use smoldot_light::{Client, platform::DefaultPlatform};
33
//! env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
34
//! let client = Client::new(DefaultPlatform::new(env!("CARGO_PKG_NAME").into(), env!("CARGO_PKG_VERSION").into()));
35
//! # let _: Client<_, ()> = client;  // Used in this example to infer the generic parameters of the Client
36
//! ```
37
//!
38
39
use super::{
40
    with_buffers, Address, ConnectionType, IpAddr, LogLevel, MultiStreamAddress,
41
    MultiStreamWebRtcConnection, PlatformRef, SubstreamDirection,
42
};
43
44
use alloc::{borrow::Cow, sync::Arc};
45
use core::{
46
    fmt::{self, Write as _},
47
    panic,
48
    pin::Pin,
49
    str,
50
    time::Duration,
51
};
52
use futures_util::{future, FutureExt as _};
53
use smoldot::libp2p::websocket;
54
use std::{
55
    io,
56
    net::SocketAddr,
57
    thread,
58
    time::{Instant, UNIX_EPOCH},
59
};
60
61
/// Implementation of the [`PlatformRef`] trait that leverages the operating system.
62
pub struct DefaultPlatform {
63
    client_name: String,
64
    client_version: String,
65
    tasks_executor: Arc<smol::Executor<'static>>,
66
    shutdown_notify: event_listener::Event,
67
}
68
69
impl DefaultPlatform {
70
    /// Creates a new [`DefaultPlatform`].
71
    ///
72
    /// This function spawns threads in order to execute the background tasks that will later be
73
    /// spawned.
74
    ///
75
    /// Must be passed as "client name" and "client version" that are used in various places
76
    /// such as to answer some JSON-RPC requests. Passing `env!("CARGO_PKG_NAME")` and
77
    /// `env!("CARGO_PKG_VERSION")` is typically very reasonable.
78
    ///
79
    /// # Panic
80
    ///
81
    /// Panics if it wasn't possible to spawn background threads.
82
    ///
83
1
    pub fn new(client_name: String, client_version: String) -> Arc<Self> {
84
1
        let tasks_executor = Arc::new(smol::Executor::new());
85
1
        let shutdown_notify = event_listener::Event::new();
86
87
4
        for n in 0..thread::available_parallelism()
88
1
            .map(|n| n.get())
_RNCNvMNtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3new0B8_
Line
Count
Source
88
1
            .map(|n| n.get())
Unexecuted instantiation: _RNCNvMNtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3new0B8_
89
1
            .unwrap_or(4)
90
        {
91
            // Note that `listen()` must be called here (and not in the thread being spawned), as
92
            // it might be notified as soon as `DefaultPlatform::new` returns.
93
4
            let on_shutdown = shutdown_notify.listen();
94
4
            let tasks_executor = tasks_executor.clone();
95
4
96
4
            let spawn_result = thread::Builder::new()
97
4
                .name(format!("smoldot-light-{}", n))
98
4
                .spawn(move || smol::block_on(tasks_executor.run(on_shutdown)));
_RNCNvMNtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3news_0B8_
Line
Count
Source
98
4
                .spawn(move || smol::block_on(tasks_executor.run(on_shutdown)));
Unexecuted instantiation: _RNCNvMNtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3news_0B8_
99
100
4
            if let Err(err) = spawn_result {
101
0
                panic!("Failed to spawn execution thread: {err}");
102
4
            }
103
        }
104
105
1
        Arc::new(DefaultPlatform {
106
1
            client_name,
107
1
            client_version,
108
1
            tasks_executor,
109
1
            shutdown_notify,
110
1
        })
111
1
    }
_RNvMNtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB2_15DefaultPlatform3new
Line
Count
Source
83
1
    pub fn new(client_name: String, client_version: String) -> Arc<Self> {
84
1
        let tasks_executor = Arc::new(smol::Executor::new());
85
1
        let shutdown_notify = event_listener::Event::new();
86
87
4
        for n in 0..thread::available_parallelism()
88
1
            .map(|n| n.get())
89
1
            .unwrap_or(4)
90
        {
91
            // Note that `listen()` must be called here (and not in the thread being spawned), as
92
            // it might be notified as soon as `DefaultPlatform::new` returns.
93
4
            let on_shutdown = shutdown_notify.listen();
94
4
            let tasks_executor = tasks_executor.clone();
95
4
96
4
            let spawn_result = thread::Builder::new()
97
4
                .name(format!("smoldot-light-{}", n))
98
4
                .spawn(move || smol::block_on(tasks_executor.run(on_shutdown)));
99
100
4
            if let Err(err) = spawn_result {
101
0
                panic!("Failed to spawn execution thread: {err}");
102
4
            }
103
        }
104
105
1
        Arc::new(DefaultPlatform {
106
1
            client_name,
107
1
            client_version,
108
1
            tasks_executor,
109
1
            shutdown_notify,
110
1
        })
111
1
    }
Unexecuted instantiation: _RNvMNtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB2_15DefaultPlatform3new
112
}
113
114
impl PlatformRef for Arc<DefaultPlatform> {
115
    type Delay = futures_util::future::Map<smol::Timer, fn(Instant) -> ()>;
116
    type Instant = Instant;
117
    type MultiStream = std::convert::Infallible; // TODO: replace with `!` once stable: https://github.com/rust-lang/rust/issues/35121
118
    type Stream = Stream;
119
    type StreamConnectFuture = future::Ready<Self::Stream>;
120
    type MultiStreamConnectFuture = future::Pending<MultiStreamWebRtcConnection<Self::MultiStream>>;
121
    type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a, Instant>;
122
    type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
123
    type StreamErrorRef<'a> = &'a io::Error;
124
    type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
125
126
0
    fn now_from_unix_epoch(&self) -> Duration {
127
0
        // Intentionally panic if the time is configured earlier than the UNIX EPOCH.
128
0
        UNIX_EPOCH.elapsed().unwrap()
129
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19now_from_unix_epoch
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19now_from_unix_epoch
130
131
0
    fn now(&self) -> Self::Instant {
132
0
        Instant::now()
133
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef3now
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef3now
134
135
0
    fn fill_random_bytes(&self, buffer: &mut [u8]) {
136
0
        rand::RngCore::fill_bytes(&mut rand::thread_rng(), buffer);
137
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17fill_random_bytes
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17fill_random_bytes
138
139
0
    fn sleep(&self, duration: Duration) -> Self::Delay {
140
0
        smol::Timer::after(duration).map(|_| ())
Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef5sleep0Ba_
Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef5sleep0Ba_
141
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef5sleep
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef5sleep
142
143
0
    fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
144
0
        smol::Timer::at(when).map(|_| ())
Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef11sleep_until0Ba_
Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef11sleep_until0Ba_
145
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11sleep_until
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11sleep_until
146
147
1
    fn spawn_task(
148
1
        &self,
149
1
        _task_name: Cow<str>,
150
1
        task: impl future::Future<Output = ()> + Send + 'static,
151
1
    ) {
152
1
        // In order to make sure that the execution threads don't stop if there are still
153
1
        // tasks to execute, we hold a copy of the `Arc<DefaultPlatform>` inside of the task until
154
1
        // it is finished.
155
1
        let _dummy_keep_alive = self.clone();
156
1
        self.tasks_executor
157
1
            .spawn(
158
1
                panic::AssertUnwindSafe(async move {
159
1
                    task.await;
160
1
                    drop(_dummy_keep_alive);
161
1
                })
_RNCINvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB7_15DefaultPlatformENtB9_11PlatformRef10spawn_taskNCNvNtB7_5testss_22tasks_run_indefinitely0E0Bb_
Line
Count
Source
158
1
                panic::AssertUnwindSafe(async move {
159
1
                    task.await;
160
1
                    drop(_dummy_keep_alive);
161
1
                })
Unexecuted instantiation: _RNCINvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB7_15DefaultPlatformENtB9_11PlatformRef10spawn_taskpE0Bb_
162
1
                .catch_unwind(),
163
1
            )
164
1
            .detach();
165
1
    }
_RINvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef10spawn_taskNCNvNtB5_5testss_22tasks_run_indefinitely0EB9_
Line
Count
Source
147
1
    fn spawn_task(
148
1
        &self,
149
1
        _task_name: Cow<str>,
150
1
        task: impl future::Future<Output = ()> + Send + 'static,
151
1
    ) {
152
1
        // In order to make sure that the execution threads don't stop if there are still
153
1
        // tasks to execute, we hold a copy of the `Arc<DefaultPlatform>` inside of the task until
154
1
        // it is finished.
155
1
        let _dummy_keep_alive = self.clone();
156
1
        self.tasks_executor
157
1
            .spawn(
158
1
                panic::AssertUnwindSafe(async move {
159
                    task.await;
160
                    drop(_dummy_keep_alive);
161
1
                })
162
1
                .catch_unwind(),
163
1
            )
164
1
            .detach();
165
1
    }
Unexecuted instantiation: _RINvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef10spawn_taskpEB9_
166
167
0
    fn log<'a>(
168
0
        &self,
169
0
        log_level: LogLevel,
170
0
        log_target: &'a str,
171
0
        message: &'a str,
172
0
        key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>,
173
0
    ) {
174
        // Note that this conversion is most likely completely optimized out by the compiler due
175
        // to log levels having the same numerical values.
176
0
        let log_level = match log_level {
177
0
            LogLevel::Error => log::Level::Error,
178
0
            LogLevel::Warn => log::Level::Warn,
179
0
            LogLevel::Info => log::Level::Info,
180
0
            LogLevel::Debug => log::Level::Debug,
181
0
            LogLevel::Trace => log::Level::Trace,
182
        };
183
184
0
        let mut message_build = String::with_capacity(128);
185
0
        message_build.push_str(message);
186
0
        let mut first = true;
187
0
        for (key, value) in key_values {
188
0
            if first {
189
0
                let _ = write!(message_build, "; ");
190
0
                first = false;
191
0
            } else {
192
0
                let _ = write!(message_build, ", ");
193
0
            }
194
0
            let _ = write!(message_build, "{}={}", key, value);
195
        }
196
197
0
        log::logger().log(
198
0
            &log::RecordBuilder::new()
199
0
                .level(log_level)
200
0
                .target(log_target)
201
0
                .args(format_args!("{}", message_build))
202
0
                .build(),
203
0
        )
204
0
    }
Unexecuted instantiation: _RINvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef3logpEB9_
Unexecuted instantiation: _RINvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef3logpEB9_
205
206
0
    fn client_name(&self) -> Cow<str> {
207
0
        Cow::Borrowed(&self.client_name)
208
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11client_name
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11client_name
209
210
0
    fn client_version(&self) -> Cow<str> {
211
0
        Cow::Borrowed(&self.client_version)
212
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14client_version
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14client_version
213
214
0
    fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
215
        // TODO: support WebSocket secure
216
0
        matches!(
217
0
            connection_type,
218
            ConnectionType::TcpIpv4
219
                | ConnectionType::TcpIpv6
220
                | ConnectionType::TcpDns
221
                | ConnectionType::WebSocketIpv4 { .. }
222
                | ConnectionType::WebSocketIpv6 { .. }
223
                | ConnectionType::WebSocketDns { secure: false, .. }
224
        )
225
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef24supports_connection_type
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef24supports_connection_type
226
227
0
    fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
228
0
        let (tcp_socket_addr, host_if_websocket): (
229
0
            either::Either<SocketAddr, (String, u16)>,
230
0
            Option<String>,
231
0
        ) = match multiaddr {
232
0
            Address::TcpDns { hostname, port } => {
233
0
                (either::Right((hostname.to_string(), port)), None)
234
            }
235
            Address::TcpIp {
236
0
                ip: IpAddr::V4(ip),
237
0
                port,
238
0
            } => (either::Left(SocketAddr::from((ip, port))), None),
239
            Address::TcpIp {
240
0
                ip: IpAddr::V6(ip),
241
0
                port,
242
0
            } => (either::Left(SocketAddr::from((ip, port))), None),
243
            Address::WebSocketDns {
244
0
                hostname,
245
0
                port,
246
0
                secure: false,
247
0
            } => (
248
0
                either::Right((hostname.to_string(), port)),
249
0
                Some(format!("{}:{}", hostname, port)),
250
0
            ),
251
            Address::WebSocketIp {
252
0
                ip: IpAddr::V4(ip),
253
0
                port,
254
0
            } => {
255
0
                let addr = SocketAddr::from((ip, port));
256
0
                (either::Left(addr), Some(addr.to_string()))
257
            }
258
            Address::WebSocketIp {
259
0
                ip: IpAddr::V6(ip),
260
0
                port,
261
0
            } => {
262
0
                let addr = SocketAddr::from((ip, port));
263
0
                (either::Left(addr), Some(addr.to_string()))
264
            }
265
266
            // The API user of the `PlatformRef` trait is never supposed to open connections of
267
            // a type that isn't supported.
268
0
            _ => unreachable!(),
269
        };
270
271
0
        let socket_future = async {
272
0
            let tcp_socket = match tcp_socket_addr {
273
0
                either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await,
274
0
                either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await,
275
            };
276
277
0
            if let Ok(tcp_socket) = &tcp_socket {
278
0
                let _ = tcp_socket.set_nodelay(true);
279
0
            }
280
281
0
            match (tcp_socket, host_if_websocket) {
282
0
                (Ok(tcp_socket), Some(host)) => {
283
0
                    websocket::websocket_client_handshake(websocket::Config {
284
0
                        tcp_socket,
285
0
                        host: &host,
286
0
                        url: "/",
287
0
                    })
288
0
                    .await
289
0
                    .map(TcpOrWs::Right)
290
                }
291
292
0
                (Ok(tcp_socket), None) => Ok(TcpOrWs::Left(tcp_socket)),
293
0
                (Err(err), _) => Err(err),
294
            }
295
0
        };
Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef14connect_stream0Ba_
Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef14connect_stream0Ba_
296
297
0
        future::ready(Stream(with_buffers::WithBuffers::new(Box::pin(
298
0
            socket_future,
299
0
        ))))
300
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14connect_stream
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14connect_stream
301
302
0
    fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture {
303
0
        panic!()
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19connect_multistream
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19connect_multistream
304
    }
305
306
    fn open_out_substream(&self, c: &mut Self::MultiStream) {
307
        // This function can only be called with so-called "multi-stream" connections. We never
308
        // open such connection.
309
        match *c {}
310
    }
311
312
    fn next_substream(&self, c: &'_ mut Self::MultiStream) -> Self::NextSubstreamFuture<'_> {
313
        // This function can only be called with so-called "multi-stream" connections. We never
314
        // open such connection.
315
        match *c {}
316
    }
317
318
0
    fn read_write_access<'a>(
319
0
        &self,
320
0
        stream: Pin<&'a mut Self::Stream>,
321
0
    ) -> Result<Self::ReadWriteAccess<'a>, &'a io::Error> {
322
0
        let stream = stream.project();
323
0
        stream.0.read_write_access(Instant::now())
324
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17read_write_access
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17read_write_access
325
326
0
    fn wait_read_write_again<'a>(
327
0
        &self,
328
0
        stream: Pin<&'a mut Self::Stream>,
329
0
    ) -> Self::StreamUpdateFuture<'a> {
330
0
        let stream = stream.project();
331
0
        Box::pin(stream.0.wait_read_write_again(|when| async move {
332
0
            smol::Timer::at(when).await;
333
0
        }))
Unexecuted instantiation: _RNCNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB8_15DefaultPlatformENtBa_11PlatformRef21wait_read_write_again00Bc_
Unexecuted instantiation: _RNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB8_15DefaultPlatformENtBa_11PlatformRef21wait_read_write_again00Bc_
Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef21wait_read_write_again0Ba_
Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef21wait_read_write_again0Ba_
334
0
    }
Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef21wait_read_write_again
Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef21wait_read_write_again
335
}
336
337
impl Drop for DefaultPlatform {
338
1
    fn drop(&mut self) {
339
1
        self.shutdown_notify.notify(usize::MAX);
340
1
    }
_RNvXs0_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB5_15DefaultPlatformNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
Line
Count
Source
338
1
    fn drop(&mut self) {
339
1
        self.shutdown_notify.notify(usize::MAX);
340
1
    }
Unexecuted instantiation: _RNvXs0_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB5_15DefaultPlatformNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop
341
}
342
343
/// Implementation detail of [`DefaultPlatform`].
344
0
#[pin_project::pin_project]
Unexecuted instantiation: _RNvMNvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__NtB4_6Stream7project
Unexecuted instantiation: _RNvMNvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__NtB4_6Stream11project_ref
Unexecuted instantiation: _RNvNvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__24___assert_not_repr_packed
Unexecuted instantiation: _RNvXs3_NvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__NtB7_6StreamNtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4drop
Unexecuted instantiation: _RNvMNvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__NtB4_6Stream11project_ref
Unexecuted instantiation: _RNvNvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__24___assert_not_repr_packed
Unexecuted instantiation: _RNvXs3_NvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__NtB7_6StreamNtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4drop
Unexecuted instantiation: _RNvMNvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__NtB4_6Stream7project
345
pub struct Stream(
346
    #[pin]
347
    with_buffers::WithBuffers<
348
        future::BoxFuture<'static, Result<TcpOrWs, io::Error>>,
349
        TcpOrWs,
350
        Instant,
351
    >,
352
);
353
354
type TcpOrWs = future::Either<smol::net::TcpStream, websocket::Connection<smol::net::TcpStream>>;
355
356
#[cfg(test)]
357
mod tests {
358
    use super::{DefaultPlatform, PlatformRef as _};
359
360
    #[test]
361
1
    fn tasks_run_indefinitely() {
362
1
        let platform_destroyed = event_listener::Event::new();
363
1
        let (tx, mut rx) = futures_channel::oneshot::channel();
364
1
365
1
        {
366
1
            let platform = DefaultPlatform::new("".to_string(), "".to_string());
367
1
            let when_platform_destroyed = platform_destroyed.listen();
368
1
            platform.spawn_task("".into(), async move {
369
1
                when_platform_destroyed.await;
370
1
                tx.send(()).unwrap();
371
1
            })
372
1
        }
373
1
374
1
        // The platform is destroyed, but the task must still be running.
375
1
        assert!(matches!(rx.try_recv(), Ok(None)));
376
1
        platform_destroyed.notify(usize::MAX);
377
1
        assert!(matches!(smol::block_on(rx), Ok(())));
378
1
    }
379
}