smoldot_light/platform/
default.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18#![cfg(feature = "std")]
19
20//! Implementation of the [`PlatformRef`] trait that leverages the operating system.
21//!
22//! This module contains the [`DefaultPlatform`] struct, which implements [`PlatformRef`].
23//!
//! The [`DefaultPlatform`] delegates the logging to the `log` crate. In order to see log
//! messages, you should register a "logger" as documented by the `log` crate.
//! See <https://docs.rs/log>.
27//!
28//! # Example
29//!
30//! ```rust
31//! use smoldot_light::{Client, platform::DefaultPlatform};
32//! env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
33//! let client = Client::new(DefaultPlatform::new(env!("CARGO_PKG_NAME").into(), env!("CARGO_PKG_VERSION").into()));
34//! # let _: Client<_, ()> = client;  // Used in this example to infer the generic parameters of the Client
35//! ```
36//!
37
38use super::{
39    Address, ConnectionType, IpAddr, LogLevel, MultiStreamAddress, MultiStreamWebRtcConnection,
40    PlatformRef, SubstreamDirection, with_buffers,
41};
42
43use alloc::{borrow::Cow, sync::Arc};
44use core::{
45    fmt::{self, Write as _},
46    panic,
47    pin::Pin,
48    str,
49    time::Duration,
50};
51use futures_util::{FutureExt as _, future};
52use smoldot::libp2p::websocket;
53use std::{
54    io,
55    net::SocketAddr,
56    thread,
57    time::{Instant, UNIX_EPOCH},
58};
59
60/// Implementation of the [`PlatformRef`] trait that leverages the operating system.
61pub struct DefaultPlatform {
62    client_name: String,
63    client_version: String,
64    tasks_executor: Arc<smol::Executor<'static>>,
65    shutdown_notify: event_listener::Event,
66}
67
68impl DefaultPlatform {
69    /// Creates a new [`DefaultPlatform`].
70    ///
71    /// This function spawns threads in order to execute the background tasks that will later be
72    /// spawned.
73    ///
74    /// Must be passed as "client name" and "client version" that are used in various places
75    /// such as to answer some JSON-RPC requests. Passing `env!("CARGO_PKG_NAME")` and
76    /// `env!("CARGO_PKG_VERSION")` is typically very reasonable.
77    ///
78    /// # Panic
79    ///
80    /// Panics if it wasn't possible to spawn background threads.
81    ///
82    pub fn new(client_name: String, client_version: String) -> Arc<Self> {
83        let tasks_executor = Arc::new(smol::Executor::new());
84        let shutdown_notify = event_listener::Event::new();
85
86        for n in 0..thread::available_parallelism()
87            .map(|n| n.get())
88            .unwrap_or(4)
89        {
90            // Note that `listen()` must be called here (and not in the thread being spawned), as
91            // it might be notified as soon as `DefaultPlatform::new` returns.
92            let on_shutdown = shutdown_notify.listen();
93            let tasks_executor = tasks_executor.clone();
94
95            let spawn_result = thread::Builder::new()
96                .name(format!("smoldot-light-{}", n))
97                .spawn(move || smol::block_on(tasks_executor.run(on_shutdown)));
98
99            if let Err(err) = spawn_result {
100                panic!("Failed to spawn execution thread: {err}");
101            }
102        }
103
104        Arc::new(DefaultPlatform {
105            client_name,
106            client_version,
107            tasks_executor,
108            shutdown_notify,
109        })
110    }
111}
112
113impl PlatformRef for Arc<DefaultPlatform> {
114    type Delay = futures_util::future::Map<smol::Timer, fn(Instant) -> ()>;
115    type Instant = Instant;
116    type MultiStream = std::convert::Infallible; // TODO: replace with `!` once stable: https://github.com/rust-lang/rust/issues/35121
117    type Stream = Stream;
118    type StreamConnectFuture = future::Ready<Self::Stream>;
119    type MultiStreamConnectFuture = future::Pending<MultiStreamWebRtcConnection<Self::MultiStream>>;
120    type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a, Instant>;
121    type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
122    type StreamErrorRef<'a> = &'a io::Error;
123    type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
124
125    fn now_from_unix_epoch(&self) -> Duration {
126        // Intentionally panic if the time is configured earlier than the UNIX EPOCH.
127        UNIX_EPOCH.elapsed().unwrap()
128    }
129
130    fn now(&self) -> Self::Instant {
131        Instant::now()
132    }
133
134    fn fill_random_bytes(&self, buffer: &mut [u8]) {
135        rand::RngCore::fill_bytes(&mut rand::thread_rng(), buffer);
136    }
137
138    fn sleep(&self, duration: Duration) -> Self::Delay {
139        smol::Timer::after(duration).map(|_| ())
140    }
141
142    fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
143        smol::Timer::at(when).map(|_| ())
144    }
145
146    fn spawn_task(&self, _task_name: Cow<str>, task: impl Future<Output = ()> + Send + 'static) {
147        // In order to make sure that the execution threads don't stop if there are still
148        // tasks to execute, we hold a copy of the `Arc<DefaultPlatform>` inside of the task until
149        // it is finished.
150        let _dummy_keep_alive = self.clone();
151        self.tasks_executor
152            .spawn(
153                panic::AssertUnwindSafe(async move {
154                    task.await;
155                    drop(_dummy_keep_alive);
156                })
157                .catch_unwind(),
158            )
159            .detach();
160    }
161
162    fn log<'a>(
163        &self,
164        log_level: LogLevel,
165        log_target: &'a str,
166        message: &'a str,
167        key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>,
168    ) {
169        // Note that this conversion is most likely completely optimized out by the compiler due
170        // to log levels having the same numerical values.
171        let log_level = match log_level {
172            LogLevel::Error => log::Level::Error,
173            LogLevel::Warn => log::Level::Warn,
174            LogLevel::Info => log::Level::Info,
175            LogLevel::Debug => log::Level::Debug,
176            LogLevel::Trace => log::Level::Trace,
177        };
178
179        let mut message_build = String::with_capacity(128);
180        message_build.push_str(message);
181        let mut first = true;
182        for (key, value) in key_values {
183            if first {
184                let _ = write!(message_build, "; ");
185                first = false;
186            } else {
187                let _ = write!(message_build, ", ");
188            }
189            let _ = write!(message_build, "{}={}", key, value);
190        }
191
192        log::logger().log(
193            &log::RecordBuilder::new()
194                .level(log_level)
195                .target(log_target)
196                .args(format_args!("{}", message_build))
197                .build(),
198        )
199    }
200
201    fn client_name(&self) -> Cow<str> {
202        Cow::Borrowed(&self.client_name)
203    }
204
205    fn client_version(&self) -> Cow<str> {
206        Cow::Borrowed(&self.client_version)
207    }
208
209    fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
210        // TODO: support WebSocket secure
211        matches!(
212            connection_type,
213            ConnectionType::TcpIpv4
214                | ConnectionType::TcpIpv6
215                | ConnectionType::TcpDns
216                | ConnectionType::WebSocketIpv4 { .. }
217                | ConnectionType::WebSocketIpv6 { .. }
218                | ConnectionType::WebSocketDns { secure: false, .. }
219        )
220    }
221
222    fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
223        let (tcp_socket_addr, host_if_websocket): (
224            either::Either<SocketAddr, (String, u16)>,
225            Option<String>,
226        ) = match multiaddr {
227            Address::TcpDns { hostname, port } => {
228                (either::Right((hostname.to_string(), port)), None)
229            }
230            Address::TcpIp {
231                ip: IpAddr::V4(ip),
232                port,
233            } => (either::Left(SocketAddr::from((ip, port))), None),
234            Address::TcpIp {
235                ip: IpAddr::V6(ip),
236                port,
237            } => (either::Left(SocketAddr::from((ip, port))), None),
238            Address::WebSocketDns {
239                hostname,
240                port,
241                secure: false,
242            } => (
243                either::Right((hostname.to_string(), port)),
244                Some(format!("{}:{}", hostname, port)),
245            ),
246            Address::WebSocketIp {
247                ip: IpAddr::V4(ip),
248                port,
249            } => {
250                let addr = SocketAddr::from((ip, port));
251                (either::Left(addr), Some(addr.to_string()))
252            }
253            Address::WebSocketIp {
254                ip: IpAddr::V6(ip),
255                port,
256            } => {
257                let addr = SocketAddr::from((ip, port));
258                (either::Left(addr), Some(addr.to_string()))
259            }
260
261            // The API user of the `PlatformRef` trait is never supposed to open connections of
262            // a type that isn't supported.
263            _ => unreachable!(),
264        };
265
266        let socket_future = async {
267            let tcp_socket = match tcp_socket_addr {
268                either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await,
269                either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await,
270            };
271
272            if let Ok(tcp_socket) = &tcp_socket {
273                let _ = tcp_socket.set_nodelay(true);
274            }
275
276            match (tcp_socket, host_if_websocket) {
277                (Ok(tcp_socket), Some(host)) => {
278                    websocket::websocket_client_handshake(websocket::Config {
279                        tcp_socket,
280                        host: &host,
281                        url: "/",
282                    })
283                    .await
284                    .map(TcpOrWs::Right)
285                }
286
287                (Ok(tcp_socket), None) => Ok(TcpOrWs::Left(tcp_socket)),
288                (Err(err), _) => Err(err),
289            }
290        };
291
292        future::ready(Stream(with_buffers::WithBuffers::new(Box::pin(
293            socket_future,
294        ))))
295    }
296
297    fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture {
298        panic!()
299    }
300
301    fn open_out_substream(&self, c: &mut Self::MultiStream) {
302        // This function can only be called with so-called "multi-stream" connections. We never
303        // open such connection.
304        match *c {}
305    }
306
307    fn next_substream(&self, c: &mut Self::MultiStream) -> Self::NextSubstreamFuture<'_> {
308        // This function can only be called with so-called "multi-stream" connections. We never
309        // open such connection.
310        match *c {}
311    }
312
313    fn read_write_access<'a>(
314        &self,
315        stream: Pin<&'a mut Self::Stream>,
316    ) -> Result<Self::ReadWriteAccess<'a>, &'a io::Error> {
317        let stream = stream.project();
318        stream.0.read_write_access(Instant::now())
319    }
320
321    fn wait_read_write_again<'a>(
322        &self,
323        stream: Pin<&'a mut Self::Stream>,
324    ) -> Self::StreamUpdateFuture<'a> {
325        let stream = stream.project();
326        Box::pin(stream.0.wait_read_write_again(|when| async move {
327            smol::Timer::at(when).await;
328        }))
329    }
330}
331
332impl Drop for DefaultPlatform {
333    fn drop(&mut self) {
334        self.shutdown_notify.notify(usize::MAX);
335    }
336}
337
338/// Implementation detail of [`DefaultPlatform`].
339#[pin_project::pin_project]
340pub struct Stream(
341    #[pin]
342    with_buffers::WithBuffers<
343        future::BoxFuture<'static, Result<TcpOrWs, io::Error>>,
344        TcpOrWs,
345        Instant,
346    >,
347);
348
349type TcpOrWs = future::Either<smol::net::TcpStream, websocket::Connection<smol::net::TcpStream>>;
350
#[cfg(test)]
mod tests {
    use super::{DefaultPlatform, PlatformRef as _};

    /// A task spawned on the platform must continue running after the `Arc<DefaultPlatform>`
    /// held by the API user has been dropped.
    #[test]
    fn tasks_run_indefinitely() {
        let (tx, mut rx) = futures_channel::oneshot::channel();
        let platform_destroyed = event_listener::Event::new();
        let when_platform_destroyed = platform_destroyed.listen();

        // The task only sends on the channel after having been told that the platform was
        // dropped.
        let platform = DefaultPlatform::new(String::new(), String::new());
        platform.spawn_task("".into(), async move {
            when_platform_destroyed.await;
            tx.send(()).unwrap();
        });
        drop(platform);

        // The platform is destroyed, but the task must still be running.
        assert!(matches!(rx.try_recv(), Ok(None)));
        platform_destroyed.notify(usize::MAX);
        assert!(matches!(smol::block_on(rx), Ok(())));
    }
}