Coverage Report

Created: 2025-07-01 09:16

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/__w/smoldot/smoldot/repo/wasm-node/rust/src/platform.rs
Line
Count
Source
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use crate::{bindings, timers::Delay};
19
20
use futures_lite::future::FutureExt as _;
21
22
use smoldot_light::platform::{SubstreamDirection, read_write};
23
24
use alloc::{
25
    borrow::{Cow, ToOwned as _},
26
    boxed::Box,
27
    collections::{BTreeMap, VecDeque},
28
    format,
29
    string::{String, ToString as _},
30
    vec::Vec,
31
};
32
use async_lock::Mutex;
33
use core::{
34
    fmt::{self, Write as _},
35
    future, iter, mem,
36
    net::IpAddr,
37
    ops, pin, str,
38
    sync::atomic::{AtomicU32, AtomicU64, Ordering},
39
    task,
40
    time::Duration,
41
};
42
43
/// Total number of bytes that all the connections created through [`PlatformRef`] combined have
44
/// received.
45
pub static TOTAL_BYTES_RECEIVED: AtomicU64 = AtomicU64::new(0);
46
/// Total number of bytes that all the connections created through [`PlatformRef`] combined have
47
/// sent.
48
pub static TOTAL_BYTES_SENT: AtomicU64 = AtomicU64::new(0);
49
/// Total number of microseconds that all the tasks have spent executing. A `u64` will overflow
50
/// after 584 542 years.
51
pub static TOTAL_CPU_USAGE_US: AtomicU64 = AtomicU64::new(0);
52
53
pub(crate) const PLATFORM_REF: PlatformRef = PlatformRef {};
54
55
/// Log level above which log entries aren't emitted.
56
pub static MAX_LOG_LEVEL: AtomicU32 = AtomicU32::new(0);
57
58
#[derive(Debug, Copy, Clone)]
59
pub(crate) struct PlatformRef {}
60
61
// TODO: this trait implementation was written before GATs were stable in Rust; now that the associated types have lifetimes, it should be possible to considerably simplify this code
62
impl smoldot_light::platform::PlatformRef for PlatformRef {
63
    type Delay = Delay;
64
    type Instant = Duration;
65
    type MultiStream = MultiStreamWrapper; // Entry in the `CONNECTIONS` map.
66
    type Stream = StreamWrapper; // Entry in the `STREAMS` map and a read buffer.
67
    type StreamConnectFuture = future::Ready<Self::Stream>;
68
    type ReadWriteAccess<'a> = ReadWriteAccess<'a>;
69
    type StreamErrorRef<'a> = StreamError;
70
    type MultiStreamConnectFuture = pin::Pin<
71
        Box<
72
            dyn Future<
73
                    Output = smoldot_light::platform::MultiStreamWebRtcConnection<
74
                        Self::MultiStream,
75
                    >,
76
                > + Send,
77
        >,
78
    >;
79
    type StreamUpdateFuture<'a> = pin::Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
80
    type NextSubstreamFuture<'a> = pin::Pin<
81
        Box<
82
            dyn Future<Output = Option<(Self::Stream, smoldot_light::platform::SubstreamDirection)>>
83
                + Send
84
                + 'a,
85
        >,
86
    >;
87
88
0
    fn now_from_unix_epoch(&self) -> Duration {
89
0
        let microseconds = bindings::unix_timestamp_us();
90
0
        Duration::from_micros(microseconds)
91
0
    }
92
93
0
    fn now(&self) -> Self::Instant {
94
0
        let microseconds = bindings::monotonic_clock_us();
95
0
        Duration::from_micros(microseconds)
96
0
    }
97
98
0
    fn fill_random_bytes(&self, buffer: &mut [u8]) {
99
        unsafe {
100
0
            bindings::random_get(
101
0
                u32::try_from(buffer.as_mut_ptr() as usize).unwrap(),
102
0
                u32::try_from(buffer.len()).unwrap(),
103
            )
104
        }
105
0
    }
106
107
0
    fn sleep(&self, duration: Duration) -> Self::Delay {
108
0
        Delay::new(duration)
109
0
    }
110
111
0
    fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
112
0
        Delay::new_at_monotonic_clock(when)
113
0
    }
