Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/wasm-node/rust/src/platform.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use crate::{bindings, timers::Delay};
19
20
use futures_lite::future::FutureExt as _;
21
22
use smoldot_light::platform::{read_write, SubstreamDirection};
23
24
use alloc::{
25
    borrow::{Cow, ToOwned as _},
26
    boxed::Box,
27
    collections::{BTreeMap, VecDeque},
28
    format,
29
    string::{String, ToString as _},
30
    vec::Vec,
31
};
32
use async_lock::Mutex;
33
use core::{
34
    fmt::{self, Write as _},
35
    future, iter, mem,
36
    net::{IpAddr, Ipv4Addr, Ipv6Addr},
37
    ops, pin, str,
38
    sync::atomic::{AtomicU32, AtomicU64, Ordering},
39
    task,
40
    time::Duration,
41
};
42
43
/// Total number of bytes that all the connections created through [`PlatformRef`] combined have
44
/// received.
45
pub static TOTAL_BYTES_RECEIVED: AtomicU64 = AtomicU64::new(0);
46
/// Total number of bytes that all the connections created through [`PlatformRef`] combined have
47
/// sent.
48
pub static TOTAL_BYTES_SENT: AtomicU64 = AtomicU64::new(0);
49
/// Total number of microseconds that all the tasks have spent executing. A `u64` will overflow
50
/// after 584 542 years.
51
pub static TOTAL_CPU_USAGE_US: AtomicU64 = AtomicU64::new(0);
52
53
pub(crate) const PLATFORM_REF: PlatformRef = PlatformRef {};
54
55
/// Log level above which log entries aren't emitted.
56
pub static MAX_LOG_LEVEL: AtomicU32 = AtomicU32::new(0);
57
58
#[derive(Debug, Copy, Clone)]
59
pub(crate) struct PlatformRef {}
60
61
// TODO: this trait implementation was written before GATs were stable in Rust; now that the associated types have lifetimes, it should be possible to considerably simplify this code
62
impl smoldot_light::platform::PlatformRef for PlatformRef {
63
    type Delay = Delay;
64
    type Instant = Duration;
65
    type MultiStream = MultiStreamWrapper; // Entry in the ̀`CONNECTIONS` map.
66
    type Stream = StreamWrapper; // Entry in the ̀`STREAMS` map and a read buffer.
67
    type StreamConnectFuture = future::Ready<Self::Stream>;
68
    type ReadWriteAccess<'a> = ReadWriteAccess<'a>;
69
    type StreamErrorRef<'a> = StreamError;
70
    type MultiStreamConnectFuture = pin::Pin<
71
        Box<
72
            dyn future::Future<
73
                    Output = smoldot_light::platform::MultiStreamWebRtcConnection<
74
                        Self::MultiStream,
75
                    >,
76
                > + Send,
77
        >,
78
    >;
79
    type StreamUpdateFuture<'a> = pin::Pin<Box<dyn future::Future<Output = ()> + Send + 'a>>;
80
    type NextSubstreamFuture<'a> = pin::Pin<
81
        Box<
82
            dyn future::Future<
83
                    Output = Option<(Self::Stream, smoldot_light::platform::SubstreamDirection)>,
84
                > + Send
85
                + 'a,
86
        >,
87
    >;
88
89
0
    fn now_from_unix_epoch(&self) -> Duration {
90
0
        let microseconds = unsafe { bindings::unix_timestamp_us() };
91
0
        Duration::from_micros(microseconds)
92
0
    }
93
94
0
    fn now(&self) -> Self::Instant {
95
0
        let microseconds = unsafe { bindings::monotonic_clock_us() };
96
0
        Duration::from_micros(microseconds)
97
0
    }
98
99
0
    fn fill_random_bytes(&self, buffer: &mut [u8]) {
100
0
        unsafe {
101
0
            bindings::random_get(
102
0
                u32::try_from(buffer.as_mut_ptr() as usize).unwrap(),
103
0
                u32::try_from(buffer.len()).unwrap(),
104
0
            )
105
0
        }
106
0
    }
107
108
0
    fn sleep(&self, duration: Duration) -> Self::Delay {
109
0
        Delay::new(duration)
110
0
    }
111
112
0
    fn sleep_until(&self, when: Self::Instant) -> Self::Delay {
113
0
        Delay::new_at_monotonic_clock(when)
114
0
    }
115
116
0
    fn spawn_task(
117
0
        &self,
118
0
        task_name: Cow<str>,
119
0
        task: impl future::Future<Output = ()> + Send + 'static,
120
0
    ) {
121
0
        // The code below processes tasks that have names.
122
0
        #[pin_project::pin_project]
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvMs2_B1b_INtB1b_6ClientBP_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB1b_7ChainIdEEsa_0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtB1b_15runtime_service14run_backgroundBP_E0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtNtB1b_15network_service5tasks29single_stream_connection_taskBP_E0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtNtB1b_15network_service5tasks35webrtc_multi_stream_connection_taskBP_E0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtNtB1b_16json_rpc_service10background3runBP_E0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNCNvMNtB1b_15runtime_serviceINtB2H_14RuntimeServiceBP_E31send_message_or_restart_service00E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNCNvMNtB1b_20transactions_serviceINtB2H_19TransactionsServiceBP_E18send_to_background00E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvB9_9add_chains0_0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_12sync_serviceINtB2F_11SyncServiceBP_E3new0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_15network_serviceINtB2F_14NetworkServiceBP_E3new0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_15network_serviceINtB2F_14NetworkServiceBP_E9add_chains_0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_20transactions_serviceINtB2F_19TransactionsServiceBP_E3new0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvNtB9_4init4init0E7projectB9_
Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterpE11project_refB9_
Unexecuted instantiation: _RINvNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__24___assert_not_repr_packedpEB9_
Unexecuted instantiation: _RNvXININvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtBa_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__s3_0pEINtB7_13FutureAdapterpENtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4dropBc_
123
0
        struct FutureAdapter<F> {
124
0
            name: String,
125
0
            #[pin]
126
0
            future: F,
127
0
        }
128
0
129
0
        impl<F: future::Future> future::Future for FutureAdapter<F> {
130
0
            type Output = F::Output;
131
0
            fn poll(self: pin::Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> {
132
0
                let this = self.project();
133
0
                unsafe {
134
0
                    bindings::current_task_entered(
135
0
                        u32::try_from(this.name.as_bytes().as_ptr() as usize).unwrap(),
136
0
                        u32::try_from(this.name.as_bytes().len()).unwrap(),
137
0
                    )
138
0
                }
139
0
140
0
                let before_polling = unsafe { bindings::monotonic_clock_us() };
141
0
                let out = this.future.poll(cx);
142
0
                let poll_duration = Duration::from_micros(
143
0
                    unsafe { bindings::monotonic_clock_us() } - before_polling,
144
0
                );
145
0
                TOTAL_CPU_USAGE_US.fetch_add(
146
0
                    u64::try_from(poll_duration.as_micros()).unwrap_or(u64::MAX),
147
0
                    Ordering::Relaxed,
148
0
                );
149
0
150
0
                unsafe {
151
0
                    bindings::current_task_exit();
152
0
                }
153
0
154
0
                // Print a warning if polling the task takes a long time.
155
0
                // It has been noticed that sometimes in Firefox polling a task takes a 16ms + a
156
0
                // small amount. This most likely indicates that Firefox does something like
157
0
                // freezing the JS/Wasm execution before resuming it at the next frame, thus adding
158
0
                // 16ms to the execution time.
159
0
                // For this reason, the threshold above which a task takes too long must be
160
0
                // above 16ms no matter what.
161
0
                if poll_duration.as_millis() >= 20 {
162
0
                    smoldot_light::platform::PlatformRef::log(
163
0
                        &PLATFORM_REF,
164
0
                        smoldot_light::platform::LogLevel::Debug,
165
0
                        "smoldot",
166
0
                        "task-too-long-time",
167
0
                        [
168
0
                            ("name", this.name as &dyn fmt::Display),
169
0
                            (
170
0
                                "poll_duration_ms",
171
0
                                &poll_duration.as_millis() as &dyn fmt::Display,
172
0
                            ),
173
0
                        ]
174
0
                        .into_iter(),
175
0
                    );
176
0
                }
177
0
                if poll_duration.as_millis() >= 150 {
178
0
                    smoldot_light::platform::PlatformRef::log(
179
0
                        &PLATFORM_REF,
180
0
                        smoldot_light::platform::LogLevel::Warn,
181
0
                        "smoldot",
182
0
                        &format!(
183
0
                            "The task named `{}` has occupied the CPU for an \
184
0
                            unreasonable amount of time ({}ms).",
185
0
                            this.name,
186
0
                            poll_duration.as_millis(),
187
0
                        ),
188
0
                        iter::empty(),
189
0
                    );
190
0
                }
191
0
192
0
                out
193
0
            }
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvMs2_B19_INtB19_6ClientBN_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB19_7ChainIdEEsa_0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtB19_15runtime_service14run_backgroundBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB19_15network_service5tasks29single_stream_connection_taskBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB19_15network_service5tasks35webrtc_multi_stream_connection_taskBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB19_16json_rpc_service10background3runBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB19_15runtime_serviceINtB2C_14RuntimeServiceBN_E31send_message_or_restart_service00ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB19_20transactions_serviceINtB2C_19TransactionsServiceBN_E18send_to_background00ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvB7_9add_chains0_0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_12sync_serviceINtB2A_11SyncServiceBN_E3new0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_15network_serviceINtB2A_14NetworkServiceBN_E3new0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_15network_serviceINtB2A_14NetworkServiceBN_E9add_chains_0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_20transactions_serviceINtB2A_19TransactionsServiceBN_E3new0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvNtB7_4init4init0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_
194
0
        }
195
0
196
0
        let task = FutureAdapter {
197
0
            name: task_name.into_owned(),
198
0
            future: task,
199
0
        };
200
0
201
0
        let (runnable, task) = async_task::spawn(task, |runnable| {
202
0
            super::TASKS_QUEUE.push(runnable);
203
0
            unsafe {
204
0
                bindings::advance_execution_ready();
205
0
            }
206
0
        });
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B19_INtB19_6ClientBN_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB19_7ChainIdEEsa_0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB19_15runtime_service14run_backgroundBN_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB19_15network_service5tasks29single_stream_connection_taskBN_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB19_15network_service5tasks35webrtc_multi_stream_connection_taskBN_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB19_16json_rpc_service10background3runBN_E0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB19_15runtime_serviceINtB2h_14RuntimeServiceBN_E31send_message_or_restart_service00E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB19_20transactions_serviceINtB2h_19TransactionsServiceBN_E18send_to_background00E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB7_9add_chains0_0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_12sync_serviceINtB2f_11SyncServiceBN_E3new0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_15network_serviceINtB2f_14NetworkServiceBN_E3new0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_15network_serviceINtB2f_14NetworkServiceBN_E9add_chains_0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_20transactions_serviceINtB2f_19TransactionsServiceBN_E3new0E0B7_
Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB7_4init4init0E0B7_
207
0
208
0
        task.detach();
209
0
        runnable.schedule();
210
0
    }
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B17_INtB17_6ClientBL_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB17_7ChainIdEEsa_0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB17_15runtime_service14run_backgroundBL_E0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB17_15network_service5tasks29single_stream_connection_taskBL_E0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB17_15network_service5tasks35webrtc_multi_stream_connection_taskBL_E0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB17_16json_rpc_service10background3runBL_E0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB17_15runtime_serviceINtB2f_14RuntimeServiceBL_E31send_message_or_restart_service00EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB17_20transactions_serviceINtB2f_19TransactionsServiceBL_E18send_to_background00EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB5_9add_chains0_0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_12sync_serviceINtB2d_11SyncServiceBL_E3new0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_15network_serviceINtB2d_14NetworkServiceBL_E3new0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_15network_serviceINtB2d_14NetworkServiceBL_E9add_chains_0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_20transactions_serviceINtB2d_19TransactionsServiceBL_E3new0EB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB5_4init4init0EB5_
211
212
0
    fn log<'a>(
213
0
        &self,
214
0
        log_level: smoldot_light::platform::LogLevel,
215
0
        log_target: &'a str,
216
0
        message: &'a str,
217
0
        key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>,
218
0
    ) {
219
0
        let log_level = match log_level {
220
0
            smoldot_light::platform::LogLevel::Error => 1,
221
0
            smoldot_light::platform::LogLevel::Warn => 2,
222
0
            smoldot_light::platform::LogLevel::Info => 3,
223
0
            smoldot_light::platform::LogLevel::Debug => 4,
224
0
            smoldot_light::platform::LogLevel::Trace => 5,
225
        };
226
227
0
        if log_level > MAX_LOG_LEVEL.load(Ordering::Relaxed) {
228
0
            return;
229
0
        }
230
0
231
0
        let mut key_values = key_values.peekable();
232
0
233
0
        if key_values.peek().is_none() {
234
            unsafe {
235
0
                bindings::log(
236
0
                    log_level,
237
0
                    u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(),
238
0
                    u32::try_from(log_target.as_bytes().len()).unwrap(),
239
0
                    u32::try_from(message.as_bytes().as_ptr() as usize).unwrap(),
240
0
                    u32::try_from(message.as_bytes().len()).unwrap(),
241
0
                )
242
            }
243
        } else {
244
0
            let mut message_build = String::with_capacity(128);
245
0
            message_build.push_str(message);
246
0
            let mut first = true;
247
0
            for (key, value) in key_values {
248
0
                if first {
249
0
                    let _ = write!(message_build, "; ");
250
0
                    first = false;
251
0
                } else {
252
0
                    let _ = write!(message_build, ", ");
253
0
                }
254
0
                let _ = write!(message_build, "{}={}", key, value);
255
            }
256
257
            unsafe {
258
0
                bindings::log(
259
0
                    log_level,
260
0
                    u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(),
261
0
                    u32::try_from(log_target.as_bytes().len()).unwrap(),
262
0
                    u32::try_from(message_build.as_bytes().as_ptr() as usize).unwrap(),
263
0
                    u32::try_from(message_build.as_bytes().len()).unwrap(),
264
0
                )
265
            }
266
        }
267
0
    }
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtCsaYZPK01V26L_4core5array4iter8IntoIterTReRDNtNtB27_3fmt7DisplayEL_EKj2_EEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter7sources5empty5EmptyTReRDNtNtB29_3fmt7DisplayEL_EEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEEEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEB5_
Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEINtNtB2X_5empty5EmptyB3l_EEEB5_
268
269
0
    fn client_name(&self) -> Cow<str> {
270
0
        env!("CARGO_PKG_NAME").into()
271
0
    }
272
273
0
    fn client_version(&self) -> Cow<str> {
274
0
        env!("CARGO_PKG_VERSION").into()
275
0
    }
276
277
0
    fn supports_connection_type(
278
0
        &self,
279
0
        connection_type: smoldot_light::platform::ConnectionType,
280
0
    ) -> bool {
281
0
        let ty = match connection_type {
282
0
            smoldot_light::platform::ConnectionType::TcpIpv4 => 0,
283
0
            smoldot_light::platform::ConnectionType::TcpIpv6 => 1,
284
0
            smoldot_light::platform::ConnectionType::TcpDns => 2,
285
            smoldot_light::platform::ConnectionType::WebSocketIpv4 {
286
                remote_is_localhost: true,
287
                ..
288
            }
289
            | smoldot_light::platform::ConnectionType::WebSocketIpv6 {
290
                remote_is_localhost: true,
291
                ..
292
            }
293
            | smoldot_light::platform::ConnectionType::WebSocketDns {
294
                secure: false,
295
                remote_is_localhost: true,
296
0
            } => 7,
297
0
            smoldot_light::platform::ConnectionType::WebSocketIpv4 { .. } => 4,
298
0
            smoldot_light::platform::ConnectionType::WebSocketIpv6 { .. } => 5,
299
0
            smoldot_light::platform::ConnectionType::WebSocketDns { secure: false, .. } => 6,
300
0
            smoldot_light::platform::ConnectionType::WebSocketDns { secure: true, .. } => 14,
301
0
            smoldot_light::platform::ConnectionType::WebRtcIpv4 => 16,
302
0
            smoldot_light::platform::ConnectionType::WebRtcIpv6 => 17,
303
        };
304
305
0
        unsafe { bindings::connection_type_supported(ty) != 0 }
306
0
    }
307
308
0
    fn connect_stream(
309
0
        &self,
310
0
        address: smoldot_light::platform::Address,
311
0
    ) -> Self::StreamConnectFuture {
312
0
        let mut lock = STATE.try_lock().unwrap();
313
0
314
0
        let connection_id = lock.next_connection_id;
315
0
        lock.next_connection_id += 1;
316
317
0
        let encoded_address: Vec<u8> = match address {
318
            smoldot_light::platform::Address::TcpIp {
319
0
                ip: IpAddr::V4(ip),
320
0
                port,
321
0
            } => iter::once(0u8)
322
0
                .chain(port.to_be_bytes())
323
0
                .chain(Ipv4Addr::from(ip).to_string().bytes())
324
0
                .collect(),
325
            smoldot_light::platform::Address::TcpIp {
326
0
                ip: IpAddr::V6(ip),
327
0
                port,
328
0
            } => iter::once(1u8)
329
0
                .chain(port.to_be_bytes())
330
0
                .chain(Ipv6Addr::from(ip).to_string().bytes())
331
0
                .collect(),
332
0
            smoldot_light::platform::Address::TcpDns { hostname, port } => iter::once(2u8)
333
0
                .chain(port.to_be_bytes())
334
0
                .chain(hostname.as_bytes().iter().copied())
335
0
                .collect(),
336
            smoldot_light::platform::Address::WebSocketIp {
337
0
                ip: IpAddr::V4(ip),
338
0
                port,
339
0
            } => iter::once(4u8)
340
0
                .chain(port.to_be_bytes())
341
0
                .chain(Ipv4Addr::from(ip).to_string().bytes())
342
0
                .collect(),
343
            smoldot_light::platform::Address::WebSocketIp {
344
0
                ip: IpAddr::V6(ip),
345
0
                port,
346
0
            } => iter::once(5u8)
347
0
                .chain(port.to_be_bytes())
348
0
                .chain(Ipv6Addr::from(ip).to_string().bytes())
349
0
                .collect(),
350
            smoldot_light::platform::Address::WebSocketDns {
351
0
                hostname,
352
0
                port,
353
0
                secure: false,
354
0
            } => iter::once(6u8)
355
0
                .chain(port.to_be_bytes())
356
0
                .chain(hostname.as_bytes().iter().copied())
357
0
                .collect(),
358
            smoldot_light::platform::Address::WebSocketDns {
359
0
                hostname,
360
0
                port,
361
0
                secure: true,
362
0
            } => iter::once(14u8)
363
0
                .chain(port.to_be_bytes())
364
0
                .chain(hostname.as_bytes().iter().copied())
365
0
                .collect(),
366
        };
367
368
0
        let write_closable = match address {
369
            smoldot_light::platform::Address::TcpIp { .. }
370
0
            | smoldot_light::platform::Address::TcpDns { .. } => true,
371
            smoldot_light::platform::Address::WebSocketIp { .. }
372
0
            | smoldot_light::platform::Address::WebSocketDns { .. } => false,
373
        };
374
375
        unsafe {
376
0
            bindings::connection_new(
377
0
                connection_id,
378
0
                u32::try_from(encoded_address.as_ptr() as usize).unwrap(),
379
0
                u32::try_from(encoded_address.len()).unwrap(),
380
0
            )
381
0
        }
382
0
383
0
        let _prev_value = lock.connections.insert(
384
0
            connection_id,
385
0
            Connection {
386
0
                inner: ConnectionInner::SingleStreamMsNoiseYamux,
387
0
                something_happened: event_listener::Event::new(),
388
0
            },
389
0
        );
390
0
        debug_assert!(_prev_value.is_none());
391
392
0
        let _prev_value = lock.streams.insert(
393
0
            (connection_id, None),
394
0
            Stream {
395
0
                reset: None,
396
0
                messages_queue: VecDeque::with_capacity(8),
397
0
                messages_queue_total_size: 0,
398
0
                something_happened: event_listener::Event::new(),
399
0
                writable_bytes_extra: 0,
400
0
            },
401
0
        );
402
0
        debug_assert!(_prev_value.is_none());
403
404
0
        future::ready(StreamWrapper {
405
0
            connection_id,
406
0
            stream_id: None,
407
0
            read_buffer: Vec::new(),
408
0
            inner_expected_incoming_bytes: Some(1),
409
0
            is_reset: None,
410
0
            writable_bytes: 0,
411
0
            write_closable,
412
0
            write_closed: false,
413
0
            when_wake_up: None,
414
0
        })
415
0
    }
416
417
0
    fn connect_multistream(
418
0
        &self,
419
0
        address: smoldot_light::platform::MultiStreamAddress,
420
0
    ) -> Self::MultiStreamConnectFuture {
421
0
        let mut lock = STATE.try_lock().unwrap();
422
0
423
0
        let connection_id = lock.next_connection_id;
424
0
        lock.next_connection_id += 1;
425
426
0
        let encoded_address: Vec<u8> = match address {
427
            smoldot_light::platform::MultiStreamAddress::WebRtc {
428
0
                ip: IpAddr::V4(ip),
429
0
                port,
430
0
                remote_certificate_sha256,
431
0
            } => iter::once(16u8)
432
0
                .chain(port.to_be_bytes())
433
0
                .chain(remote_certificate_sha256.iter().copied())
434
0
                .chain(Ipv4Addr::from(ip).to_string().bytes())
435
0
                .collect(),
436
            smoldot_light::platform::MultiStreamAddress::WebRtc {
437
0
                ip: IpAddr::V6(ip),
438
0
                port,
439
0
                remote_certificate_sha256,
440
0
            } => iter::once(17u8)
441
0
                .chain(port.to_be_bytes())
442
0
                .chain(remote_certificate_sha256.iter().copied())
443
0
                .chain(Ipv6Addr::from(ip).to_string().bytes())
444
0
                .collect(),
445
        };
446
447
        unsafe {
448
0
            bindings::connection_new(
449
0
                connection_id,
450
0
                u32::try_from(encoded_address.as_ptr() as usize).unwrap(),
451
0
                u32::try_from(encoded_address.len()).unwrap(),
452
0
            )
453
0
        }
454
0
455
0
        let _prev_value = lock.connections.insert(
456
0
            connection_id,
457
0
            Connection {
458
0
                inner: ConnectionInner::MultiStreamUnknownHandshake {
459
0
                    opened_substreams_to_pick_up: VecDeque::with_capacity(0),
460
0
                    connection_handles_alive: 1,
461
0
                },
462
0
                something_happened: event_listener::Event::new(),
463
0
            },
464
0
        );
465
0
        debug_assert!(_prev_value.is_none());
466
467
0
        Box::pin(async move {
468
            // Wait until the connection state is no longer "unknown handshake".
469
0
            let mut lock = loop {
470
0
                let something_happened = {
471
0
                    let mut lock = STATE.try_lock().unwrap();
472
0
                    let connection = lock.connections.get_mut(&connection_id).unwrap();
473
474
0
                    if matches!(
475
0
                        connection.inner,
476
                        ConnectionInner::Reset { .. } | ConnectionInner::MultiStreamWebRtc { .. }
477
                    ) {
478
0
                        break lock;
479
0
                    }
480
0
481
0
                    connection.something_happened.listen()
482
0
                };
483
0
484
0
                something_happened.await
485
            };
486
0
            let lock = &mut *lock;
487
0
488
0
            let connection = lock.connections.get_mut(&connection_id).unwrap();
489
0
490
0
            match &mut connection.inner {
491
                ConnectionInner::SingleStreamMsNoiseYamux { .. }
492
                | ConnectionInner::MultiStreamUnknownHandshake { .. } => {
493
0
                    unreachable!()
494
                }
495
                ConnectionInner::MultiStreamWebRtc {
496
0
                    local_tls_certificate_sha256,
497
0
                    ..
498
0
                } => smoldot_light::platform::MultiStreamWebRtcConnection {
499
0
                    connection: MultiStreamWrapper(connection_id),
500
0
                    local_tls_certificate_sha256: *local_tls_certificate_sha256,
501
0
                },
502
                ConnectionInner::Reset { .. } => {
503
                    // If the connection was already reset, we proceed anyway but provide a fake
504
                    // certificate hash. This has absolutely no consequence.
505
0
                    smoldot_light::platform::MultiStreamWebRtcConnection {
506
0
                        connection: MultiStreamWrapper(connection_id),
507
0
                        local_tls_certificate_sha256: [0; 32],
508
0
                    }
509
                }
510
            }
511
0
        })
512
0
    }
513
514
0
    fn next_substream<'a>(
515
0
        &self,
516
0
        MultiStreamWrapper(connection_id): &'a mut Self::MultiStream,
517
0
    ) -> Self::NextSubstreamFuture<'a> {
