/__w/smoldot/smoldot/repo/light-base/src/platform/default.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | #![cfg(feature = "std")] |
19 | | #![cfg_attr(docsrs, doc(cfg(feature = "std")))] |
20 | | |
21 | | //! Implementation of the [`PlatformRef`] trait that leverages the operating system. |
22 | | //! |
23 | | //! This module contains the [`DefaultPlatform`] struct, which implements [`PlatformRef`]. |
24 | | //! |
25 | | //! The [`DefaultPlatform`] delegates the logging to the `log` crate. In order to see log |
26 | | //! messages, you should register as "logger" as documented by the `log` crate. |
27 | | //! See <https://docs.rs/log>. |
28 | | //! |
29 | | //! # Example |
30 | | //! |
31 | | //! ```rust |
32 | | //! use smoldot_light::{Client, platform::DefaultPlatform}; |
33 | | //! env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); |
34 | | //! let client = Client::new(DefaultPlatform::new(env!("CARGO_PKG_NAME").into(), env!("CARGO_PKG_VERSION").into())); |
35 | | //! # let _: Client<_, ()> = client; // Used in this example to infer the generic parameters of the Client |
36 | | //! ``` |
37 | | //! |
38 | | |
39 | | use super::{ |
40 | | with_buffers, Address, ConnectionType, IpAddr, LogLevel, MultiStreamAddress, |
41 | | MultiStreamWebRtcConnection, PlatformRef, SubstreamDirection, |
42 | | }; |
43 | | |
44 | | use alloc::{borrow::Cow, sync::Arc}; |
45 | | use core::{ |
46 | | fmt::{self, Write as _}, |
47 | | panic, |
48 | | pin::Pin, |
49 | | str, |
50 | | time::Duration, |
51 | | }; |
52 | | use futures_util::{future, FutureExt as _}; |
53 | | use smoldot::libp2p::websocket; |
54 | | use std::{ |
55 | | io, |
56 | | net::SocketAddr, |
57 | | thread, |
58 | | time::{Instant, UNIX_EPOCH}, |
59 | | }; |
60 | | |
61 | | /// Implementation of the [`PlatformRef`] trait that leverages the operating system. |
62 | | pub struct DefaultPlatform { |
63 | | client_name: String, |
64 | | client_version: String, |
65 | | tasks_executor: Arc<smol::Executor<'static>>, |
66 | | shutdown_notify: event_listener::Event, |
67 | | } |
68 | | |
69 | | impl DefaultPlatform { |
70 | | /// Creates a new [`DefaultPlatform`]. |
71 | | /// |
72 | | /// This function spawns threads in order to execute the background tasks that will later be |
73 | | /// spawned. |
74 | | /// |
75 | | /// Must be passed as "client name" and "client version" that are used in various places |
76 | | /// such as to answer some JSON-RPC requests. Passing `env!("CARGO_PKG_NAME")` and |
77 | | /// `env!("CARGO_PKG_VERSION")` is typically very reasonable. |
78 | | /// |
79 | | /// # Panic |
80 | | /// |
81 | | /// Panics if it wasn't possible to spawn background threads. |
82 | | /// |
83 | 1 | pub fn new(client_name: String, client_version: String) -> Arc<Self> { |
84 | 1 | let tasks_executor = Arc::new(smol::Executor::new()); |
85 | 1 | let shutdown_notify = event_listener::Event::new(); |
86 | | |
87 | 4 | for n in 0..thread::available_parallelism() |
88 | 1 | .map(|n| n.get()) _RNCNvMNtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3new0B8_ Line | Count | Source | 88 | 1 | .map(|n| n.get()) |
Unexecuted instantiation: _RNCNvMNtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3new0B8_ |
89 | 1 | .unwrap_or(4) |
90 | | { |
91 | | // Note that `listen()` must be called here (and not in the thread being spawned), as |
92 | | // it might be notified as soon as `DefaultPlatform::new` returns. |
93 | 4 | let on_shutdown = shutdown_notify.listen(); |
94 | 4 | let tasks_executor = tasks_executor.clone(); |
95 | 4 | |
96 | 4 | let spawn_result = thread::Builder::new() |
97 | 4 | .name(format!("smoldot-light-{}", n)) |
98 | 4 | .spawn(move || smol::block_on(tasks_executor.run(on_shutdown))); _RNCNvMNtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3news_0B8_ Line | Count | Source | 98 | 4 | .spawn(move || smol::block_on(tasks_executor.run(on_shutdown))); |
Unexecuted instantiation: _RNCNvMNtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB4_15DefaultPlatform3news_0B8_ |
99 | | |
100 | 4 | if let Err(err0 ) = spawn_result { |
101 | 0 | panic!("Failed to spawn execution thread: {err}"); |
102 | 4 | } |
103 | | } |
104 | | |
105 | 1 | Arc::new(DefaultPlatform { |
106 | 1 | client_name, |
107 | 1 | client_version, |
108 | 1 | tasks_executor, |
109 | 1 | shutdown_notify, |
110 | 1 | }) |
111 | 1 | } _RNvMNtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB2_15DefaultPlatform3new Line | Count | Source | 83 | 1 | pub fn new(client_name: String, client_version: String) -> Arc<Self> { | 84 | 1 | let tasks_executor = Arc::new(smol::Executor::new()); | 85 | 1 | let shutdown_notify = event_listener::Event::new(); | 86 | | | 87 | 4 | for n in 0..thread::available_parallelism() | 88 | 1 | .map(|n| n.get()) | 89 | 1 | .unwrap_or(4) | 90 | | { | 91 | | // Note that `listen()` must be called here (and not in the thread being spawned), as | 92 | | // it might be notified as soon as `DefaultPlatform::new` returns. | 93 | 4 | let on_shutdown = shutdown_notify.listen(); | 94 | 4 | let tasks_executor = tasks_executor.clone(); | 95 | 4 | | 96 | 4 | let spawn_result = thread::Builder::new() | 97 | 4 | .name(format!("smoldot-light-{}", n)) | 98 | 4 | .spawn(move || smol::block_on(tasks_executor.run(on_shutdown))); | 99 | | | 100 | 4 | if let Err(err0 ) = spawn_result { | 101 | 0 | panic!("Failed to spawn execution thread: {err}"); | 102 | 4 | } | 103 | | } | 104 | | | 105 | 1 | Arc::new(DefaultPlatform { | 106 | 1 | client_name, | 107 | 1 | client_version, | 108 | 1 | tasks_executor, | 109 | 1 | shutdown_notify, | 110 | 1 | }) | 111 | 1 | } |
Unexecuted instantiation: _RNvMNtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB2_15DefaultPlatform3new |
112 | | } |
113 | | |
114 | | impl PlatformRef for Arc<DefaultPlatform> { |
115 | | type Delay = futures_util::future::Map<smol::Timer, fn(Instant) -> ()>; |
116 | | type Instant = Instant; |
117 | | type MultiStream = std::convert::Infallible; // TODO: replace with `!` once stable: https://github.com/rust-lang/rust/issues/35121 |
118 | | type Stream = Stream; |
119 | | type StreamConnectFuture = future::Ready<Self::Stream>; |
120 | | type MultiStreamConnectFuture = future::Pending<MultiStreamWebRtcConnection<Self::MultiStream>>; |
121 | | type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a, Instant>; |
122 | | type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>; |
123 | | type StreamErrorRef<'a> = &'a io::Error; |
124 | | type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>; |
125 | | |
126 | 0 | fn now_from_unix_epoch(&self) -> Duration { |
127 | 0 | // Intentionally panic if the time is configured earlier than the UNIX EPOCH. |
128 | 0 | UNIX_EPOCH.elapsed().unwrap() |
129 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19now_from_unix_epoch Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19now_from_unix_epoch |
130 | | |
131 | 0 | fn now(&self) -> Self::Instant { |
132 | 0 | Instant::now() |
133 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef3now Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef3now |
134 | | |
135 | 0 | fn fill_random_bytes(&self, buffer: &mut [u8]) { |
136 | 0 | rand::RngCore::fill_bytes(&mut rand::thread_rng(), buffer); |
137 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17fill_random_bytes Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17fill_random_bytes |
138 | | |
139 | 0 | fn sleep(&self, duration: Duration) -> Self::Delay { |
140 | 0 | smol::Timer::after(duration).map(|_| ()) Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef5sleep0Ba_ Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef5sleep0Ba_ |
141 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef5sleep Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef5sleep |
142 | | |
143 | 0 | fn sleep_until(&self, when: Self::Instant) -> Self::Delay { |
144 | 0 | smol::Timer::at(when).map(|_| ()) Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef11sleep_until0Ba_ Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef11sleep_until0Ba_ |
145 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11sleep_until Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11sleep_until |
146 | | |
147 | 1 | fn spawn_task( |
148 | 1 | &self, |
149 | 1 | _task_name: Cow<str>, |
150 | 1 | task: impl future::Future<Output = ()> + Send + 'static, |
151 | 1 | ) { |
152 | 1 | // In order to make sure that the execution threads don't stop if there are still |
153 | 1 | // tasks to execute, we hold a copy of the `Arc<DefaultPlatform>` inside of the task until |
154 | 1 | // it is finished. |
155 | 1 | let _dummy_keep_alive = self.clone(); |
156 | 1 | self.tasks_executor |
157 | 1 | .spawn( |
158 | 1 | panic::AssertUnwindSafe(async move { |
159 | 1 | task.await0 ; |
160 | 1 | drop(_dummy_keep_alive); |
161 | 1 | }) _RNCINvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB7_15DefaultPlatformENtB9_11PlatformRef10spawn_taskNCNvNtB7_5testss_22tasks_run_indefinitely0E0Bb_ Line | Count | Source | 158 | 1 | panic::AssertUnwindSafe(async move { | 159 | 1 | task.await0 ; | 160 | 1 | drop(_dummy_keep_alive); | 161 | 1 | }) |
Unexecuted instantiation: _RNCINvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB7_15DefaultPlatformENtB9_11PlatformRef10spawn_taskpE0Bb_ |
162 | 1 | .catch_unwind(), |
163 | 1 | ) |
164 | 1 | .detach(); |
165 | 1 | } _RINvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef10spawn_taskNCNvNtB5_5testss_22tasks_run_indefinitely0EB9_ Line | Count | Source | 147 | 1 | fn spawn_task( | 148 | 1 | &self, | 149 | 1 | _task_name: Cow<str>, | 150 | 1 | task: impl future::Future<Output = ()> + Send + 'static, | 151 | 1 | ) { | 152 | 1 | // In order to make sure that the execution threads don't stop if there are still | 153 | 1 | // tasks to execute, we hold a copy of the `Arc<DefaultPlatform>` inside of the task until | 154 | 1 | // it is finished. | 155 | 1 | let _dummy_keep_alive = self.clone(); | 156 | 1 | self.tasks_executor | 157 | 1 | .spawn( | 158 | 1 | panic::AssertUnwindSafe(async move { | 159 | | task.await; | 160 | | drop(_dummy_keep_alive); | 161 | 1 | }) | 162 | 1 | .catch_unwind(), | 163 | 1 | ) | 164 | 1 | .detach(); | 165 | 1 | } |
Unexecuted instantiation: _RINvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef10spawn_taskpEB9_ |
166 | | |
167 | 0 | fn log<'a>( |
168 | 0 | &self, |
169 | 0 | log_level: LogLevel, |
170 | 0 | log_target: &'a str, |
171 | 0 | message: &'a str, |
172 | 0 | key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>, |
173 | 0 | ) { |
174 | | // Note that this conversion is most likely completely optimized out by the compiler due |
175 | | // to log levels having the same numerical values. |
176 | 0 | let log_level = match log_level { |
177 | 0 | LogLevel::Error => log::Level::Error, |
178 | 0 | LogLevel::Warn => log::Level::Warn, |
179 | 0 | LogLevel::Info => log::Level::Info, |
180 | 0 | LogLevel::Debug => log::Level::Debug, |
181 | 0 | LogLevel::Trace => log::Level::Trace, |
182 | | }; |
183 | | |
184 | 0 | let mut message_build = String::with_capacity(128); |
185 | 0 | message_build.push_str(message); |
186 | 0 | let mut first = true; |
187 | 0 | for (key, value) in key_values { |
188 | 0 | if first { |
189 | 0 | let _ = write!(message_build, "; "); |
190 | 0 | first = false; |
191 | 0 | } else { |
192 | 0 | let _ = write!(message_build, ", "); |
193 | 0 | } |
194 | 0 | let _ = write!(message_build, "{}={}", key, value); |
195 | | } |
196 | | |
197 | 0 | log::logger().log( |
198 | 0 | &log::RecordBuilder::new() |
199 | 0 | .level(log_level) |
200 | 0 | .target(log_target) |
201 | 0 | .args(format_args!("{}", message_build)) |
202 | 0 | .build(), |
203 | 0 | ) |
204 | 0 | } Unexecuted instantiation: _RINvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef3logpEB9_ Unexecuted instantiation: _RINvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB5_15DefaultPlatformENtB7_11PlatformRef3logpEB9_ |
205 | | |
206 | 0 | fn client_name(&self) -> Cow<str> { |
207 | 0 | Cow::Borrowed(&self.client_name) |
208 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11client_name Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef11client_name |
209 | | |
210 | 0 | fn client_version(&self) -> Cow<str> { |
211 | 0 | Cow::Borrowed(&self.client_version) |
212 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14client_version Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14client_version |
213 | | |
214 | 0 | fn supports_connection_type(&self, connection_type: ConnectionType) -> bool { |
215 | | // TODO: support WebSocket secure |
216 | 0 | matches!( |
217 | 0 | connection_type, |
218 | | ConnectionType::TcpIpv4 |
219 | | | ConnectionType::TcpIpv6 |
220 | | | ConnectionType::TcpDns |
221 | | | ConnectionType::WebSocketIpv4 { .. } |
222 | | | ConnectionType::WebSocketIpv6 { .. } |
223 | | | ConnectionType::WebSocketDns { secure: false, .. } |
224 | | ) |
225 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef24supports_connection_type Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef24supports_connection_type |
226 | | |
227 | 0 | fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture { |
228 | 0 | let (tcp_socket_addr, host_if_websocket): ( |
229 | 0 | either::Either<SocketAddr, (String, u16)>, |
230 | 0 | Option<String>, |
231 | 0 | ) = match multiaddr { |
232 | 0 | Address::TcpDns { hostname, port } => { |
233 | 0 | (either::Right((hostname.to_string(), port)), None) |
234 | | } |
235 | | Address::TcpIp { |
236 | 0 | ip: IpAddr::V4(ip), |
237 | 0 | port, |
238 | 0 | } => (either::Left(SocketAddr::from((ip, port))), None), |
239 | | Address::TcpIp { |
240 | 0 | ip: IpAddr::V6(ip), |
241 | 0 | port, |
242 | 0 | } => (either::Left(SocketAddr::from((ip, port))), None), |
243 | | Address::WebSocketDns { |
244 | 0 | hostname, |
245 | 0 | port, |
246 | 0 | secure: false, |
247 | 0 | } => ( |
248 | 0 | either::Right((hostname.to_string(), port)), |
249 | 0 | Some(format!("{}:{}", hostname, port)), |
250 | 0 | ), |
251 | | Address::WebSocketIp { |
252 | 0 | ip: IpAddr::V4(ip), |
253 | 0 | port, |
254 | 0 | } => { |
255 | 0 | let addr = SocketAddr::from((ip, port)); |
256 | 0 | (either::Left(addr), Some(addr.to_string())) |
257 | | } |
258 | | Address::WebSocketIp { |
259 | 0 | ip: IpAddr::V6(ip), |
260 | 0 | port, |
261 | 0 | } => { |
262 | 0 | let addr = SocketAddr::from((ip, port)); |
263 | 0 | (either::Left(addr), Some(addr.to_string())) |
264 | | } |
265 | | |
266 | | // The API user of the `PlatformRef` trait is never supposed to open connections of |
267 | | // a type that isn't supported. |
268 | 0 | _ => unreachable!(), |
269 | | }; |
270 | | |
271 | 0 | let socket_future = async { |
272 | 0 | let tcp_socket = match tcp_socket_addr { |
273 | 0 | either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await, |
274 | 0 | either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await, |
275 | | }; |
276 | | |
277 | 0 | if let Ok(tcp_socket) = &tcp_socket { |
278 | 0 | let _ = tcp_socket.set_nodelay(true); |
279 | 0 | } |
280 | | |
281 | 0 | match (tcp_socket, host_if_websocket) { |
282 | 0 | (Ok(tcp_socket), Some(host)) => { |
283 | 0 | websocket::websocket_client_handshake(websocket::Config { |
284 | 0 | tcp_socket, |
285 | 0 | host: &host, |
286 | 0 | url: "/", |
287 | 0 | }) |
288 | 0 | .await |
289 | 0 | .map(TcpOrWs::Right) |
290 | | } |
291 | | |
292 | 0 | (Ok(tcp_socket), None) => Ok(TcpOrWs::Left(tcp_socket)), |
293 | 0 | (Err(err), _) => Err(err), |
294 | | } |
295 | 0 | }; Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef14connect_stream0Ba_ Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef14connect_stream0Ba_ |
296 | | |
297 | 0 | future::ready(Stream(with_buffers::WithBuffers::new(Box::pin( |
298 | 0 | socket_future, |
299 | 0 | )))) |
300 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14connect_stream Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef14connect_stream |
301 | | |
302 | 0 | fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture { |
303 | 0 | panic!() Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19connect_multistream Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef19connect_multistream |
304 | | } |
305 | | |
306 | | fn open_out_substream(&self, c: &mut Self::MultiStream) { |
307 | | // This function can only be called with so-called "multi-stream" connections. We never |
308 | | // open such connection. |
309 | | match *c {} |
310 | | } |
311 | | |
312 | | fn next_substream(&self, c: &'_ mut Self::MultiStream) -> Self::NextSubstreamFuture<'_> { |
313 | | // This function can only be called with so-called "multi-stream" connections. We never |
314 | | // open such connection. |
315 | | match *c {} |
316 | | } |
317 | | |
318 | 0 | fn read_write_access<'a>( |
319 | 0 | &self, |
320 | 0 | stream: Pin<&'a mut Self::Stream>, |
321 | 0 | ) -> Result<Self::ReadWriteAccess<'a>, &'a io::Error> { |
322 | 0 | let stream = stream.project(); |
323 | 0 | stream.0.read_write_access(Instant::now()) |
324 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17read_write_access Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef17read_write_access |
325 | | |
326 | 0 | fn wait_read_write_again<'a>( |
327 | 0 | &self, |
328 | 0 | stream: Pin<&'a mut Self::Stream>, |
329 | 0 | ) -> Self::StreamUpdateFuture<'a> { |
330 | 0 | let stream = stream.project(); |
331 | 0 | Box::pin(stream.0.wait_read_write_again(|when| async move { |
332 | 0 | smol::Timer::at(when).await; |
333 | 0 | })) Unexecuted instantiation: _RNCNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB8_15DefaultPlatformENtBa_11PlatformRef21wait_read_write_again00Bc_ Unexecuted instantiation: _RNCNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB8_15DefaultPlatformENtBa_11PlatformRef21wait_read_write_again00Bc_ Unexecuted instantiation: _RNCNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef21wait_read_write_again0Ba_ Unexecuted instantiation: _RNCNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB6_15DefaultPlatformENtB8_11PlatformRef21wait_read_write_again0Ba_ |
334 | 0 | } Unexecuted instantiation: _RNvXs_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef21wait_read_write_again Unexecuted instantiation: _RNvXs_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultINtNtCsdZExvAaxgia_5alloc4sync3ArcNtB4_15DefaultPlatformENtB6_11PlatformRef21wait_read_write_again |
335 | | } |
336 | | |
337 | | impl Drop for DefaultPlatform { |
338 | 1 | fn drop(&mut self) { |
339 | 1 | self.shutdown_notify.notify(usize::MAX); |
340 | 1 | } _RNvXs0_NtNtCsiGub1lfKphe_13smoldot_light8platform7defaultNtB5_15DefaultPlatformNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop Line | Count | Source | 338 | 1 | fn drop(&mut self) { | 339 | 1 | self.shutdown_notify.notify(usize::MAX); | 340 | 1 | } |
Unexecuted instantiation: _RNvXs0_NtNtCsih6EgvAwZF2_13smoldot_light8platform7defaultNtB5_15DefaultPlatformNtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4drop |
341 | | } |
342 | | |
343 | | /// Implementation detail of [`DefaultPlatform`]. |
344 | 0 | #[pin_project::pin_project] Unexecuted instantiation: _RNvMNvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__NtB4_6Stream7project Unexecuted instantiation: _RNvMNvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__NtB4_6Stream11project_ref Unexecuted instantiation: _RNvNvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__24___assert_not_repr_packed Unexecuted instantiation: _RNvXs3_NvNtNtCsiGub1lfKphe_13smoldot_light8platform7default1__NtB7_6StreamNtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4drop Unexecuted instantiation: _RNvMNvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__NtB4_6Stream11project_ref Unexecuted instantiation: _RNvNvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__24___assert_not_repr_packed Unexecuted instantiation: _RNvXs3_NvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__NtB7_6StreamNtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4drop Unexecuted instantiation: _RNvMNvNtNtCsih6EgvAwZF2_13smoldot_light8platform7default1__NtB4_6Stream7project |
345 | | pub struct Stream( |
346 | | #[pin] |
347 | | with_buffers::WithBuffers< |
348 | | future::BoxFuture<'static, Result<TcpOrWs, io::Error>>, |
349 | | TcpOrWs, |
350 | | Instant, |
351 | | >, |
352 | | ); |
353 | | |
354 | | type TcpOrWs = future::Either<smol::net::TcpStream, websocket::Connection<smol::net::TcpStream>>; |
355 | | |
356 | | #[cfg(test)] |
357 | | mod tests { |
358 | | use super::{DefaultPlatform, PlatformRef as _}; |
359 | | |
360 | | #[test] |
361 | 1 | fn tasks_run_indefinitely() { |
362 | 1 | let platform_destroyed = event_listener::Event::new(); |
363 | 1 | let (tx, mut rx) = futures_channel::oneshot::channel(); |
364 | 1 | |
365 | 1 | { |
366 | 1 | let platform = DefaultPlatform::new("".to_string(), "".to_string()); |
367 | 1 | let when_platform_destroyed = platform_destroyed.listen(); |
368 | 1 | platform.spawn_task("".into(), async move { |
369 | 1 | when_platform_destroyed.await0 ; |
370 | 1 | tx.send(()).unwrap(); |
371 | 1 | }) |
372 | 1 | } |
373 | 1 | |
374 | 1 | // The platform is destroyed, but the task must still be running. |
375 | 1 | assert!(matches!0 (rx.try_recv(), Ok(None))); |
376 | 1 | platform_destroyed.notify(usize::MAX); |
377 | 1 | assert!(matches!0 (smol::block_on(rx), Ok(()))); |
378 | 1 | } |
379 | | } |