114
115
0
    fn spawn_task(&self, task_name: Cow<str>, task: impl Future<Output = ()> + Send + 'static) {
116
        // The code below processes tasks that have names.
117
        #[pin_project::pin_project]
118
        struct FutureAdapter<F> {
119
            name: String,
120
            #[pin]
121
            future: F,
122
        }
123
124
        impl<F: Future> Future for FutureAdapter<F> {
125
            type Output = F::Output;
126
0
            fn poll(self: pin::Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
127
0
                let this = self.project();
128
0
                bindings::current_task_entered(
129
0
                    u32::try_from(this.name.as_bytes().as_ptr() as usize).unwrap(),
130
0
                    u32::try_from(this.name.as_bytes().len()).unwrap(),
131
                );
132
133
0
                let before_polling = bindings::monotonic_clock_us();
134
0
                let out = this.future.poll(cx);
135
0
                let poll_duration =
136
0
                    Duration::from_micros(bindings::monotonic_clock_us() - before_polling);
137
0
                TOTAL_CPU_USAGE_US.fetch_add(
138
0
                    u64::try_from(poll_duration.as_micros()).unwrap_or(u64::MAX),
139
0
                    Ordering::Relaxed,
140
                );
141
142
0
                bindings::current_task_exit();
143
144
                // Print a warning if polling the task takes a long time.
145
                // It has been noticed that sometimes in Firefox polling a task takes 16ms + a
146
                // small amount. This most likely indicates that Firefox does something like
147
                // freezing the JS/Wasm execution before resuming it at the next frame, thus adding
148
                // 16ms to the execution time.
149
                // For this reason, the threshold above which a task takes too long must be
150
                // above 16ms no matter what.
151
0
                if poll_duration.as_millis() >= 20 {
152
0
                    smoldot_light::platform::PlatformRef::log(
153
0
                        &PLATFORM_REF,
154
0
                        smoldot_light::platform::LogLevel::Debug,
155
0
                        "smoldot",
156
0
                        "task-too-long-time",
157
0
                        [
158
0
                            ("name", this.name as &dyn fmt::Display),
159
0
                            (
160
0
                                "poll_duration_ms",
161
0
                                &poll_duration.as_millis() as &dyn fmt::Display,
162
0
                            ),
163
0
                        ]
164
0
                        .into_iter(),
165
0
                    );
166
0
                }
167
0
                if poll_duration.as_millis() >= 150 {
168
0
                    smoldot_light::platform::PlatformRef::log(
169
0
                        &PLATFORM_REF,
170
0
                        smoldot_light::platform::LogLevel::Warn,
171
0
                        "smoldot",
172
0
                        &format!(
173
0
                            "The task named `{}` has occupied the CPU for an \
174
0
                            unreasonable amount of time ({}ms).",
175
0
                            this.name,
176
0
                            poll_duration.as_millis(),
177
0
                        ),
178
0
                        iter::empty(),
179
0
                    );
180
0
                }
181
182
0
                out
183
0
            }
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvMs2_B1a_INtB1a_6ClientBO_E9add_chainINtNtNtCsaFPxhswmqCN_5alloc3vec9into_iter8IntoIterNtB1a_7ChainIdEEsa_0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtB1a_15runtime_service14run_backgroundBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB1a_15network_service5tasks29single_stream_connection_taskBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB1a_15network_service5tasks35webrtc_multi_stream_connection_taskBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB1a_16json_rpc_service10background3runBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB1a_15runtime_serviceINtB2D_14RuntimeServiceBO_E31send_message_or_restart_service00ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB1a_20transactions_serviceINtB2D_19TransactionsServiceBO_E18send_to_background00ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvB7_9add_chains0_0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_12sync_serviceINtB2B_11SyncServiceBO_E3new0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_15network_serviceINtB2B_14NetworkServiceBO_E3new0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_15network_serviceINtB2B_14NetworkServiceBO_E9add_chains_0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_20transactions_serviceINtB2B_19TransactionsServiceBO_E3new0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvNtB7_4init4init0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_
184
        }
185
186
0
        let task = FutureAdapter {
187
0
            name: task_name.into_owned(),
188
0
            future: task,
189
0
        };
190
191
0
        let (runnable, task) = async_task::spawn(task, |runnable| {
192
0
            super::TASKS_QUEUE.push(runnable);
193
0
            bindings::advance_execution_ready();
194
0
        });
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B1a_INtB1a_6ClientBO_E9add_chainINtNtNtCsaFPxhswmqCN_5alloc3vec9into_iter8IntoIterNtB1a_7ChainIdEEsa_0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB1a_15runtime_service14run_backgroundBO_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB1a_15network_service5tasks29single_stream_connection_taskBO_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB1a_15network_service5tasks35webrtc_multi_stream_connection_taskBO_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB1a_16json_rpc_service10background3runBO_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB1a_15runtime_serviceINtB2i_14RuntimeServiceBO_E31send_message_or_restart_service00E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB1a_20transactions_serviceINtB2i_19TransactionsServiceBO_E18send_to_background00E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB7_9add_chains0_0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_12sync_serviceINtB2g_11SyncServiceBO_E3new0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_15network_serviceINtB2g_14NetworkServiceBO_E3new0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_15network_serviceINtB2g_14NetworkServiceBO_E9add_chains_0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_20transactions_serviceINtB2g_19TransactionsServiceBO_E3new0E0B7_
Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB7_4init4init0E0B7_
195
196
0
        task.detach();
197
0
        runnable.schedule();
198
0
    }
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B18_INtB18_6ClientBM_E9add_chainINtNtNtCsaFPxhswmqCN_5alloc3vec9into_iter8IntoIterNtB18_7ChainIdEEsa_0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB18_15runtime_service14run_backgroundBM_E0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB18_15network_service5tasks29single_stream_connection_taskBM_E0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB18_15network_service5tasks35webrtc_multi_stream_connection_taskBM_E0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB18_16json_rpc_service10background3runBM_E0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB18_15runtime_serviceINtB2g_14RuntimeServiceBM_E31send_message_or_restart_service00EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB18_20transactions_serviceINtB2g_19TransactionsServiceBM_E18send_to_background00EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB5_9add_chains0_0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_12sync_serviceINtB2e_11SyncServiceBM_E3new0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_15network_serviceINtB2e_14NetworkServiceBM_E3new0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_15network_serviceINtB2e_14NetworkServiceBM_E9add_chains_0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_20transactions_serviceINtB2e_19TransactionsServiceBM_E3new0EB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB5_4init4init0EB5_
199
200
0
    fn log<'a>(
201
0
        &self,
202
0
        log_level: smoldot_light::platform::LogLevel,
203
0
        log_target: &'a str,
204
0
        message: &'a str,
205
0
        key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>,
206
0
    ) {
207
0
        let log_level = match log_level {
208
0
            smoldot_light::platform::LogLevel::Error => 1,
209
0
            smoldot_light::platform::LogLevel::Warn => 2,
210
0
            smoldot_light::platform::LogLevel::Info => 3,
211
0
            smoldot_light::platform::LogLevel::Debug => 4,
212
0
            smoldot_light::platform::LogLevel::Trace => 5,
213
        };
214
215
0
        if log_level > MAX_LOG_LEVEL.load(Ordering::Relaxed) {
216
0
            return;
217
0
        }
218
219
0
        let mut key_values = key_values.peekable();
220
221
0
        if key_values.peek().is_none() {
222
0
            bindings::log(
223
0
                log_level,
224
0
                u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(),
225
0
                u32::try_from(log_target.as_bytes().len()).unwrap(),
226
0
                u32::try_from(message.as_bytes().as_ptr() as usize).unwrap(),
227
0
                u32::try_from(message.as_bytes().len()).unwrap(),
228
            )
229
        } else {
230
0
            let mut message_build = String::with_capacity(128);
231
0
            message_build.push_str(message);
232
0
            let mut first = true;
233
0
            for (key, value) in key_values {
234
0
                if first {
235
0
                    let _ = write!(message_build, "; ");
236
0
                    first = false;
237
0
                } else {
238
0
                    let _ = write!(message_build, ", ");
239
0
                }
240
0
                let _ = write!(message_build, "{}={}", key, value);
241
            }
242
243
0
            bindings::log(
244
0
                log_level,
245
0
                u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(),
246
0
                u32::try_from(log_target.as_bytes().len()).unwrap(),
247
0
                u32::try_from(message_build.as_bytes().as_ptr() as usize).unwrap(),
248
0
                u32::try_from(message_build.as_bytes().len()).unwrap(),
249
            )
250
        }
251
0
    }
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtCs1p5UDGgVI4d_4core5array4iter8IntoIterTReRDNtNtB28_3fmt7DisplayEL_EKj2_EEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter7sources5empty5EmptyTReRDNtNtB2a_3fmt7DisplayEL_EEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEEEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEB5_
Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEINtNtB2Y_5empty5EmptyB3m_EEEB5_
252
253
0
    fn client_name(&self) -> Cow<str> {
254
0
        env!("CARGO_PKG_NAME").into()
255
0
    }