518
0
        let connection_id = *connection_id;
519
0
520
0
        Box::pin(async move {
521
0
            let (stream_id, direction) = loop {
522
0
                let something_happened = {
523
0
                    let mut lock = STATE.try_lock().unwrap();
524
0
                    let connection = lock.connections.get_mut(&connection_id).unwrap();
525
0
526
0
                    match &mut connection.inner {
527
0
                        ConnectionInner::Reset { .. } => return None,
528
                        ConnectionInner::MultiStreamWebRtc {
529
0
                            opened_substreams_to_pick_up,
530
0
                            connection_handles_alive,
531
                            ..
532
                        }
533
                        | ConnectionInner::MultiStreamUnknownHandshake {
534
0
                            opened_substreams_to_pick_up,
535
0
                            connection_handles_alive,
536
                            ..
537
                        } => {
538
0
                            if let Some((substream, direction)) =
539
0
                                opened_substreams_to_pick_up.pop_front()
540
                            {
541
0
                                *connection_handles_alive += 1;
542
0
                                break (substream, direction);
543
0
                            }
544
                        }
545
                        ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
546
0
                            unreachable!()
547
                        }
548
                    }
549
550
0
                    connection.something_happened.listen()
551
0
                };
552
0
553
0
                something_happened.await
554
            };
555
556
0
            Some((
557
0
                StreamWrapper {
558
0
                    connection_id,
559
0
                    stream_id: Some(stream_id),
560
0
                    read_buffer: Vec::new(),
561
0
                    inner_expected_incoming_bytes: Some(1),
562
0
                    is_reset: None,
563
0
                    writable_bytes: 0,
564
0
                    write_closable: false, // Note: this is currently hardcoded for WebRTC.
565
0
                    write_closed: false,
566
0
                    when_wake_up: None,
567
0
                },
568
0
                direction,
569
0
            ))
570
0
        })
