1#![cfg(feature = "std")]
19
20use super::{
39 Address, ConnectionType, IpAddr, LogLevel, MultiStreamAddress, MultiStreamWebRtcConnection,
40 PlatformRef, SubstreamDirection, with_buffers,
41};
42
43use alloc::{borrow::Cow, sync::Arc};
44use core::{
45 fmt::{self, Write as _},
46 panic,
47 pin::Pin,
48 str,
49 time::Duration,
50};
51use futures_util::{FutureExt as _, future};
52use smoldot::libp2p::websocket;
53use std::{
54 io,
55 net::SocketAddr,
56 thread,
57 time::{Instant, UNIX_EPOCH},
58};
59
/// Platform implementation built on the standard library and on `smol`.
///
/// [`PlatformRef`] is implemented on `Arc<DefaultPlatform>`; use [`DefaultPlatform::new`] to
/// obtain one. Dropping the `DefaultPlatform` notifies the shutdown event that the execution
/// threads spawned by [`DefaultPlatform::new`] are waiting on.
pub struct DefaultPlatform {
    /// Value returned by `PlatformRef::client_name`.
    client_name: String,
    /// Value returned by `PlatformRef::client_version`.
    client_version: String,
    /// Executor shared between the execution threads; `PlatformRef::spawn_task` spawns onto it.
    tasks_executor: Arc<smol::Executor<'static>>,
    /// Event notified from the `Drop` implementation. Each execution thread runs the executor
    /// until its listener on this event fires.
    shutdown_notify: event_listener::Event,
}
67
68impl DefaultPlatform {
69 pub fn new(client_name: String, client_version: String) -> Arc<Self> {
83 let tasks_executor = Arc::new(smol::Executor::new());
84 let shutdown_notify = event_listener::Event::new();
85
86 for n in 0..thread::available_parallelism()
87 .map(|n| n.get())
88 .unwrap_or(4)
89 {
90 let on_shutdown = shutdown_notify.listen();
93 let tasks_executor = tasks_executor.clone();
94
95 let spawn_result = thread::Builder::new()
96 .name(format!("smoldot-light-{}", n))
97 .spawn(move || smol::block_on(tasks_executor.run(on_shutdown)));
98
99 if let Err(err) = spawn_result {
100 panic!("Failed to spawn execution thread: {err}");
101 }
102 }
103
104 Arc::new(DefaultPlatform {
105 client_name,
106 client_version,
107 tasks_executor,
108 shutdown_notify,
109 })
110 }
111}
112
113impl PlatformRef for Arc<DefaultPlatform> {
114 type Delay = futures_util::future::Map<smol::Timer, fn(Instant) -> ()>;
115 type Instant = Instant;
116 type MultiStream = std::convert::Infallible; type Stream = Stream;
118 type StreamConnectFuture = future::Ready<Self::Stream>;
119 type MultiStreamConnectFuture = future::Pending<MultiStreamWebRtcConnection<Self::MultiStream>>;
120 type ReadWriteAccess<'a> = with_buffers::ReadWriteAccess<'a, Instant>;
121 type StreamUpdateFuture<'a> = future::BoxFuture<'a, ()>;
122 type StreamErrorRef<'a> = &'a io::Error;
123 type NextSubstreamFuture<'a> = future::Pending<Option<(Self::Stream, SubstreamDirection)>>;
124
125 fn now_from_unix_epoch(&self) -> Duration {
126 UNIX_EPOCH.elapsed().unwrap()
128 }
129
130 fn now(&self) -> Self::Instant {
131 Instant::now()
132 }
133
134 fn fill_random_bytes(&self, buffer: &mut [u8]) {
135 rand::RngCore::fill_bytes(&mut rand::thread_rng(), buffer);
136 }
137
138 fn sleep(&self, duration: Duration) -> Self::Delay {
139 smol::Timer::after(duration).map(|_| ())
140 }
141
142 fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
143 smol::Timer::at(when).map(|_| ())
144 }
145
146 fn spawn_task(&self, _task_name: Cow<str>, task: impl Future<Output = ()> + Send + 'static) {
147 let _dummy_keep_alive = self.clone();
151 self.tasks_executor
152 .spawn(
153 panic::AssertUnwindSafe(async move {
154 task.await;
155 drop(_dummy_keep_alive);
156 })
157 .catch_unwind(),
158 )
159 .detach();
160 }
161
162 fn log<'a>(
163 &self,
164 log_level: LogLevel,
165 log_target: &'a str,
166 message: &'a str,
167 key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>,
168 ) {
169 let log_level = match log_level {
172 LogLevel::Error => log::Level::Error,
173 LogLevel::Warn => log::Level::Warn,
174 LogLevel::Info => log::Level::Info,
175 LogLevel::Debug => log::Level::Debug,
176 LogLevel::Trace => log::Level::Trace,
177 };
178
179 let mut message_build = String::with_capacity(128);
180 message_build.push_str(message);
181 let mut first = true;
182 for (key, value) in key_values {
183 if first {
184 let _ = write!(message_build, "; ");
185 first = false;
186 } else {
187 let _ = write!(message_build, ", ");
188 }
189 let _ = write!(message_build, "{}={}", key, value);
190 }
191
192 log::logger().log(
193 &log::RecordBuilder::new()
194 .level(log_level)
195 .target(log_target)
196 .args(format_args!("{}", message_build))
197 .build(),
198 )
199 }
200
201 fn client_name(&self) -> Cow<str> {
202 Cow::Borrowed(&self.client_name)
203 }
204
205 fn client_version(&self) -> Cow<str> {
206 Cow::Borrowed(&self.client_version)
207 }
208
209 fn supports_connection_type(&self, connection_type: ConnectionType) -> bool {
210 matches!(
212 connection_type,
213 ConnectionType::TcpIpv4
214 | ConnectionType::TcpIpv6
215 | ConnectionType::TcpDns
216 | ConnectionType::WebSocketIpv4 { .. }
217 | ConnectionType::WebSocketIpv6 { .. }
218 | ConnectionType::WebSocketDns { secure: false, .. }
219 )
220 }
221
222 fn connect_stream(&self, multiaddr: Address) -> Self::StreamConnectFuture {
223 let (tcp_socket_addr, host_if_websocket): (
224 either::Either<SocketAddr, (String, u16)>,
225 Option<String>,
226 ) = match multiaddr {
227 Address::TcpDns { hostname, port } => {
228 (either::Right((hostname.to_string(), port)), None)
229 }
230 Address::TcpIp {
231 ip: IpAddr::V4(ip),
232 port,
233 } => (either::Left(SocketAddr::from((ip, port))), None),
234 Address::TcpIp {
235 ip: IpAddr::V6(ip),
236 port,
237 } => (either::Left(SocketAddr::from((ip, port))), None),
238 Address::WebSocketDns {
239 hostname,
240 port,
241 secure: false,
242 } => (
243 either::Right((hostname.to_string(), port)),
244 Some(format!("{}:{}", hostname, port)),
245 ),
246 Address::WebSocketIp {
247 ip: IpAddr::V4(ip),
248 port,
249 } => {
250 let addr = SocketAddr::from((ip, port));
251 (either::Left(addr), Some(addr.to_string()))
252 }
253 Address::WebSocketIp {
254 ip: IpAddr::V6(ip),
255 port,
256 } => {
257 let addr = SocketAddr::from((ip, port));
258 (either::Left(addr), Some(addr.to_string()))
259 }
260
261 _ => unreachable!(),
264 };
265
266 let socket_future = async {
267 let tcp_socket = match tcp_socket_addr {
268 either::Left(socket_addr) => smol::net::TcpStream::connect(socket_addr).await,
269 either::Right((dns, port)) => smol::net::TcpStream::connect((&dns[..], port)).await,
270 };
271
272 if let Ok(tcp_socket) = &tcp_socket {
273 let _ = tcp_socket.set_nodelay(true);
274 }
275
276 match (tcp_socket, host_if_websocket) {
277 (Ok(tcp_socket), Some(host)) => {
278 websocket::websocket_client_handshake(websocket::Config {
279 tcp_socket,
280 host: &host,
281 url: "/",
282 })
283 .await
284 .map(TcpOrWs::Right)
285 }
286
287 (Ok(tcp_socket), None) => Ok(TcpOrWs::Left(tcp_socket)),
288 (Err(err), _) => Err(err),
289 }
290 };
291
292 future::ready(Stream(with_buffers::WithBuffers::new(Box::pin(
293 socket_future,
294 ))))
295 }
296
297 fn connect_multistream(&self, _address: MultiStreamAddress) -> Self::MultiStreamConnectFuture {
298 panic!()
299 }
300
301 fn open_out_substream(&self, c: &mut Self::MultiStream) {
302 match *c {}
305 }
306
307 fn next_substream(&self, c: &mut Self::MultiStream) -> Self::NextSubstreamFuture<'_> {
308 match *c {}
311 }
312
313 fn read_write_access<'a>(
314 &self,
315 stream: Pin<&'a mut Self::Stream>,
316 ) -> Result<Self::ReadWriteAccess<'a>, &'a io::Error> {
317 let stream = stream.project();
318 stream.0.read_write_access(Instant::now())
319 }
320
321 fn wait_read_write_again<'a>(
322 &self,
323 stream: Pin<&'a mut Self::Stream>,
324 ) -> Self::StreamUpdateFuture<'a> {
325 let stream = stream.project();
326 Box::pin(stream.0.wait_read_write_again(|when| async move {
327 smol::Timer::at(when).await;
328 }))
329 }
330}
331
332impl Drop for DefaultPlatform {
333 fn drop(&mut self) {
334 self.shutdown_notify.notify(usize::MAX);
335 }
336}
337
/// Implementation detail of [`DefaultPlatform`]: the `Stream` type of its `PlatformRef`
/// implementation.
///
/// Wraps a `WithBuffers` built around a boxed future that yields either the connected socket
/// (see `TcpOrWs`) or the `io::Error` of the failed connection attempt.
#[pin_project::pin_project]
pub struct Stream(
    // Structurally pinned, so that `project()` gives pinned access to the `WithBuffers`.
    #[pin]
    with_buffers::WithBuffers<
        future::BoxFuture<'static, Result<TcpOrWs, io::Error>>,
        TcpOrWs,
        Instant,
    >,
);
348
/// Socket underlying a [`Stream`]: either a plain TCP socket (`Left`) or a WebSocket connection
/// established on top of a TCP socket (`Right`).
type TcpOrWs = future::Either<smol::net::TcpStream, websocket::Connection<smol::net::TcpStream>>;
350
#[cfg(test)]
mod tests {
    use super::{DefaultPlatform, PlatformRef as _};

    /// Checks that a spawned task is still pending after the `Arc<DefaultPlatform>` returned
    /// by `new` has been dropped, and that it completes once it is allowed to.
    #[test]
    fn tasks_run_indefinitely() {
        let release_task = event_listener::Event::new();
        let (done_tx, mut done_rx) = futures_channel::oneshot::channel();

        let platform = DefaultPlatform::new(String::new(), String::new());
        let released = release_task.listen();
        platform.spawn_task("".into(), async move {
            released.await;
            done_tx.send(()).unwrap();
        });
        // Give up the only reference held by the test.
        drop(platform);

        // The task hasn't been released yet, so nothing has been sent and the sender must
        // still be alive.
        assert!(matches!(done_rx.try_recv(), Ok(None)));
        release_task.notify(usize::MAX);
        assert!(matches!(smol::block_on(done_rx), Ok(())));
    }
}