256
257
0
    fn client_version(&self) -> Cow<str> {
258
0
        env!("CARGO_PKG_VERSION").into()
259
0
    }
260
261
0
    fn supports_connection_type(
262
0
        &self,
263
0
        connection_type: smoldot_light::platform::ConnectionType,
264
0
    ) -> bool {
265
0
        let ty = match connection_type {
266
0
            smoldot_light::platform::ConnectionType::TcpIpv4 => 0,
267
0
            smoldot_light::platform::ConnectionType::TcpIpv6 => 1,
268
0
            smoldot_light::platform::ConnectionType::TcpDns => 2,
269
            smoldot_light::platform::ConnectionType::WebSocketIpv4 {
270
                remote_is_localhost: true,
271
                ..
272
            }
273
            | smoldot_light::platform::ConnectionType::WebSocketIpv6 {
274
                remote_is_localhost: true,
275
                ..
276
            }
277
            | smoldot_light::platform::ConnectionType::WebSocketDns {
278
                secure: false,
279
                remote_is_localhost: true,
280
0
            } => 7,
281
0
            smoldot_light::platform::ConnectionType::WebSocketIpv4 { .. } => 4,
282
0
            smoldot_light::platform::ConnectionType::WebSocketIpv6 { .. } => 5,
283
0
            smoldot_light::platform::ConnectionType::WebSocketDns { secure: false, .. } => 6,
284
0
            smoldot_light::platform::ConnectionType::WebSocketDns { secure: true, .. } => 14,
285
0
            smoldot_light::platform::ConnectionType::WebRtcIpv4 => 16,
286
0
            smoldot_light::platform::ConnectionType::WebRtcIpv6 => 17,
287
        };
288
289
0
        bindings::connection_type_supported(ty) != 0
290
0
    }
291
292
0
    fn connect_stream(
293
0
        &self,
294
0
        address: smoldot_light::platform::Address,
295
0
    ) -> Self::StreamConnectFuture {
296
0
        let mut lock = STATE.try_lock().unwrap();
297
298
0
        let connection_id = lock.next_connection_id;
299
0
        lock.next_connection_id += 1;
300
301
0
        let encoded_address: Vec<u8> = match address {
302
            smoldot_light::platform::Address::TcpIp {
303
0
                ip: IpAddr::V4(ip),
304
0
                port,
305
0
            } => iter::once(0u8)
306
0
                .chain(port.to_be_bytes())
307
0
                .chain(ip.to_string().bytes())
308
0
                .collect(),
309
            smoldot_light::platform::Address::TcpIp {
310
0
                ip: IpAddr::V6(ip),
311
0
                port,
312
0
            } => iter::once(1u8)
313
0
                .chain(port.to_be_bytes())
314
0
                .chain(ip.to_string().bytes())
315
0
                .collect(),
316
0
            smoldot_light::platform::Address::TcpDns { hostname, port } => iter::once(2u8)
317
0
                .chain(port.to_be_bytes())
318
0
                .chain(hostname.as_bytes().iter().copied())
319
0
                .collect(),
320
            smoldot_light::platform::Address::WebSocketIp {
321
0
                ip: IpAddr::V4(ip),
322
0
                port,
323
0
            } => iter::once(4u8)
324
0
                .chain(port.to_be_bytes())
325
0
                .chain(ip.to_string().bytes())
326
0
                .collect(),
327
            smoldot_light::platform::Address::WebSocketIp {
328
0
                ip: IpAddr::V6(ip),
329
0
                port,
330
0
            } => iter::once(5u8)
331
0
                .chain(port.to_be_bytes())
332
0
                .chain(ip.to_string().bytes())
333
0
                .collect(),
334
            smoldot_light::platform::Address::WebSocketDns {
335
0
                hostname,
336
0
                port,
337
                secure: false,
338
0
            } => iter::once(6u8)
339
0
                .chain(port.to_be_bytes())
340
0
                .chain(hostname.as_bytes().iter().copied())
341
0
                .collect(),
342
            smoldot_light::platform::Address::WebSocketDns {
343
0
                hostname,
344
0
                port,
345
                secure: true,
346
0
            } => iter::once(14u8)
347
0
                .chain(port.to_be_bytes())
348
0
                .chain(hostname.as_bytes().iter().copied())
349
0
                .collect(),
350
        };
351
352
0
        let write_closable = match address {
353
            smoldot_light::platform::Address::TcpIp { .. }
354
0
            | smoldot_light::platform::Address::TcpDns { .. } => true,
355
            smoldot_light::platform::Address::WebSocketIp { .. }
356
0
            | smoldot_light::platform::Address::WebSocketDns { .. } => false,
357
        };
358
359
0
        bindings::connection_new(
360
0
            connection_id,
361
0
            u32::try_from(encoded_address.as_ptr() as usize).unwrap(),
362
0
            u32::try_from(encoded_address.len()).unwrap(),
363
        );
364
365
0
        let _prev_value = lock.connections.insert(
366
0
            connection_id,
367
0
            Connection {
368
0
                inner: ConnectionInner::SingleStreamMsNoiseYamux,
369
0
                something_happened: event_listener::Event::new(),
370
0
            },
371
        );
372
0
        debug_assert!(_prev_value.is_none());
373
374
0
        let _prev_value = lock.streams.insert(
375
0
            (connection_id, None),
376
0
            Stream {
377
0
                reset: None,
378
0
                messages_queue: VecDeque::with_capacity(8),
379
0
                messages_queue_total_size: 0,
380
0
                something_happened: event_listener::Event::new(),
381
0
                writable_bytes_extra: 0,
382
0
            },
383
        );
384
0
        debug_assert!(_prev_value.is_none());
385
386
0
        future::ready(StreamWrapper {
387
0
            connection_id,
388
0
            stream_id: None,
389
0
            read_buffer: Vec::new(),
390
0
            inner_expected_incoming_bytes: Some(1),
391
0
            is_reset: None,
392
0
            writable_bytes: 0,
393
0
            write_closable,
394
0
            write_closed: false,
395
0
            when_wake_up: None,
396
0
        })
397
0
    }