571
0
    }
572
573
0
    fn open_out_substream(&self, MultiStreamWrapper(connection_id): &mut Self::MultiStream) {
574
0
        match STATE
575
0
            .try_lock()
576
0
            .unwrap()
577
0
            .connections
578
0
            .get(connection_id)
579
0
            .unwrap()
580
0
            .inner
581
        {
582
            ConnectionInner::MultiStreamWebRtc { .. }
583
            | ConnectionInner::MultiStreamUnknownHandshake { .. } => unsafe {
584
0
                bindings::connection_stream_open(*connection_id)
585
            },
586
0
            ConnectionInner::Reset { .. } => {}
587
            ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
588
0
                unreachable!()
589
            }
590
        }
591
0
    }
592
593
0
    fn wait_read_write_again<'a>(
594
0
        &self,
595
0
        stream: pin::Pin<&'a mut Self::Stream>,
596
0
    ) -> Self::StreamUpdateFuture<'a> {
597
0
        Box::pin(async move {
598
0
            let stream = stream.get_mut();
599
0
600
0
            if stream.is_reset.is_some() {
601
0
                future::pending::<()>().await;
602
0
            }
603
604
            loop {
605
0
                let listener = {
606
0
                    let mut lock = STATE.try_lock().unwrap();
607
0
                    let stream_inner = lock
608
0
                        .streams
609
0
                        .get_mut(&(stream.connection_id, stream.stream_id))
610
0
                        .unwrap();
611
612
0
                    if let Some(msg) = &stream_inner.reset {
613
0
                        stream.is_reset = Some(msg.clone());
614
0
                        return;
615
0
                    }
616
0
617
0
                    let mut shall_return = false;
618
0
619
0
                    // Move the buffers from `STATE` into `read_buffer`.
620
0
                    if !stream_inner.messages_queue.is_empty() {
621
0
                        stream
622
0
                            .read_buffer
623
0
                            .reserve(stream_inner.messages_queue_total_size);
624
625
0
                        while let Some(msg) = stream_inner.messages_queue.pop_front() {
626
0
                            stream_inner.messages_queue_total_size -= msg.len();
627
0
                            // TODO: could be optimized by reworking the bindings
628
0
                            stream.read_buffer.extend_from_slice(&msg);
629
0
                            if stream
630
0
                                .inner_expected_incoming_bytes
631
0
                                .map_or(false, |expected| expected <= stream.read_buffer.len())
632
                            {
633
0
                                shall_return = true;
634
0
                                break;
635
0
                            }
636
                        }
637
0
                    }
638
639
0
                    if stream_inner.writable_bytes_extra != 0 {
640
0
                        // As documented, the number of writable bytes must never become
641
0
                        // exceedingly large (a few megabytes). As such, this can't overflow
642
0
                        // unless there is a bug on the JavaScript side.
643
0
                        stream.writable_bytes += stream_inner.writable_bytes_extra;
644
0
                        stream_inner.writable_bytes_extra = 0;
645
0
                        shall_return = true;
646
0
                    }
647
648
0
                    if shall_return {
649
0
                        return;
650
0
                    }
651
0
652
0
                    stream_inner.something_happened.listen()
653
                };
654
655
0
                let timer_stop = async move {
656
0
                    listener.await;
657
0
                    false
658
0
                }
659
0
                .or(async {
660
0
                    if let Some(when_wake_up) = stream.when_wake_up.as_mut() {
661
0
                        when_wake_up.await;
662
0
                        stream.when_wake_up = None;
663
0
                        true
664
                    } else {
665
0
                        future::pending().await
666
                    }
667
0
                })
668
0
                .await;
669
670
0
                if timer_stop {
671
0
                    return;
672
0
                }
673
            }
674
0
        })