398
399
0
    fn connect_multistream(
400
0
        &self,
401
0
        address: smoldot_light::platform::MultiStreamAddress,
402
0
    ) -> Self::MultiStreamConnectFuture {
403
0
        let mut lock = STATE.try_lock().unwrap();
404
405
0
        let connection_id = lock.next_connection_id;
406
0
        lock.next_connection_id += 1;
407
408
0
        let encoded_address: Vec<u8> = match address {
409
            smoldot_light::platform::MultiStreamAddress::WebRtc {
410
0
                ip: IpAddr::V4(ip),
411
0
                port,
412
0
                remote_certificate_sha256,
413
0
            } => iter::once(16u8)
414
0
                .chain(port.to_be_bytes())
415
0
                .chain(remote_certificate_sha256.iter().copied())
416
0
                .chain(ip.to_string().bytes())
417
0
                .collect(),
418
            smoldot_light::platform::MultiStreamAddress::WebRtc {
419
0
                ip: IpAddr::V6(ip),
420
0
                port,
421
0
                remote_certificate_sha256,
422
0
            } => iter::once(17u8)
423
0
                .chain(port.to_be_bytes())
424
0
                .chain(remote_certificate_sha256.iter().copied())
425
0
                .chain(ip.to_string().bytes())
426
0
                .collect(),
427
        };
428
429
0
        bindings::connection_new(
430
0
            connection_id,
431
0
            u32::try_from(encoded_address.as_ptr() as usize).unwrap(),
432
0
            u32::try_from(encoded_address.len()).unwrap(),
433
        );
434
435
0
        let _prev_value = lock.connections.insert(
436
0
            connection_id,
437
0
            Connection {
438
0
                inner: ConnectionInner::MultiStreamUnknownHandshake {
439
0
                    opened_substreams_to_pick_up: VecDeque::with_capacity(0),
440
0
                    connection_handles_alive: 1,
441
0
                },
442
0
                something_happened: event_listener::Event::new(),
443
0
            },
444
        );
445
0
        debug_assert!(_prev_value.is_none());
446
447
0
        Box::pin(async move {
448
            // Wait until the connection state is no longer "unknown handshake".
449
0
            let mut lock = loop {
450
0
                let something_happened = {
451
0
                    let mut lock = STATE.try_lock().unwrap();
452
0
                    let connection = lock.connections.get_mut(&connection_id).unwrap();
453
454
0
                    if matches!(
455
0
                        connection.inner,
456
                        ConnectionInner::Reset { .. } | ConnectionInner::MultiStreamWebRtc { .. }
457
                    ) {
458
0
                        break lock;
459
0
                    }
460
461
0
                    connection.something_happened.listen()
462
                };
463
464
0
                something_happened.await
465
            };
466
0
            let lock = &mut *lock;
467
468
0
            let connection = lock.connections.get_mut(&connection_id).unwrap();
469
470
0
            match &mut connection.inner {
471
                ConnectionInner::SingleStreamMsNoiseYamux { .. }
472
                | ConnectionInner::MultiStreamUnknownHandshake { .. } => {
473
0
                    unreachable!()
474
                }
475
                ConnectionInner::MultiStreamWebRtc {
476
0
                    local_tls_certificate_sha256,
477
                    ..
478
0
                } => smoldot_light::platform::MultiStreamWebRtcConnection {
479
0
                    connection: MultiStreamWrapper(connection_id),
480
0
                    local_tls_certificate_sha256: *local_tls_certificate_sha256,
481
0
                },
482
                ConnectionInner::Reset { .. } => {
483
                    // If the connection was already reset, we proceed anyway but provide a fake
484
                    // certificate hash. This has absolutely no consequence.
485
0
                    smoldot_light::platform::MultiStreamWebRtcConnection {
486
0
                        connection: MultiStreamWrapper(connection_id),
487
0
                        local_tls_certificate_sha256: [0; 32],
488
0
                    }
489
                }
490
            }
491
0
        })
492
0
    }
493
494
0
    fn next_substream<'a>(
495
0
        &self,
496
0
        MultiStreamWrapper(connection_id): &'a mut Self::MultiStream,
497
0
    ) -> Self::NextSubstreamFuture<'a> {
498
0
        let connection_id = *connection_id;
499
500
0
        Box::pin(async move {
501
0
            let (stream_id, direction) = loop {
502
0
                let something_happened = {
503
0
                    let mut lock = STATE.try_lock().unwrap();
504
0
                    let connection = lock.connections.get_mut(&connection_id).unwrap();
505
506
0
                    match &mut connection.inner {
507
0
                        ConnectionInner::Reset { .. } => return None,
508
                        ConnectionInner::MultiStreamWebRtc {
509
0
                            opened_substreams_to_pick_up,
510
0
                            connection_handles_alive,
511
                            ..
512
                        }
513
                        | ConnectionInner::MultiStreamUnknownHandshake {
514
0
                            opened_substreams_to_pick_up,
515
0
                            connection_handles_alive,
516
                            ..
517
                        } => {
518
0
                            if let Some((substream, direction)) =
519
0
                                opened_substreams_to_pick_up.pop_front()
520
                            {
521
0
                                *connection_handles_alive += 1;
522
0
                                break (substream, direction);
523
0
                            }
524
                        }
525
                        ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
526
0
                            unreachable!()
527
                        }
528
                    }
529
530
0
                    connection.something_happened.listen()
531
                };
532
533
0
                something_happened.await
534
            };
535
536
0
            Some((
537
0
                StreamWrapper {
538
0
                    connection_id,
539
0
                    stream_id: Some(stream_id),
540
0
                    read_buffer: Vec::new(),
541
0
                    inner_expected_incoming_bytes: Some(1),
542
0
                    is_reset: None,
543
0
                    writable_bytes: 0,
544
0
                    write_closable: false, // Note: this is currently hardcoded for WebRTC.
545
0
                    write_closed: false,
546
0
                    when_wake_up: None,
547
0
                },
548
0
                direction,
549
0
            ))
550
0
        })
551
0
    }
552
553
0
    fn open_out_substream(&self, MultiStreamWrapper(connection_id): &mut Self::MultiStream) {
554
0
        match STATE
555
0
            .try_lock()
556
0
            .unwrap()
557
0
            .connections
558
0
            .get(connection_id)
559
0
            .unwrap()
560
0
            .inner
561
        {
562
            ConnectionInner::MultiStreamWebRtc { .. }
563
0
            | ConnectionInner::MultiStreamUnknownHandshake { .. } => {
564
0
                bindings::connection_stream_open(*connection_id);
565
0
            }
566
0
            ConnectionInner::Reset { .. } => {}
567
            ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
568
0
                unreachable!()
569
            }
570
        }
571
0
    }
572
573
0
    fn wait_read_write_again<'a>(
574
0
        &self,
575
0
        stream: pin::Pin<&'a mut Self::Stream>,
576
0
    ) -> Self::StreamUpdateFuture<'a> {
577
0
        Box::pin(async move {
578
0
            let stream = stream.get_mut();
579
580
0
            if stream.is_reset.is_some() {
581
0
                future::pending::<()>().await;
582
0
            }
583
584
            loop {
585
0
                let listener = {
586
0
                    let mut lock = STATE.try_lock().unwrap();
587
0
                    let stream_inner = lock
588
0
                        .streams
589
0
                        .get_mut(&(stream.connection_id, stream.stream_id))
590
0
                        .unwrap();
591
592
0
                    if let Some(msg) = &stream_inner.reset {
593
0
                        stream.is_reset = Some(msg.clone());
594
0
                        return;
595
0
                    }
596
597
0
                    let mut shall_return = false;
598
599
                    // Move the buffers from `STATE` into `read_buffer`.
600
0
                    if !stream_inner.messages_queue.is_empty() {
601
0
                        stream
602
0
                            .read_buffer
603
0
                            .reserve(stream_inner.messages_queue_total_size);
604
605
0
                        while let Some(msg) = stream_inner.messages_queue.pop_front() {
606
0
                            stream_inner.messages_queue_total_size -= msg.len();
607
                            // TODO: could be optimized by reworking the bindings
608
0
                            stream.read_buffer.extend_from_slice(&msg);
609
0
                            if stream
610
0
                                .inner_expected_incoming_bytes
611
0
                                .map_or(false, |expected| expected <= stream.read_buffer.len())
612
                            {
613
0
                                shall_return = true;
614
0
                                break;
615
0
                            }
616
                        }
617
0
                    }
618
619
0
                    if stream_inner.writable_bytes_extra != 0 {
620
0
                        // As documented, the number of writable bytes must never become
621
0
                        // exceedingly large (a few megabytes). As such, this can't overflow
622
0
                        // unless there is a bug on the JavaScript side.
623
0
                        stream.writable_bytes += stream_inner.writable_bytes_extra;
624
0
                        stream_inner.writable_bytes_extra = 0;
625
0
                        shall_return = true;
626
0
                    }
627
628
0
                    if shall_return {
629
0
                        return;
630
0
                    }
631
632
0
                    stream_inner.something_happened.listen()
633
                };
634
635
0
                let timer_stop = async move {
636
0
                    listener.await;
637
0
                    false
638
0
                }
639
0
                .or(async {
640
0
                    if let Some(when_wake_up) = stream.when_wake_up.as_mut() {
641
0
                        when_wake_up.await;
642
0
                        stream.when_wake_up = None;
643
0
                        true
644
                    } else {
645
0
                        future::pending().await
646
                    }
647
0
                })
648
0
                .await;
649
650
0
                if timer_stop {
651
0
                    return;
652
0
                }
653
            }
654
0
        })
655
0
    }
656
657
0
    fn read_write_access<'a>(
658
0
        &self,
659
0
        stream: pin::Pin<&'a mut Self::Stream>,
660
0
    ) -> Result<Self::ReadWriteAccess<'a>, Self::StreamErrorRef<'a>> {
661
0
        let stream = stream.get_mut();
662
663
0
        if let Some(message) = &stream.is_reset {
664
0
            return Err(StreamError {
665
0
                message: message.clone(),
666
0
            });
667
0
        }
668
669
        Ok(ReadWriteAccess {
670
            read_write: read_write::ReadWrite {
671
0
                now: Duration::from_micros(bindings::monotonic_clock_us()),
672
0
                incoming_buffer: mem::take(&mut stream.read_buffer),
673
0
                expected_incoming_bytes: Some(0),
674
                read_bytes: 0,
675
0
                write_buffers: Vec::new(),
676
                write_bytes_queued: 0,
677
0
                write_bytes_queueable: if !stream.write_closed {
678
0
                    Some(stream.writable_bytes)
679
                } else {
680
0
                    None
681
                },
682
0
                wake_up_after: None,
683
            },
684
0
            stream,
685
        })
686
0
    }
687
}
688
689
pub(crate) struct ReadWriteAccess<'a> {
690
    read_write: read_write::ReadWrite<Duration>,
691
    stream: &'a mut StreamWrapper,