675
0
    }
676
677
0
    fn read_write_access<'a>(
678
0
        &self,
679
0
        stream: pin::Pin<&'a mut Self::Stream>,
680
0
    ) -> Result<Self::ReadWriteAccess<'a>, Self::StreamErrorRef<'a>> {
681
0
        let stream = stream.get_mut();
682
683
0
        if let Some(message) = &stream.is_reset {
684
0
            return Err(StreamError {
685
0
                message: message.clone(),
686
0
            });
687
0
        }
688
0
689
0
        Ok(ReadWriteAccess {
690
0
            read_write: read_write::ReadWrite {
691
0
                now: unsafe { Duration::from_micros(bindings::monotonic_clock_us()) },
692
0
                incoming_buffer: mem::take(&mut stream.read_buffer),
693
0
                expected_incoming_bytes: Some(0),
694
0
                read_bytes: 0,
695
0
                write_buffers: Vec::new(),
696
0
                write_bytes_queued: 0,
697
0
                write_bytes_queueable: if !stream.write_closed {
698
0
                    Some(stream.writable_bytes)
699
                } else {
700
0
                    None
701
                },
702
0
                wake_up_after: None,
703
0
            },
704
0
            stream,
705
        })
706
0
    }
707
}
708
709
pub(crate) struct ReadWriteAccess<'a> {
710
    read_write: read_write::ReadWrite<Duration>,
711
    stream: &'a mut StreamWrapper,
712
}
713
714
impl<'a> ops::Deref for ReadWriteAccess<'a> {
715
    type Target = read_write::ReadWrite<Duration>;
716
717
0
    fn deref(&self) -> &Self::Target {
718
0
        &self.read_write
719
0
    }
720
}
721
722
impl<'a> ops::DerefMut for ReadWriteAccess<'a> {
723
0
    fn deref_mut(&mut self) -> &mut Self::Target {
724
0
        &mut self.read_write
725
0
    }
726
}
727
728
impl<'a> Drop for ReadWriteAccess<'a> {
729
0
    fn drop(&mut self) {
730
0
        let mut lock = STATE.try_lock().unwrap();
731
0
732
0
        let stream_inner = lock
733
0
            .streams
734
0
            .get_mut(&(self.stream.connection_id, self.stream.stream_id))
735
0
            .unwrap();
736
0
737
0
        if (self.read_write.read_bytes != 0
738
0
            && self
739
0
                .read_write
740
0
                .expected_incoming_bytes
741
0
                .map_or(false, |expected| {
742
0
                    expected >= self.read_write.incoming_buffer.len()
743
0
                }))
744
0
            || (self.read_write.write_bytes_queued != 0
745
0
                && self.read_write.write_bytes_queueable.is_some())
746
0
        {
747
0
            self.read_write.wake_up_asap();
748
0
        }
749
750
0
        self.stream.when_wake_up = self
751
0
            .read_write
752
0
            .wake_up_after
753
0
            .map(Delay::new_at_monotonic_clock);
754
0
755
0
        self.stream.read_buffer = mem::take(&mut self.read_write.incoming_buffer);
756
0
757
0
        self.stream.inner_expected_incoming_bytes = self.read_write.expected_incoming_bytes;
758
0
759
0
        if !self.read_write.write_buffers.is_empty() && stream_inner.reset.is_none() {
760
0
            let mut io_vectors = Vec::with_capacity(self.read_write.write_buffers.len());
761
0
            let mut total_length = 0;
762
763
0
            for buffer in &self.read_write.write_buffers {
764
0
                io_vectors.push(bindings::StreamSendIoVector {
765
0
                    ptr: u32::try_from(buffer.as_ptr() as usize).unwrap(),
766
0
                    len: u32::try_from(buffer.len()).unwrap(),
767
0
                });
768
0
                total_length += buffer.len();
769
0
            }
770
771
0
            assert!(total_length <= self.stream.writable_bytes);
772
0
            self.stream.writable_bytes -= total_length;
773
0
774
0
            // `unwrap()` is ok as there's no way that `buffer.len()` doesn't fit in a `u64`.
775
0
            TOTAL_BYTES_SENT.fetch_add(u64::try_from(total_length).unwrap(), Ordering::Relaxed);
776
0
777
0
            unsafe {
778
0
                bindings::stream_send(
779
0
                    self.stream.connection_id,
780
0
                    self.stream.stream_id.unwrap_or(0),
781
0
                    u32::try_from(io_vectors.as_ptr() as usize).unwrap(),
782
0
                    u32::try_from(io_vectors.len()).unwrap(),
783
0
                );
784
0
            }
785
0
786
0
            self.read_write.write_buffers.clear();
787
0
        }
788
789
0
        if self.read_write.write_bytes_queueable.is_none() && !self.stream.write_closed {
790
0
            if stream_inner.reset.is_none() && self.stream.write_closable {
791
0
                unsafe {
792
0
                    bindings::stream_send_close(
793
0
                        self.stream.connection_id,
794
0
                        self.stream.stream_id.unwrap_or(0),
795
0
                    );
796
0
                }
797
0
            }
798
799
0
            self.stream.write_closed = true;
800
0
        }
801
0
    }
802
}
803
804
pub(crate) struct StreamWrapper {
805
    connection_id: u32,
806
    stream_id: Option<u32>,
807
    read_buffer: Vec<u8>,
808
    inner_expected_incoming_bytes: Option<usize>,
809
    /// `Some` if the remote has reset the stream and `update_stream` has since then been called.
810
    /// Contains the error message.
811
    is_reset: Option<String>,
812
    writable_bytes: usize,
813
    write_closable: bool,
814
    write_closed: bool,
815
    /// The stream should wake up after this delay.
816
    when_wake_up: Option<Delay>,
817
}
818
819
impl Drop for StreamWrapper {
820
0
    fn drop(&mut self) {
821
0
        let mut lock = STATE.try_lock().unwrap();
822
0
        let lock = &mut *lock;
823
0
824
0
        let connection = lock.connections.get_mut(&self.connection_id).unwrap();
825
0
        let removed_stream = lock
826
0
            .streams
827
0
            .remove(&(self.connection_id, self.stream_id))
828
0
            .unwrap();
829
830
0
        let remove_connection = match &mut connection.inner {
831
            ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
832
0
                if removed_stream.reset.is_none() {
833
0
                    unsafe {
834
0
                        bindings::reset_connection(self.connection_id);
835
0
                    }
836
0
                }
837
838
0
                debug_assert!(self.stream_id.is_none());
839
0
                true
840
            }
841
            ConnectionInner::MultiStreamWebRtc {
842
0
                connection_handles_alive,
843
                ..
844
            }
845
            | ConnectionInner::MultiStreamUnknownHandshake {
846
0
                connection_handles_alive,
847
                ..
848
            } => {
849
0
                if removed_stream.reset.is_none() {
850
                    unsafe {
851
0
                        bindings::connection_stream_reset(
852
0
                            self.connection_id,
853
0
                            self.stream_id.unwrap(),
854
0
                        )
855
                    }
856
0
                }
857
0
                *connection_handles_alive -= 1;
858
0
                let remove_connection = *connection_handles_alive == 0;
859
0
                if remove_connection {
860
0
                    unsafe {
861
0
                        bindings::reset_connection(self.connection_id);
862
0
                    }
863
0
                }
864
0
                remove_connection
865
            }
866
            ConnectionInner::Reset {
867
0
                connection_handles_alive,
868
0
                ..
869
0
            } => {
870
0
                *connection_handles_alive -= 1;
871
0
                *connection_handles_alive == 0
872
            }
873
        };
874
875
0
        if remove_connection {
876
0
            lock.connections.remove(&self.connection_id).unwrap();
877
0
        }
878
0
    }