692
}
693
694
impl<'a> ops::Deref for ReadWriteAccess<'a> {
695
    type Target = read_write::ReadWrite<Duration>;
696
697
0
    fn deref(&self) -> &Self::Target {
698
0
        &self.read_write
699
0
    }
700
}
701
702
impl<'a> ops::DerefMut for ReadWriteAccess<'a> {
703
0
    fn deref_mut(&mut self) -> &mut Self::Target {
704
0
        &mut self.read_write
705
0
    }
706
}
707
708
impl<'a> Drop for ReadWriteAccess<'a> {
709
0
    fn drop(&mut self) {
710
0
        let mut lock = STATE.try_lock().unwrap();
711
712
0
        let stream_inner = lock
713
0
            .streams
714
0
            .get_mut(&(self.stream.connection_id, self.stream.stream_id))
715
0
            .unwrap();
716
717
0
        if (self.read_write.read_bytes != 0
718
0
            && self
719
0
                .read_write
720
0
                .expected_incoming_bytes
721
0
                .map_or(false, |expected| {
722
0
                    expected >= self.read_write.incoming_buffer.len()
723
0
                }))
724
0
            || (self.read_write.write_bytes_queued != 0
725
0
                && self.read_write.write_bytes_queueable.is_some())
726
0
        {
727
0
            self.read_write.wake_up_asap();
728
0
        }
729
730
0
        self.stream.when_wake_up = self
731
0
            .read_write
732
0
            .wake_up_after
733
0
            .map(Delay::new_at_monotonic_clock);
734
735
0
        self.stream.read_buffer = mem::take(&mut self.read_write.incoming_buffer);
736
737
0
        self.stream.inner_expected_incoming_bytes = self.read_write.expected_incoming_bytes;
738
739
0
        if !self.read_write.write_buffers.is_empty() && stream_inner.reset.is_none() {
740
0
            let mut io_vectors = Vec::with_capacity(self.read_write.write_buffers.len());
741
0
            let mut total_length = 0;
742
743
0
            for buffer in &self.read_write.write_buffers {
744
0
                io_vectors.push(bindings::StreamSendIoVector {
745
0
                    ptr: u32::try_from(buffer.as_ptr() as usize).unwrap(),
746
0
                    len: u32::try_from(buffer.len()).unwrap(),
747
0
                });
748
0
                total_length += buffer.len();
749
0
            }
750
751
0
            assert!(total_length <= self.stream.writable_bytes);
752
0
            self.stream.writable_bytes -= total_length;
753
754
            // `unwrap()` is ok as there's no way that `buffer.len()` doesn't fit in a `u64`.
755
0
            TOTAL_BYTES_SENT.fetch_add(u64::try_from(total_length).unwrap(), Ordering::Relaxed);
756
757
0
            bindings::stream_send(
758
0
                self.stream.connection_id,
759
0
                self.stream.stream_id.unwrap_or(0),
760
0
                u32::try_from(io_vectors.as_ptr() as usize).unwrap(),
761
0
                u32::try_from(io_vectors.len()).unwrap(),
762
            );
763
764
0
            self.read_write.write_buffers.clear();
765
0
        }
766
767
0
        if self.read_write.write_bytes_queueable.is_none() && !self.stream.write_closed {
768
0
            if stream_inner.reset.is_none() && self.stream.write_closable {
769
0
                bindings::stream_send_close(
770
0
                    self.stream.connection_id,
771
0
                    self.stream.stream_id.unwrap_or(0),
772
0
                );
773
0
            }
774
775
0
            self.stream.write_closed = true;
776
0
        }
777
0
    }
778
}
779
780
pub(crate) struct StreamWrapper {
781
    connection_id: u32,
782
    stream_id: Option<u32>,
783
    read_buffer: Vec<u8>,
784
    inner_expected_incoming_bytes: Option<usize>,
785
    /// `Some` if the remote has reset the stream and `update_stream` has since then been called.
786
    /// Contains the error message.
787
    is_reset: Option<String>,
788
    writable_bytes: usize,
789
    write_closable: bool,
790
    write_closed: bool,
791
    /// The stream should wake up after this delay.
792
    when_wake_up: Option<Delay>,
793
}
794
795
impl Drop for StreamWrapper {
796
0
    fn drop(&mut self) {
797
0
        let mut lock = STATE.try_lock().unwrap();
798
0
        let lock = &mut *lock;
799
800
0
        let connection = lock.connections.get_mut(&self.connection_id).unwrap();
801
0
        let removed_stream = lock
802
0
            .streams
803
0
            .remove(&(self.connection_id, self.stream_id))
804
0
            .unwrap();
805
806
0
        let remove_connection = match &mut connection.inner {
807
            ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
808
0
                if removed_stream.reset.is_none() {
809
0
                    bindings::reset_connection(self.connection_id);
810
0
                }
811
812
0
                debug_assert!(self.stream_id.is_none());
813
0
                true
814
            }
815
            ConnectionInner::MultiStreamWebRtc {
816
0
                connection_handles_alive,
817
                ..
818
            }
819
            | ConnectionInner::MultiStreamUnknownHandshake {
820
0
                connection_handles_alive,
821
                ..
822
            } => {
823
0
                if removed_stream.reset.is_none() {
824
0
                    bindings::connection_stream_reset(self.connection_id, self.stream_id.unwrap());
825
0
                }
826
0
                *connection_handles_alive -= 1;
827
0
                let remove_connection = *connection_handles_alive == 0;
828
0
                if remove_connection {
829
0
                    bindings::reset_connection(self.connection_id);
830
0
                }
831
0
                remove_connection
832
            }
833
            ConnectionInner::Reset {
834
0
                connection_handles_alive,
835
                ..
836
            } => {
837
0
                *connection_handles_alive -= 1;
838
0
                *connection_handles_alive == 0
839
            }
840
        };
841
842
0
        if remove_connection {
843
0
            lock.connections.remove(&self.connection_id).unwrap();
844
0
        }
845
0
    }
846
}
847
848
pub(crate) struct MultiStreamWrapper(u32);
849
850
impl Drop for MultiStreamWrapper {
851
0
    fn drop(&mut self) {
852
0
        let mut lock = STATE.try_lock().unwrap();
853
854
0
        let connection = lock.connections.get_mut(&self.0).unwrap();
855
0
        let (remove_connection, reset_connection) = match &mut connection.inner {
856
            ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
857
0
                unreachable!()
858
            }
859
            ConnectionInner::MultiStreamWebRtc {
860
0
                connection_handles_alive,
861
                ..
862
            }
863
            | ConnectionInner::MultiStreamUnknownHandshake {
864
0
                connection_handles_alive,
865
                ..
866
            } => {
867
0
                *connection_handles_alive -= 1;
868
0
                let v = *connection_handles_alive == 0;
869
0
                (v, v)
870
            }
871
0
            ConnectionInner::Reset { .. } => (true, false),
872
        };
873
874
0
        if remove_connection {
875
0
            lock.connections.remove(&self.0).unwrap();
876
0
        }
877
0
        if reset_connection {
878
0
            bindings::reset_connection(self.0);
879
0
        }
880
0
    }