879
}
880
881
pub(crate) struct MultiStreamWrapper(u32);
882
883
impl Drop for MultiStreamWrapper {
884
0
    fn drop(&mut self) {
885
0
        let mut lock = STATE.try_lock().unwrap();
886
0
887
0
        let connection = lock.connections.get_mut(&self.0).unwrap();
888
0
        let (remove_connection, reset_connection) = match &mut connection.inner {
889
            ConnectionInner::SingleStreamMsNoiseYamux { .. } => {
890
0
                unreachable!()
891
            }
892
            ConnectionInner::MultiStreamWebRtc {
893
0
                connection_handles_alive,
894
                ..
895
            }
896
            | ConnectionInner::MultiStreamUnknownHandshake {
897
0
                connection_handles_alive,
898
                ..
899
            } => {
900
0
                *connection_handles_alive -= 1;
901
0
                let v = *connection_handles_alive == 0;
902
0
                (v, v)
903
            }
904
0
            ConnectionInner::Reset { .. } => (true, false),
905
        };
906
907
0
        if remove_connection {
908
0
            lock.connections.remove(&self.0).unwrap();
909
0
        }
910
0
        if reset_connection {
911
0
            unsafe {
912
0
                bindings::reset_connection(self.0);
913
0
            }
914
0
        }
915
0
    }
916
}
917
918
0
#[derive(Debug, derive_more::Display, Clone)]
919
#[display(fmt = "{message}")]
920
pub(crate) struct StreamError {
921
    message: String,
922
}
923
924
static STATE: Mutex<NetworkState> = Mutex::new(NetworkState {
925
    next_connection_id: 0,
926
    connections: hashbrown::HashMap::with_hasher(FnvBuildHasher),
927
    streams: BTreeMap::new(),
928
});
929
930
// TODO: we use a custom `FnvBuildHasher` because it's not possible to create `fnv::FnvBuildHasher` in a `const` context
931
struct FnvBuildHasher;
932
impl core::hash::BuildHasher for FnvBuildHasher {
933
    type Hasher = fnv::FnvHasher;
934
0
    fn build_hasher(&self) -> fnv::FnvHasher {
935
0
        fnv::FnvHasher::default()
936
0
    }
937
}
938
939
/// All the connections and streams that are alive.
940
///
941
/// Single-stream connections have one entry in `connections` and one entry in `streams` (with
942
/// a `stream_id` always equal to `None`).
943
/// Multi-stream connections have one entry in `connections` and zero or more entries in `streams`.
944
struct NetworkState {
945
    next_connection_id: u32,
946
    connections: hashbrown::HashMap<u32, Connection, FnvBuildHasher>,
947
    streams: BTreeMap<(u32, Option<u32>), Stream>,
948
}
949
950
struct Connection {
951
    /// Type of connection and extra fields that depend on the type.
952
    inner: ConnectionInner,
953
    /// Event notified whenever one of the fields above is modified.
954
    something_happened: event_listener::Event,
955
}
956
957
enum ConnectionInner {
958
    SingleStreamMsNoiseYamux,
959
    MultiStreamUnknownHandshake {
960
        /// List of substreams that the host (i.e. JavaScript side) has reported have been opened,
961
        /// but that haven't been reported through
962
        /// [`smoldot_light::platform::PlatformRef::next_substream`] yet.
963
        opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>,
964
        /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference
965
        /// this connection. If it switches from 1 to 0, the connection must be removed.
966
        connection_handles_alive: u32,
967
    },
968
    MultiStreamWebRtc {
969
        /// List of substreams that the host (i.e. JavaScript side) has reported have been opened,
970
        /// but that haven't been reported through
971
        /// [`smoldot_light::platform::PlatformRef::next_substream`] yet.
972
        opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>,
973
        /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference
974
        /// this connection. If it switches from 1 to 0, the connection must be removed.
975
        connection_handles_alive: u32,
976
        /// SHA256 hash of the TLS certificate used by the local node at the DTLS layer.
977
        local_tls_certificate_sha256: [u8; 32],
978
    },
979
    /// [`bindings::connection_reset`] has been called
980
    Reset {
981
        /// Message given by the bindings to justify the closure.
982
        // TODO: why is this unused? shouldn't it be not unused?
983
        _message: String,
984
        /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference
985
        /// this connection. If it switches from 1 to 0, the connection must be removed.
986
        connection_handles_alive: u32,
987
    },
988
}
989
990
struct Stream {
991
    /// `Some` if [`bindings::stream_reset`] has been called. Contains the error message.
992
    reset: Option<String>,
993
    /// Sum of the writable bytes reported through [`bindings::stream_writable_bytes`] that
994
    /// haven't been processed yet in a call to `update_stream`.
995
    writable_bytes_extra: usize,
996
    /// List of messages received through [`bindings::stream_message`]. Must never contain
997
    /// empty messages.
998
    messages_queue: VecDeque<Box<[u8]>>,
999
    /// Total size of all the messages stored in [`Stream::messages_queue`].
1000
    messages_queue_total_size: usize,
1001
    /// Event notified whenever one of the fields above is modified, such as a new message being
1002
    /// queued.
1003
    something_happened: event_listener::Event,
1004
}
1005
1006
0
pub(crate) fn connection_multi_stream_set_handshake_info(
1007
0
    connection_id: u32,
1008
0
    handshake_ty: Vec<u8>,
1009
0
) {
1010
0
    let (_, local_tls_certificate_sha256) = nom::sequence::preceded(
1011
0
        nom::bytes::streaming::tag::<_, _, nom::error::Error<&[u8]>>(&[0]),
1012
0
        nom::combinator::map(nom::bytes::streaming::take(32u32), |b| {
1013
0
            <&[u8; 32]>::try_from(b).unwrap()
1014
0
        }),
1015
0
    )(&handshake_ty[..])
1016
0
    .expect("invalid handshake type provided to connection_multi_stream_set_handshake_info");
1017
0
1018
0
    let mut lock = STATE.try_lock().unwrap();
1019
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1020
1021
0
    let (opened_substreams_to_pick_up, connection_handles_alive) = match &mut connection.inner {
1022
        ConnectionInner::MultiStreamUnknownHandshake {
1023
0
            opened_substreams_to_pick_up,
1024
0
            connection_handles_alive,
1025
0
        } => (
1026
0
            mem::take(opened_substreams_to_pick_up),
1027
0
            *connection_handles_alive,
1028
0
        ),
1029
0
        _ => unreachable!(),
1030
    };
1031
1032
0
    connection.inner = ConnectionInner::MultiStreamWebRtc {
1033
0
        opened_substreams_to_pick_up,
1034
0
        connection_handles_alive,
1035
0
        local_tls_certificate_sha256: *local_tls_certificate_sha256,
1036
0
    };
1037
0
    connection.something_happened.notify(usize::MAX);
1038
0
}
1039
1040
0
pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u32) {
1041
0
    let mut lock = STATE.try_lock().unwrap();
1042
0
1043
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1044
1045
    // For single stream connections, the docs of this function mentions that `stream_id` can be
1046
    // any value.
1047
0
    let actual_stream_id = match connection.inner {
1048
        ConnectionInner::MultiStreamWebRtc { .. }
1049
0
        | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id),
1050
0
        ConnectionInner::SingleStreamMsNoiseYamux { .. } => None,
1051
0
        ConnectionInner::Reset { .. } => unreachable!(),
1052
    };
1053
1054
0
    let stream = lock
1055
0
        .streams
1056
0
        .get_mut(&(connection_id, actual_stream_id))
1057
0
        .unwrap();
1058
0
    debug_assert!(stream.reset.is_none());
1059
1060
    // As documented, the number of writable bytes must never become exceedingly large (a few
1061
    // megabytes). As such, this can't overflow unless there is a bug on the JavaScript side.
1062
0
    stream.writable_bytes_extra += usize::try_from(bytes).unwrap();
1063
0
    stream.something_happened.notify(usize::MAX);
1064
0
}
1065
1066
0
pub(crate) fn stream_message(connection_id: u32, stream_id: u32, message: Vec<u8>) {
1067
0
    let mut lock = STATE.try_lock().unwrap();
1068
0
1069
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1070
1071
    // For single stream connections, the docs of this function mentions that `stream_id` can be
1072
    // any value.
1073
0
    let actual_stream_id = match connection.inner {
1074
        ConnectionInner::MultiStreamWebRtc { .. }
1075
0
        | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id),
1076
0
        ConnectionInner::SingleStreamMsNoiseYamux { .. } => None,
1077
0
        ConnectionInner::Reset { .. } => unreachable!(),
1078
    };
1079
1080
0
    let stream = lock
1081
0
        .streams