881
}
882
883
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
884
#[display("{message}")]
885
pub(crate) struct StreamError {
886
    message: String,
887
}
888
889
static STATE: Mutex<NetworkState> = Mutex::new(NetworkState {
890
    next_connection_id: 0,
891
    connections: hashbrown::HashMap::with_hasher(FnvBuildHasher),
892
    streams: BTreeMap::new(),
893
});
894
895
// TODO: we use a custom `FnvBuildHasher` because it's not possible to create `fnv::FnvBuildHasher` in a `const` context
896
struct FnvBuildHasher;
897
impl core::hash::BuildHasher for FnvBuildHasher {
898
    type Hasher = fnv::FnvHasher;
899
0
    fn build_hasher(&self) -> fnv::FnvHasher {
900
0
        fnv::FnvHasher::default()
901
0
    }
902
}
903
904
/// All the connections and streams that are alive.
905
///
906
/// Single-stream connections have one entry in `connections` and one entry in `streams` (with
907
/// a `stream_id` always equal to `None`).
908
/// Multi-stream connections have one entry in `connections` and zero or more entries in `streams`.
909
struct NetworkState {
910
    next_connection_id: u32,
911
    connections: hashbrown::HashMap<u32, Connection, FnvBuildHasher>,
912
    streams: BTreeMap<(u32, Option<u32>), Stream>,
913
}
914
915
struct Connection {
916
    /// Type of connection and extra fields that depend on the type.
917
    inner: ConnectionInner,
918
    /// Event notified whenever one of the fields above is modified.
919
    something_happened: event_listener::Event,
920
}
921
922
enum ConnectionInner {
923
    SingleStreamMsNoiseYamux,
924
    MultiStreamUnknownHandshake {
925
        /// List of substreams that the host (i.e. JavaScript side) has reported have been opened,
926
        /// but that haven't been reported through
927
        /// [`smoldot_light::platform::PlatformRef::next_substream`] yet.
928
        opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>,
929
        /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference
930
        /// this connection. If it switches from 1 to 0, the connection must be removed.
931
        connection_handles_alive: u32,
932
    },
933
    MultiStreamWebRtc {
934
        /// List of substreams that the host (i.e. JavaScript side) has reported have been opened,
935
        /// but that haven't been reported through
936
        /// [`smoldot_light::platform::PlatformRef::next_substream`] yet.
937
        opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>,
938
        /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference
939
        /// this connection. If it switches from 1 to 0, the connection must be removed.
940
        connection_handles_alive: u32,
941
        /// SHA256 hash of the TLS certificate used by the local node at the DTLS layer.
942
        local_tls_certificate_sha256: [u8; 32],
943
    },
944
    /// [`bindings::connection_reset`] has been called
945
    Reset {
946
        /// Message given by the bindings to justify the closure.
947
        // TODO: why is this unused? shouldn't it be not unused?
948
        _message: String,
949
        /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference
950
        /// this connection. If it switches from 1 to 0, the connection must be removed.
951
        connection_handles_alive: u32,
952
    },
953
}
954
955
struct Stream {
956
    /// `Some` if [`bindings::stream_reset`] has been called. Contains the error message.
957
    reset: Option<String>,
958
    /// Sum of the writable bytes reported through [`bindings::stream_writable_bytes`] that
959
    /// haven't been processed yet in a call to `update_stream`.
960
    writable_bytes_extra: usize,
961
    /// List of messages received through [`bindings::stream_message`]. Must never contain
962
    /// empty messages.
963
    messages_queue: VecDeque<Box<[u8]>>,
964
    /// Total size of all the messages stored in [`Stream::messages_queue`].
965
    messages_queue_total_size: usize,
966
    /// Event notified whenever one of the fields above is modified, such as a new message being
967
    /// queued.
968
    something_happened: event_listener::Event,
969
}
970
971
0
pub(crate) fn connection_multi_stream_set_handshake_info(
972
0
    connection_id: u32,
973
0
    handshake_ty: Box<[u8]>,
974
0
) {
975
0
    let (_, local_tls_certificate_sha256) = nom::Parser::parse(
976
0
        &mut nom::sequence::preceded(
977
0
            nom::bytes::streaming::tag::<_, _, nom::error::Error<&[u8]>>(&[0][..]),
978
0
            nom::combinator::map(nom::bytes::streaming::take(32u32), |b| {
979
0
                <&[u8; 32]>::try_from(b).unwrap()
980
0
            }),
981
        ),
982
0
        &handshake_ty[..],
983
    )
984
0
    .expect("invalid handshake type provided to connection_multi_stream_set_handshake_info");
985
986
0
    let mut lock = STATE.try_lock().unwrap();
987
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
988
989
0
    let (opened_substreams_to_pick_up, connection_handles_alive) = match &mut connection.inner {
990
        ConnectionInner::MultiStreamUnknownHandshake {
991
0
            opened_substreams_to_pick_up,
992
0
            connection_handles_alive,
993
0
        } => (
994
0
            mem::take(opened_substreams_to_pick_up),
995
0
            *connection_handles_alive,
996
0
        ),
997
0
        _ => unreachable!(),
998
    };
999
1000
0
    connection.inner = ConnectionInner::MultiStreamWebRtc {
1001
0
        opened_substreams_to_pick_up,
1002
0
        connection_handles_alive,
1003
0
        local_tls_certificate_sha256: *local_tls_certificate_sha256,
1004
0
    };
1005
0
    connection.something_happened.notify(usize::MAX);
1006
0
}
1007
1008
0
pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u32) {
1009
0
    let mut lock = STATE.try_lock().unwrap();
1010
1011
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1012
1013
    // For single stream connections, the docs of this function mentions that `stream_id` can be
1014
    // any value.
1015
0
    let actual_stream_id = match connection.inner {
1016
        ConnectionInner::MultiStreamWebRtc { .. }
1017
0
        | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id),
1018
0
        ConnectionInner::SingleStreamMsNoiseYamux { .. } => None,
1019
0
        ConnectionInner::Reset { .. } => unreachable!(),
1020
    };
1021
1022
0
    let stream = lock
1023
0
        .streams
1024
0
        .get_mut(&(connection_id, actual_stream_id))
1025
0
        .unwrap();
1026
0
    debug_assert!(stream.reset.is_none());
1027
1028
    // As documented, the number of writable bytes must never become exceedingly large (a few
1029
    // megabytes). As such, this can't overflow unless there is a bug on the JavaScript side.
1030
0
    stream.writable_bytes_extra += usize::try_from(bytes).unwrap();
1031
0
    stream.something_happened.notify(usize::MAX);
1032
0
}
1033
1034
0
pub(crate) fn stream_message(connection_id: u32, stream_id: u32, message: Box<[u8]>) {
1035
0
    let mut lock = STATE.try_lock().unwrap();
1036
1037
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1038
1039
    // For single stream connections, the docs of this function mentions that `stream_id` can be
1040
    // any value.
1041
0
    let actual_stream_id = match connection.inner {
1042
        ConnectionInner::MultiStreamWebRtc { .. }
1043
0
        | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id),
1044
0
        ConnectionInner::SingleStreamMsNoiseYamux { .. } => None,
1045
0
        ConnectionInner::Reset { .. } => unreachable!(),
1046
    };
1047
1048
0
    let stream = lock
1049
0
        .streams
1050
0
        .get_mut(&(connection_id, actual_stream_id))
1051
0
        .unwrap();
1052
0
    debug_assert!(stream.reset.is_none());
1053
1054
0
    TOTAL_BYTES_RECEIVED.fetch_add(u64::try_from(message.len()).unwrap(), Ordering::Relaxed);
1055
1056
    // Ignore empty message to avoid all sorts of problems.
1057
0
    if message.is_empty() {
1058
0
        return;
1059
0
    }
1060
1061
    // There is unfortunately no way to instruct the browser to back-pressure connections to
1062
    // remotes.
1063
    //
1064
    // In order to avoid DoS attacks, we refuse to buffer more than a certain amount of data per
1065
    // connection. This limit is completely arbitrary, and this is in no way a robust solution
1066
    // because this limit isn't in sync with any other part of the code. In other words, it could
1067
    // be legitimate for the remote to buffer a large amount of data.
1068
    //
1069
    // This corner case is handled by discarding the messages that would go over the limit. While
1070
    // this is not a great solution, going over that limit can be considered as a fault from the
1071
    // remote, the same way as it would be a fault from the remote to forget to send some bytes,
1072
    // and thus should be handled in a similar way by the higher level code.
1073
    //
1074
    // A better way to handle this would be to kill the connection abruptly. However, this would
1075
    // add a lot of complex code in this module, and the effort is clearly not worth it for this
1076
    // niche situation.
1077
    //
1078
    // While this problem is specific to browsers (Deno and NodeJS have ways to back-pressure
1079
    // connections), we add this hack for all platforms, for consistency. If this limit is ever
1080
    // reached, we want to be sure to detect it, even when testing on NodeJS or Deno.
1081
    //
1082
    // See <https://github.com/smol-dot/smoldot/issues/109>.
1083
    // TODO: do this properly eventually ^
1084
0
    if stream.messages_queue_total_size >= 25 * 1024 * 1024 {
1085
0
        return;
1086
0
    }
1087
1088
0
    stream.messages_queue_total_size += message.len();
1089
0
    stream.messages_queue.push_back(message);
1090
0
    stream.something_happened.notify(usize::MAX);
1091
0
}
1092
1093
0
pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) {
1094
0
    let mut lock = STATE.try_lock().unwrap();
1095
0
    let lock = &mut *lock;
1096
1097
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1098
    if let ConnectionInner::MultiStreamWebRtc {
1099
0
        opened_substreams_to_pick_up,
1100
        ..
1101
0
    } = &mut connection.inner
1102
    {
1103
0
        let _prev_value = lock.streams.insert(
1104
0
            (connection_id, Some(stream_id)),
1105
0
            Stream {
1106
0
                reset: None,
1107
0
                messages_queue: VecDeque::with_capacity(8),
1108
0
                messages_queue_total_size: 0,
1109
0
                something_happened: event_listener::Event::new(),
1110
0
                writable_bytes_extra: 0,
1111
0
            },
1112
        );
1113
1114
0
        if _prev_value.is_some() {
1115
0
            panic!("same stream_id used multiple times in connection_stream_opened")
1116
0
        }
1117
1118
0
        opened_substreams_to_pick_up.push_back((
1119
0
            stream_id,
1120
0
            if outbound != 0 {
1121
0
                SubstreamDirection::Outbound
1122
            } else {
1123
0
                SubstreamDirection::Inbound
1124
            },
1125
        ));
1126
1127
0
        connection.something_happened.notify(usize::MAX);
1128
    } else {
1129
0
        panic!()
1130
    }
1131
0
}
1132
1133
0
/// Marks the given connection and all of its streams as reset, with `message` as the error
/// message, and wakes up everything waiting on them.
///
/// # Panics
///
/// Panics if `message` isn't valid UTF-8, if the connection doesn't exist, or if the
/// connection has already been reset.
pub(crate) fn connection_reset(connection_id: u32, message: Box<[u8]>) {
    let Ok(text) = str::from_utf8(&message) else {
        panic!("non-UTF-8 message")
    };
    let message = text.to_owned();

    let mut state = STATE.try_lock().unwrap();
    let connection = state.connections.get_mut(&connection_id).unwrap();

    // The number of handles is carried over to the `Reset` state.
    let connection_handles_alive = match &connection.inner {
        ConnectionInner::MultiStreamWebRtc {
            connection_handles_alive,
            ..
        }
        | ConnectionInner::MultiStreamUnknownHandshake {
            connection_handles_alive,
            ..
        } => *connection_handles_alive,
        ConnectionInner::SingleStreamMsNoiseYamux { .. } => 1, // TODO: I believe that this is correct but a bit confusing; might be helpful to refactor with an enum or something
        ConnectionInner::Reset { .. } => unreachable!(),
    };

    connection.inner = ConnectionInner::Reset {
        connection_handles_alive,
        _message: message.clone(),
    };
    connection.something_happened.notify(usize::MAX);

    // Streams of multi-stream connections.
    let first_key = (connection_id, Some(u32::MIN));
    let last_key = (connection_id, Some(u32::MAX));
    for (_, stream) in state.streams.range_mut(first_key..=last_key) {
        stream.reset = Some(message.clone());
        stream.something_happened.notify(usize::MAX);
    }

    // Unique stream of single-stream connections.
    if let Some(stream) = state.streams.get_mut(&(connection_id, None)) {
        stream.reset = Some(message);
        stream.something_happened.notify(usize::MAX);
    }
}
1173
1174
0
pub(crate) fn stream_reset(connection_id: u32, stream_id: u32, message: Box<[u8]>) {
1175
0
    let message: String = str::from_utf8(&message)
1176
0
        .unwrap_or_else(|_| panic!("non-UTF-8 message"))
1177
0
        .to_owned();
1178
1179
    // Note that, as documented, it is illegal to call this function on single-stream substreams.
1180
    // We can thus assume that the `stream_id` is valid.
1181
0
    let mut lock = STATE.try_lock().unwrap();
1182
0
    let stream = lock
1183
0
        .streams
1184
0
        .get_mut(&(connection_id, Some(stream_id)))
1185
0
        .unwrap();
1186
0
    stream.reset = Some(message);
1187
0
    stream.something_happened.notify(usize::MAX);
1188
0
}