1082
0
        .get_mut(&(connection_id, actual_stream_id))
1083
0
        .unwrap();
1084
0
    debug_assert!(stream.reset.is_none());
1085
1086
0
    TOTAL_BYTES_RECEIVED.fetch_add(u64::try_from(message.len()).unwrap(), Ordering::Relaxed);
1087
0
1088
0
    // Ignore empty message to avoid all sorts of problems.
1089
0
    if message.is_empty() {
1090
0
        return;
1091
0
    }
1092
0
1093
0
    // There is unfortunately no way to instruct the browser to back-pressure connections to
1094
0
    // remotes.
1095
0
    //
1096
0
    // In order to avoid DoS attacks, we refuse to buffer more than a certain amount of data per
1097
0
    // connection. This limit is completely arbitrary, and this is in no way a robust solution
1098
0
    // because this limit isn't in sync with any other part of the code. In other words, it could
1099
0
    // be legitimate for the remote to buffer a large amount of data.
1100
0
    //
1101
0
    // This corner case is handled by discarding the messages that would go over the limit. While
1102
0
    // this is not a great solution, going over that limit can be considered as a fault from the
1103
0
    // remote, the same way as it would be a fault from the remote to forget to send some bytes,
1104
0
    // and thus should be handled in a similar way by the higher level code.
1105
0
    //
1106
0
    // A better way to handle this would be to kill the connection abruptly. However, this would
1107
0
    // add a lot of complex code in this module, and the effort is clearly not worth it for this
1108
0
    // niche situation.
1109
0
    //
1110
0
    // While this problem is specific to browsers (Deno and NodeJS have ways to back-pressure
1111
0
    // connections), we add this hack for all platforms, for consistency. If this limit is ever
1112
0
    // reached, we want to be sure to detect it, even when testing on NodeJS or Deno.
1113
0
    //
1114
0
    // See <https://github.com/smol-dot/smoldot/issues/109>.
1115
0
    // TODO: do this properly eventually ^
1116
0
    if stream.messages_queue_total_size >= 25 * 1024 * 1024 {
1117
0
        return;
1118
0
    }
1119
0
1120
0
    stream.messages_queue_total_size += message.len();
1121
0
    stream.messages_queue.push_back(message.into_boxed_slice());
1122
0
    stream.something_happened.notify(usize::MAX);
1123
0
}
1124
1125
0
pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) {
1126
0
    let mut lock = STATE.try_lock().unwrap();
1127
0
    let lock = &mut *lock;
1128
0
1129
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1130
    if let ConnectionInner::MultiStreamWebRtc {
1131
0
        opened_substreams_to_pick_up,
1132
        ..
1133
0
    } = &mut connection.inner
1134
    {
1135
0
        let _prev_value = lock.streams.insert(
1136
0
            (connection_id, Some(stream_id)),
1137
0
            Stream {
1138
0
                reset: None,
1139
0
                messages_queue: VecDeque::with_capacity(8),
1140
0
                messages_queue_total_size: 0,
1141
0
                something_happened: event_listener::Event::new(),
1142
0
                writable_bytes_extra: 0,
1143
0
            },
1144
0
        );
1145
0
1146
0
        if _prev_value.is_some() {
1147
0
            panic!("same stream_id used multiple times in connection_stream_opened")
1148
0
        }
1149
0
1150
0
        opened_substreams_to_pick_up.push_back((
1151
0
            stream_id,
1152
0
            if outbound != 0 {
1153
0
                SubstreamDirection::Outbound
1154
            } else {
1155
0
                SubstreamDirection::Inbound
1156
            },
1157
        ));
1158
1159
0
        connection.something_happened.notify(usize::MAX);
1160
    } else {
1161
0
        panic!()
1162
    }
1163
0
}
1164
1165
0
pub(crate) fn connection_reset(connection_id: u32, message: Vec<u8>) {
1166
0
    let message = str::from_utf8(&message)
1167
0
        .unwrap_or_else(|_| panic!("non-UTF-8 message"))
1168
0
        .to_owned();
1169
0
1170
0
    let mut lock = STATE.try_lock().unwrap();
1171
0
    let connection = lock.connections.get_mut(&connection_id).unwrap();
1172
1173
0
    let connection_handles_alive = match &connection.inner {
1174
0
        ConnectionInner::SingleStreamMsNoiseYamux { .. } => 1, // TODO: I believe that this is correct but a bit confusing; might be helpful to refactor with an enum or something
1175
        ConnectionInner::MultiStreamWebRtc {
1176
0
            connection_handles_alive,
1177
            ..
1178
        }
1179
        | ConnectionInner::MultiStreamUnknownHandshake {
1180
0
            connection_handles_alive,
1181
            ..
1182
0
        } => *connection_handles_alive,
1183
0
        ConnectionInner::Reset { .. } => unreachable!(),
1184
    };
1185
1186
0
    connection.inner = ConnectionInner::Reset {
1187
0
        connection_handles_alive,
1188
0
        _message: message.clone(),
1189
0
    };
1190
0
1191
0
    connection.something_happened.notify(usize::MAX);
1192
1193
0
    for ((_, _), stream) in lock
1194
0
        .streams
1195
0
        .range_mut((connection_id, Some(u32::MIN))..=(connection_id, Some(u32::MAX)))
1196
0
    {
1197
0
        stream.reset = Some(message.clone());
1198
0
        stream.something_happened.notify(usize::MAX);
1199
0
    }
1200
0
    if let Some(stream) = lock.streams.get_mut(&(connection_id, None)) {
1201
0
        stream.reset = Some(message);
1202
0
        stream.something_happened.notify(usize::MAX);
1203
0
    }
1204
0
}
1205
1206
0
pub(crate) fn stream_reset(connection_id: u32, stream_id: u32, message: Vec<u8>) {
1207
0
    let message: String = str::from_utf8(&message)
1208
0
        .unwrap_or_else(|_| panic!("non-UTF-8 message"))
1209
0
        .to_owned();
1210
0
1211
0
    // Note that, as documented, it is illegal to call this function on single-stream substreams.
1212
0
    // We can thus assume that the `stream_id` is valid.
1213
0
    let mut lock = STATE.try_lock().unwrap();
1214
0
    let stream = lock
1215
0
        .streams
1216
0
        .get_mut(&(connection_id, Some(stream_id)))
1217
0
        .unwrap();
1218
0
    stream.reset = Some(message);
1219
0
    stream.something_happened.notify(usize::MAX);
1220
0
}