/__w/smoldot/smoldot/repo/wasm-node/rust/src/platform.rs
Line | Count | Source |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use crate::{bindings, timers::Delay}; |
19 | | |
20 | | use futures_lite::future::FutureExt as _; |
21 | | |
22 | | use smoldot_light::platform::{SubstreamDirection, read_write}; |
23 | | |
24 | | use alloc::{ |
25 | | borrow::{Cow, ToOwned as _}, |
26 | | boxed::Box, |
27 | | collections::{BTreeMap, VecDeque}, |
28 | | format, |
29 | | string::{String, ToString as _}, |
30 | | vec::Vec, |
31 | | }; |
32 | | use async_lock::Mutex; |
33 | | use core::{ |
34 | | fmt::{self, Write as _}, |
35 | | future, iter, mem, |
36 | | net::IpAddr, |
37 | | ops, pin, str, |
38 | | sync::atomic::{AtomicU32, AtomicU64, Ordering}, |
39 | | task, |
40 | | time::Duration, |
41 | | }; |
42 | | |
43 | | /// Total number of bytes that all the connections created through [`PlatformRef`] combined have |
44 | | /// received. |
45 | | pub static TOTAL_BYTES_RECEIVED: AtomicU64 = AtomicU64::new(0); |
46 | | /// Total number of bytes that all the connections created through [`PlatformRef`] combined have |
47 | | /// sent. |
48 | | pub static TOTAL_BYTES_SENT: AtomicU64 = AtomicU64::new(0); |
49 | | /// Total number of microseconds that all the tasks have spent executing. A `u64` will overflow |
50 | | /// after 584 542 years. |
51 | | pub static TOTAL_CPU_USAGE_US: AtomicU64 = AtomicU64::new(0); |
52 | | |
53 | | pub(crate) const PLATFORM_REF: PlatformRef = PlatformRef {}; |
54 | | |
55 | | /// Log level above which log entries aren't emitted. |
56 | | pub static MAX_LOG_LEVEL: AtomicU32 = AtomicU32::new(0); |
57 | | |
58 | | #[derive(Debug, Copy, Clone)] |
59 | | pub(crate) struct PlatformRef {} |
60 | | |
61 | | // TODO: this trait implementation was written before GATs were stable in Rust; now that the associated types have lifetimes, it should be possible to considerably simplify this code |
62 | | impl smoldot_light::platform::PlatformRef for PlatformRef { |
63 | | type Delay = Delay; |
64 | | type Instant = Duration; |
65 | | type MultiStream = MultiStreamWrapper; // Entry in the ̀`CONNECTIONS` map. |
66 | | type Stream = StreamWrapper; // Entry in the ̀`STREAMS` map and a read buffer. |
67 | | type StreamConnectFuture = future::Ready<Self::Stream>; |
68 | | type ReadWriteAccess<'a> = ReadWriteAccess<'a>; |
69 | | type StreamErrorRef<'a> = StreamError; |
70 | | type MultiStreamConnectFuture = pin::Pin< |
71 | | Box< |
72 | | dyn Future< |
73 | | Output = smoldot_light::platform::MultiStreamWebRtcConnection< |
74 | | Self::MultiStream, |
75 | | >, |
76 | | > + Send, |
77 | | >, |
78 | | >; |
79 | | type StreamUpdateFuture<'a> = pin::Pin<Box<dyn Future<Output = ()> + Send + 'a>>; |
80 | | type NextSubstreamFuture<'a> = pin::Pin< |
81 | | Box< |
82 | | dyn Future<Output = Option<(Self::Stream, smoldot_light::platform::SubstreamDirection)>> |
83 | | + Send |
84 | | + 'a, |
85 | | >, |
86 | | >; |
87 | | |
88 | 0 | fn now_from_unix_epoch(&self) -> Duration { |
89 | 0 | let microseconds = bindings::unix_timestamp_us(); |
90 | 0 | Duration::from_micros(microseconds) |
91 | 0 | } |
92 | | |
93 | 0 | fn now(&self) -> Self::Instant { |
94 | 0 | let microseconds = bindings::monotonic_clock_us(); |
95 | 0 | Duration::from_micros(microseconds) |
96 | 0 | } |
97 | | |
98 | 0 | fn fill_random_bytes(&self, buffer: &mut [u8]) { |
99 | | unsafe { |
100 | 0 | bindings::random_get( |
101 | 0 | u32::try_from(buffer.as_mut_ptr() as usize).unwrap(), |
102 | 0 | u32::try_from(buffer.len()).unwrap(), |
103 | | ) |
104 | | } |
105 | 0 | } |
106 | | |
107 | 0 | fn sleep(&self, duration: Duration) -> Self::Delay { |
108 | 0 | Delay::new(duration) |
109 | 0 | } |
110 | | |
111 | 0 | fn sleep_until(&self, when: Self::Instant) -> Self::Delay { |
112 | 0 | Delay::new_at_monotonic_clock(when) |
113 | 0 | } |
114 | | |
115 | 0 | fn spawn_task(&self, task_name: Cow<str>, task: impl Future<Output = ()> + Send + 'static) { |
116 | | // The code below processes tasks that have names. |
117 | | #[pin_project::pin_project] |
118 | | struct FutureAdapter<F> { |
119 | | name: String, |
120 | | #[pin] |
121 | | future: F, |
122 | | } |
123 | | |
124 | | impl<F: Future> Future for FutureAdapter<F> { |
125 | | type Output = F::Output; |
126 | 0 | fn poll(self: pin::Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> { |
127 | 0 | let this = self.project(); |
128 | 0 | bindings::current_task_entered( |
129 | 0 | u32::try_from(this.name.as_bytes().as_ptr() as usize).unwrap(), |
130 | 0 | u32::try_from(this.name.as_bytes().len()).unwrap(), |
131 | | ); |
132 | | |
133 | 0 | let before_polling = bindings::monotonic_clock_us(); |
134 | 0 | let out = this.future.poll(cx); |
135 | 0 | let poll_duration = |
136 | 0 | Duration::from_micros(bindings::monotonic_clock_us() - before_polling); |
137 | 0 | TOTAL_CPU_USAGE_US.fetch_add( |
138 | 0 | u64::try_from(poll_duration.as_micros()).unwrap_or(u64::MAX), |
139 | 0 | Ordering::Relaxed, |
140 | | ); |
141 | | |
142 | 0 | bindings::current_task_exit(); |
143 | | |
144 | | // Print a warning if polling the task takes a long time. |
145 | | // It has been noticed that sometimes in Firefox polling a task takes a 16ms + a |
146 | | // small amount. This most likely indicates that Firefox does something like |
147 | | // freezing the JS/Wasm execution before resuming it at the next frame, thus adding |
148 | | // 16ms to the execution time. |
149 | | // For this reason, the threshold above which a task takes too long must be |
150 | | // above 16ms no matter what. |
151 | 0 | if poll_duration.as_millis() >= 20 { |
152 | 0 | smoldot_light::platform::PlatformRef::log( |
153 | 0 | &PLATFORM_REF, |
154 | 0 | smoldot_light::platform::LogLevel::Debug, |
155 | 0 | "smoldot", |
156 | 0 | "task-too-long-time", |
157 | 0 | [ |
158 | 0 | ("name", this.name as &dyn fmt::Display), |
159 | 0 | ( |
160 | 0 | "poll_duration_ms", |
161 | 0 | &poll_duration.as_millis() as &dyn fmt::Display, |
162 | 0 | ), |
163 | 0 | ] |
164 | 0 | .into_iter(), |
165 | 0 | ); |
166 | 0 | } |
167 | 0 | if poll_duration.as_millis() >= 150 { |
168 | 0 | smoldot_light::platform::PlatformRef::log( |
169 | 0 | &PLATFORM_REF, |
170 | 0 | smoldot_light::platform::LogLevel::Warn, |
171 | 0 | "smoldot", |
172 | 0 | &format!( |
173 | 0 | "The task named `{}` has occupied the CPU for an \ |
174 | 0 | unreasonable amount of time ({}ms).", |
175 | 0 | this.name, |
176 | 0 | poll_duration.as_millis(), |
177 | 0 | ), |
178 | 0 | iter::empty(), |
179 | 0 | ); |
180 | 0 | } |
181 | | |
182 | 0 | out |
183 | 0 | } Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvMs2_B1a_INtB1a_6ClientBO_E9add_chainINtNtNtCsaFPxhswmqCN_5alloc3vec9into_iter8IntoIterNtB1a_7ChainIdEEsa_0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtB1a_15runtime_service14run_backgroundBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB1a_15network_service5tasks29single_stream_connection_taskBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB1a_15network_service5tasks35webrtc_multi_stream_connection_taskBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB1a_16json_rpc_service10background3runBO_E0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB1a_15runtime_serviceINtB2D_14RuntimeServiceBO_E31send_message_or_restart_service00ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB1a_20transactions_serviceINtB2D_19TransactionsServiceBO_E18send_to_background00ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvB7_9add_chains0_0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_12sync_serviceINtB2B_11SyncServiceBO_E3new0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_15network_serviceINtB2B_14NetworkServiceBO_E3new0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_15network_serviceINtB2B_14NetworkServiceBO_E9add_chains_0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB1a_20transactions_serviceINtB2B_19TransactionsServiceBO_E3new0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvNtB7_4init4init0ENtNtNtCs1p5UDGgVI4d_4core6future6future6Future4pollB7_ |
184 | | } |
185 | | |
186 | 0 | let task = FutureAdapter { |
187 | 0 | name: task_name.into_owned(), |
188 | 0 | future: task, |
189 | 0 | }; |
190 | | |
191 | 0 | let (runnable, task) = async_task::spawn(task, |runnable| { |
192 | 0 | super::TASKS_QUEUE.push(runnable); |
193 | 0 | bindings::advance_execution_ready(); |
194 | 0 | }); Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B1a_INtB1a_6ClientBO_E9add_chainINtNtNtCsaFPxhswmqCN_5alloc3vec9into_iter8IntoIterNtB1a_7ChainIdEEsa_0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB1a_15runtime_service14run_backgroundBO_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB1a_15network_service5tasks29single_stream_connection_taskBO_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB1a_15network_service5tasks35webrtc_multi_stream_connection_taskBO_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB1a_16json_rpc_service10background3runBO_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB1a_15runtime_serviceINtB2i_14RuntimeServiceBO_E31send_message_or_restart_service00E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB1a_20transactions_serviceINtB2i_19TransactionsServiceBO_E18send_to_background00E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB7_9add_chains0_0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_12sync_serviceINtB2g_11SyncServiceBO_E3new0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_15network_serviceINtB2g_14NetworkServiceBO_E3new0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_15network_serviceINtB2g_14NetworkServiceBO_E9add_chains_0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB1a_20transactions_serviceINtB2g_19TransactionsServiceBO_E3new0E0B7_ Unexecuted instantiation: _RNCINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB7_4init4init0E0B7_ |
195 | | |
196 | 0 | task.detach(); |
197 | 0 | runnable.schedule(); |
198 | 0 | } Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B18_INtB18_6ClientBM_E9add_chainINtNtNtCsaFPxhswmqCN_5alloc3vec9into_iter8IntoIterNtB18_7ChainIdEEsa_0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB18_15runtime_service14run_backgroundBM_E0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB18_15network_service5tasks29single_stream_connection_taskBM_E0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB18_15network_service5tasks35webrtc_multi_stream_connection_taskBM_E0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB18_16json_rpc_service10background3runBM_E0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB18_15runtime_serviceINtB2g_14RuntimeServiceBM_E31send_message_or_restart_service00EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB18_20transactions_serviceINtB2g_19TransactionsServiceBM_E18send_to_background00EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB5_9add_chains0_0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_12sync_serviceINtB2e_11SyncServiceBM_E3new0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_15network_serviceINtB2e_14NetworkServiceBM_E3new0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_15network_serviceINtB2e_14NetworkServiceBM_E9add_chains_0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB18_20transactions_serviceINtB2e_19TransactionsServiceBM_E3new0EB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB5_4init4init0EB5_ |
199 | | |
200 | 0 | fn log<'a>( |
201 | 0 | &self, |
202 | 0 | log_level: smoldot_light::platform::LogLevel, |
203 | 0 | log_target: &'a str, |
204 | 0 | message: &'a str, |
205 | 0 | key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>, |
206 | 0 | ) { |
207 | 0 | let log_level = match log_level { |
208 | 0 | smoldot_light::platform::LogLevel::Error => 1, |
209 | 0 | smoldot_light::platform::LogLevel::Warn => 2, |
210 | 0 | smoldot_light::platform::LogLevel::Info => 3, |
211 | 0 | smoldot_light::platform::LogLevel::Debug => 4, |
212 | 0 | smoldot_light::platform::LogLevel::Trace => 5, |
213 | | }; |
214 | | |
215 | 0 | if log_level > MAX_LOG_LEVEL.load(Ordering::Relaxed) { |
216 | 0 | return; |
217 | 0 | } |
218 | | |
219 | 0 | let mut key_values = key_values.peekable(); |
220 | | |
221 | 0 | if key_values.peek().is_none() { |
222 | 0 | bindings::log( |
223 | 0 | log_level, |
224 | 0 | u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(), |
225 | 0 | u32::try_from(log_target.as_bytes().len()).unwrap(), |
226 | 0 | u32::try_from(message.as_bytes().as_ptr() as usize).unwrap(), |
227 | 0 | u32::try_from(message.as_bytes().len()).unwrap(), |
228 | | ) |
229 | | } else { |
230 | 0 | let mut message_build = String::with_capacity(128); |
231 | 0 | message_build.push_str(message); |
232 | 0 | let mut first = true; |
233 | 0 | for (key, value) in key_values { |
234 | 0 | if first { |
235 | 0 | let _ = write!(message_build, "; "); |
236 | 0 | first = false; |
237 | 0 | } else { |
238 | 0 | let _ = write!(message_build, ", "); |
239 | 0 | } |
240 | 0 | let _ = write!(message_build, "{}={}", key, value); |
241 | | } |
242 | | |
243 | 0 | bindings::log( |
244 | 0 | log_level, |
245 | 0 | u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(), |
246 | 0 | u32::try_from(log_target.as_bytes().len()).unwrap(), |
247 | 0 | u32::try_from(message_build.as_bytes().as_ptr() as usize).unwrap(), |
248 | 0 | u32::try_from(message_build.as_bytes().len()).unwrap(), |
249 | | ) |
250 | | } |
251 | 0 | } Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtCs1p5UDGgVI4d_4core5array4iter8IntoIterTReRDNtNtB28_3fmt7DisplayEL_EKj2_EEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter7sources5empty5EmptyTReRDNtNtB2a_3fmt7DisplayEL_EEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEEEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_IB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEIB22_B2T_INtNtB2Y_5empty5EmptyB3m_EEEEB5_ Unexecuted instantiation: _RINvXNtCs7snhGEhbuap_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCs508CmrSPkZh_13smoldot_light8platform11PlatformRef3logINtNtNtNtCs1p5UDGgVI4d_4core4iter8adapters5chain5ChainINtNtNtB28_7sources4once4OnceTReRDNtNtB2a_3fmt7DisplayEL_EEINtNtB2Y_5empty5EmptyB3m_EEEB5_ |
252 | | |
253 | 0 | fn client_name(&self) -> Cow<str> { |
254 | 0 | env!("CARGO_PKG_NAME").into() |
255 | 0 | } |
256 | | |
257 | 0 | fn client_version(&self) -> Cow<str> { |
258 | 0 | env!("CARGO_PKG_VERSION").into() |
259 | 0 | } |
260 | | |
261 | 0 | fn supports_connection_type( |
262 | 0 | &self, |
263 | 0 | connection_type: smoldot_light::platform::ConnectionType, |
264 | 0 | ) -> bool { |
265 | 0 | let ty = match connection_type { |
266 | 0 | smoldot_light::platform::ConnectionType::TcpIpv4 => 0, |
267 | 0 | smoldot_light::platform::ConnectionType::TcpIpv6 => 1, |
268 | 0 | smoldot_light::platform::ConnectionType::TcpDns => 2, |
269 | | smoldot_light::platform::ConnectionType::WebSocketIpv4 { |
270 | | remote_is_localhost: true, |
271 | | .. |
272 | | } |
273 | | | smoldot_light::platform::ConnectionType::WebSocketIpv6 { |
274 | | remote_is_localhost: true, |
275 | | .. |
276 | | } |
277 | | | smoldot_light::platform::ConnectionType::WebSocketDns { |
278 | | secure: false, |
279 | | remote_is_localhost: true, |
280 | 0 | } => 7, |
281 | 0 | smoldot_light::platform::ConnectionType::WebSocketIpv4 { .. } => 4, |
282 | 0 | smoldot_light::platform::ConnectionType::WebSocketIpv6 { .. } => 5, |
283 | 0 | smoldot_light::platform::ConnectionType::WebSocketDns { secure: false, .. } => 6, |
284 | 0 | smoldot_light::platform::ConnectionType::WebSocketDns { secure: true, .. } => 14, |
285 | 0 | smoldot_light::platform::ConnectionType::WebRtcIpv4 => 16, |
286 | 0 | smoldot_light::platform::ConnectionType::WebRtcIpv6 => 17, |
287 | | }; |
288 | | |
289 | 0 | bindings::connection_type_supported(ty) != 0 |
290 | 0 | } |
291 | | |
292 | 0 | fn connect_stream( |
293 | 0 | &self, |
294 | 0 | address: smoldot_light::platform::Address, |
295 | 0 | ) -> Self::StreamConnectFuture { |
296 | 0 | let mut lock = STATE.try_lock().unwrap(); |
297 | | |
298 | 0 | let connection_id = lock.next_connection_id; |
299 | 0 | lock.next_connection_id += 1; |
300 | | |
301 | 0 | let encoded_address: Vec<u8> = match address { |
302 | | smoldot_light::platform::Address::TcpIp { |
303 | 0 | ip: IpAddr::V4(ip), |
304 | 0 | port, |
305 | 0 | } => iter::once(0u8) |
306 | 0 | .chain(port.to_be_bytes()) |
307 | 0 | .chain(ip.to_string().bytes()) |
308 | 0 | .collect(), |
309 | | smoldot_light::platform::Address::TcpIp { |
310 | 0 | ip: IpAddr::V6(ip), |
311 | 0 | port, |
312 | 0 | } => iter::once(1u8) |
313 | 0 | .chain(port.to_be_bytes()) |
314 | 0 | .chain(ip.to_string().bytes()) |
315 | 0 | .collect(), |
316 | 0 | smoldot_light::platform::Address::TcpDns { hostname, port } => iter::once(2u8) |
317 | 0 | .chain(port.to_be_bytes()) |
318 | 0 | .chain(hostname.as_bytes().iter().copied()) |
319 | 0 | .collect(), |
320 | | smoldot_light::platform::Address::WebSocketIp { |
321 | 0 | ip: IpAddr::V4(ip), |
322 | 0 | port, |
323 | 0 | } => iter::once(4u8) |
324 | 0 | .chain(port.to_be_bytes()) |
325 | 0 | .chain(ip.to_string().bytes()) |
326 | 0 | .collect(), |
327 | | smoldot_light::platform::Address::WebSocketIp { |
328 | 0 | ip: IpAddr::V6(ip), |
329 | 0 | port, |
330 | 0 | } => iter::once(5u8) |
331 | 0 | .chain(port.to_be_bytes()) |
332 | 0 | .chain(ip.to_string().bytes()) |
333 | 0 | .collect(), |
334 | | smoldot_light::platform::Address::WebSocketDns { |
335 | 0 | hostname, |
336 | 0 | port, |
337 | | secure: false, |
338 | 0 | } => iter::once(6u8) |
339 | 0 | .chain(port.to_be_bytes()) |
340 | 0 | .chain(hostname.as_bytes().iter().copied()) |
341 | 0 | .collect(), |
342 | | smoldot_light::platform::Address::WebSocketDns { |
343 | 0 | hostname, |
344 | 0 | port, |
345 | | secure: true, |
346 | 0 | } => iter::once(14u8) |
347 | 0 | .chain(port.to_be_bytes()) |
348 | 0 | .chain(hostname.as_bytes().iter().copied()) |
349 | 0 | .collect(), |
350 | | }; |
351 | | |
352 | 0 | let write_closable = match address { |
353 | | smoldot_light::platform::Address::TcpIp { .. } |
354 | 0 | | smoldot_light::platform::Address::TcpDns { .. } => true, |
355 | | smoldot_light::platform::Address::WebSocketIp { .. } |
356 | 0 | | smoldot_light::platform::Address::WebSocketDns { .. } => false, |
357 | | }; |
358 | | |
359 | 0 | bindings::connection_new( |
360 | 0 | connection_id, |
361 | 0 | u32::try_from(encoded_address.as_ptr() as usize).unwrap(), |
362 | 0 | u32::try_from(encoded_address.len()).unwrap(), |
363 | | ); |
364 | | |
365 | 0 | let _prev_value = lock.connections.insert( |
366 | 0 | connection_id, |
367 | 0 | Connection { |
368 | 0 | inner: ConnectionInner::SingleStreamMsNoiseYamux, |
369 | 0 | something_happened: event_listener::Event::new(), |
370 | 0 | }, |
371 | | ); |
372 | 0 | debug_assert!(_prev_value.is_none()); |
373 | | |
374 | 0 | let _prev_value = lock.streams.insert( |
375 | 0 | (connection_id, None), |
376 | 0 | Stream { |
377 | 0 | reset: None, |
378 | 0 | messages_queue: VecDeque::with_capacity(8), |
379 | 0 | messages_queue_total_size: 0, |
380 | 0 | something_happened: event_listener::Event::new(), |
381 | 0 | writable_bytes_extra: 0, |
382 | 0 | }, |
383 | | ); |
384 | 0 | debug_assert!(_prev_value.is_none()); |
385 | | |
386 | 0 | future::ready(StreamWrapper { |
387 | 0 | connection_id, |
388 | 0 | stream_id: None, |
389 | 0 | read_buffer: Vec::new(), |
390 | 0 | inner_expected_incoming_bytes: Some(1), |
391 | 0 | is_reset: None, |
392 | 0 | writable_bytes: 0, |
393 | 0 | write_closable, |
394 | 0 | write_closed: false, |
395 | 0 | when_wake_up: None, |
396 | 0 | }) |
397 | 0 | } |
398 | | |
399 | 0 | fn connect_multistream( |
400 | 0 | &self, |
401 | 0 | address: smoldot_light::platform::MultiStreamAddress, |
402 | 0 | ) -> Self::MultiStreamConnectFuture { |
403 | 0 | let mut lock = STATE.try_lock().unwrap(); |
404 | | |
405 | 0 | let connection_id = lock.next_connection_id; |
406 | 0 | lock.next_connection_id += 1; |
407 | | |
408 | 0 | let encoded_address: Vec<u8> = match address { |
409 | | smoldot_light::platform::MultiStreamAddress::WebRtc { |
410 | 0 | ip: IpAddr::V4(ip), |
411 | 0 | port, |
412 | 0 | remote_certificate_sha256, |
413 | 0 | } => iter::once(16u8) |
414 | 0 | .chain(port.to_be_bytes()) |
415 | 0 | .chain(remote_certificate_sha256.iter().copied()) |
416 | 0 | .chain(ip.to_string().bytes()) |
417 | 0 | .collect(), |
418 | | smoldot_light::platform::MultiStreamAddress::WebRtc { |
419 | 0 | ip: IpAddr::V6(ip), |
420 | 0 | port, |
421 | 0 | remote_certificate_sha256, |
422 | 0 | } => iter::once(17u8) |
423 | 0 | .chain(port.to_be_bytes()) |
424 | 0 | .chain(remote_certificate_sha256.iter().copied()) |
425 | 0 | .chain(ip.to_string().bytes()) |
426 | 0 | .collect(), |
427 | | }; |
428 | | |
429 | 0 | bindings::connection_new( |
430 | 0 | connection_id, |
431 | 0 | u32::try_from(encoded_address.as_ptr() as usize).unwrap(), |
432 | 0 | u32::try_from(encoded_address.len()).unwrap(), |
433 | | ); |
434 | | |
435 | 0 | let _prev_value = lock.connections.insert( |
436 | 0 | connection_id, |
437 | 0 | Connection { |
438 | 0 | inner: ConnectionInner::MultiStreamUnknownHandshake { |
439 | 0 | opened_substreams_to_pick_up: VecDeque::with_capacity(0), |
440 | 0 | connection_handles_alive: 1, |
441 | 0 | }, |
442 | 0 | something_happened: event_listener::Event::new(), |
443 | 0 | }, |
444 | | ); |
445 | 0 | debug_assert!(_prev_value.is_none()); |
446 | | |
447 | 0 | Box::pin(async move { |
448 | | // Wait until the connection state is no longer "unknown handshake". |
449 | 0 | let mut lock = loop { |
450 | 0 | let something_happened = { |
451 | 0 | let mut lock = STATE.try_lock().unwrap(); |
452 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
453 | | |
454 | 0 | if matches!( |
455 | 0 | connection.inner, |
456 | | ConnectionInner::Reset { .. } | ConnectionInner::MultiStreamWebRtc { .. } |
457 | | ) { |
458 | 0 | break lock; |
459 | 0 | } |
460 | | |
461 | 0 | connection.something_happened.listen() |
462 | | }; |
463 | | |
464 | 0 | something_happened.await |
465 | | }; |
466 | 0 | let lock = &mut *lock; |
467 | | |
468 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
469 | | |
470 | 0 | match &mut connection.inner { |
471 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } |
472 | | | ConnectionInner::MultiStreamUnknownHandshake { .. } => { |
473 | 0 | unreachable!() |
474 | | } |
475 | | ConnectionInner::MultiStreamWebRtc { |
476 | 0 | local_tls_certificate_sha256, |
477 | | .. |
478 | 0 | } => smoldot_light::platform::MultiStreamWebRtcConnection { |
479 | 0 | connection: MultiStreamWrapper(connection_id), |
480 | 0 | local_tls_certificate_sha256: *local_tls_certificate_sha256, |
481 | 0 | }, |
482 | | ConnectionInner::Reset { .. } => { |
483 | | // If the connection was already reset, we proceed anyway but provide a fake |
484 | | // certificate hash. This has absolutely no consequence. |
485 | 0 | smoldot_light::platform::MultiStreamWebRtcConnection { |
486 | 0 | connection: MultiStreamWrapper(connection_id), |
487 | 0 | local_tls_certificate_sha256: [0; 32], |
488 | 0 | } |
489 | | } |
490 | | } |
491 | 0 | }) |
492 | 0 | } |
493 | | |
494 | 0 | fn next_substream<'a>( |
495 | 0 | &self, |
496 | 0 | MultiStreamWrapper(connection_id): &'a mut Self::MultiStream, |
497 | 0 | ) -> Self::NextSubstreamFuture<'a> { |
498 | 0 | let connection_id = *connection_id; |
499 | | |
500 | 0 | Box::pin(async move { |
501 | 0 | let (stream_id, direction) = loop { |
502 | 0 | let something_happened = { |
503 | 0 | let mut lock = STATE.try_lock().unwrap(); |
504 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
505 | | |
506 | 0 | match &mut connection.inner { |
507 | 0 | ConnectionInner::Reset { .. } => return None, |
508 | | ConnectionInner::MultiStreamWebRtc { |
509 | 0 | opened_substreams_to_pick_up, |
510 | 0 | connection_handles_alive, |
511 | | .. |
512 | | } |
513 | | | ConnectionInner::MultiStreamUnknownHandshake { |
514 | 0 | opened_substreams_to_pick_up, |
515 | 0 | connection_handles_alive, |
516 | | .. |
517 | | } => { |
518 | 0 | if let Some((substream, direction)) = |
519 | 0 | opened_substreams_to_pick_up.pop_front() |
520 | | { |
521 | 0 | *connection_handles_alive += 1; |
522 | 0 | break (substream, direction); |
523 | 0 | } |
524 | | } |
525 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
526 | 0 | unreachable!() |
527 | | } |
528 | | } |
529 | | |
530 | 0 | connection.something_happened.listen() |
531 | | }; |
532 | | |
533 | 0 | something_happened.await |
534 | | }; |
535 | | |
536 | 0 | Some(( |
537 | 0 | StreamWrapper { |
538 | 0 | connection_id, |
539 | 0 | stream_id: Some(stream_id), |
540 | 0 | read_buffer: Vec::new(), |
541 | 0 | inner_expected_incoming_bytes: Some(1), |
542 | 0 | is_reset: None, |
543 | 0 | writable_bytes: 0, |
544 | 0 | write_closable: false, // Note: this is currently hardcoded for WebRTC. |
545 | 0 | write_closed: false, |
546 | 0 | when_wake_up: None, |
547 | 0 | }, |
548 | 0 | direction, |
549 | 0 | )) |
550 | 0 | }) |
551 | 0 | } |
552 | | |
553 | 0 | fn open_out_substream(&self, MultiStreamWrapper(connection_id): &mut Self::MultiStream) { |
554 | 0 | match STATE |
555 | 0 | .try_lock() |
556 | 0 | .unwrap() |
557 | 0 | .connections |
558 | 0 | .get(connection_id) |
559 | 0 | .unwrap() |
560 | 0 | .inner |
561 | | { |
562 | | ConnectionInner::MultiStreamWebRtc { .. } |
563 | 0 | | ConnectionInner::MultiStreamUnknownHandshake { .. } => { |
564 | 0 | bindings::connection_stream_open(*connection_id); |
565 | 0 | } |
566 | 0 | ConnectionInner::Reset { .. } => {} |
567 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
568 | 0 | unreachable!() |
569 | | } |
570 | | } |
571 | 0 | } |
572 | | |
573 | 0 | fn wait_read_write_again<'a>( |
574 | 0 | &self, |
575 | 0 | stream: pin::Pin<&'a mut Self::Stream>, |
576 | 0 | ) -> Self::StreamUpdateFuture<'a> { |
577 | 0 | Box::pin(async move { |
578 | 0 | let stream = stream.get_mut(); |
579 | | |
580 | 0 | if stream.is_reset.is_some() { |
581 | 0 | future::pending::<()>().await; |
582 | 0 | } |
583 | | |
584 | | loop { |
585 | 0 | let listener = { |
586 | 0 | let mut lock = STATE.try_lock().unwrap(); |
587 | 0 | let stream_inner = lock |
588 | 0 | .streams |
589 | 0 | .get_mut(&(stream.connection_id, stream.stream_id)) |
590 | 0 | .unwrap(); |
591 | | |
592 | 0 | if let Some(msg) = &stream_inner.reset { |
593 | 0 | stream.is_reset = Some(msg.clone()); |
594 | 0 | return; |
595 | 0 | } |
596 | | |
597 | 0 | let mut shall_return = false; |
598 | | |
599 | | // Move the buffers from `STATE` into `read_buffer`. |
600 | 0 | if !stream_inner.messages_queue.is_empty() { |
601 | 0 | stream |
602 | 0 | .read_buffer |
603 | 0 | .reserve(stream_inner.messages_queue_total_size); |
604 | | |
605 | 0 | while let Some(msg) = stream_inner.messages_queue.pop_front() { |
606 | 0 | stream_inner.messages_queue_total_size -= msg.len(); |
607 | | // TODO: could be optimized by reworking the bindings |
608 | 0 | stream.read_buffer.extend_from_slice(&msg); |
609 | 0 | if stream |
610 | 0 | .inner_expected_incoming_bytes |
611 | 0 | .map_or(false, |expected| expected <= stream.read_buffer.len()) |
612 | | { |
613 | 0 | shall_return = true; |
614 | 0 | break; |
615 | 0 | } |
616 | | } |
617 | 0 | } |
618 | | |
619 | 0 | if stream_inner.writable_bytes_extra != 0 { |
620 | 0 | // As documented, the number of writable bytes must never become |
621 | 0 | // exceedingly large (a few megabytes). As such, this can't overflow |
622 | 0 | // unless there is a bug on the JavaScript side. |
623 | 0 | stream.writable_bytes += stream_inner.writable_bytes_extra; |
624 | 0 | stream_inner.writable_bytes_extra = 0; |
625 | 0 | shall_return = true; |
626 | 0 | } |
627 | | |
628 | 0 | if shall_return { |
629 | 0 | return; |
630 | 0 | } |
631 | | |
632 | 0 | stream_inner.something_happened.listen() |
633 | | }; |
634 | | |
635 | 0 | let timer_stop = async move { |
636 | 0 | listener.await; |
637 | 0 | false |
638 | 0 | } |
639 | 0 | .or(async { |
640 | 0 | if let Some(when_wake_up) = stream.when_wake_up.as_mut() { |
641 | 0 | when_wake_up.await; |
642 | 0 | stream.when_wake_up = None; |
643 | 0 | true |
644 | | } else { |
645 | 0 | future::pending().await |
646 | | } |
647 | 0 | }) |
648 | 0 | .await; |
649 | | |
650 | 0 | if timer_stop { |
651 | 0 | return; |
652 | 0 | } |
653 | | } |
654 | 0 | }) |
655 | 0 | } |
656 | | |
657 | 0 | fn read_write_access<'a>( |
658 | 0 | &self, |
659 | 0 | stream: pin::Pin<&'a mut Self::Stream>, |
660 | 0 | ) -> Result<Self::ReadWriteAccess<'a>, Self::StreamErrorRef<'a>> { |
661 | 0 | let stream = stream.get_mut(); |
662 | | |
663 | 0 | if let Some(message) = &stream.is_reset { |
664 | 0 | return Err(StreamError { |
665 | 0 | message: message.clone(), |
666 | 0 | }); |
667 | 0 | } |
668 | | |
669 | | Ok(ReadWriteAccess { |
670 | | read_write: read_write::ReadWrite { |
671 | 0 | now: Duration::from_micros(bindings::monotonic_clock_us()), |
672 | 0 | incoming_buffer: mem::take(&mut stream.read_buffer), |
673 | 0 | expected_incoming_bytes: Some(0), |
674 | | read_bytes: 0, |
675 | 0 | write_buffers: Vec::new(), |
676 | | write_bytes_queued: 0, |
677 | 0 | write_bytes_queueable: if !stream.write_closed { |
678 | 0 | Some(stream.writable_bytes) |
679 | | } else { |
680 | 0 | None |
681 | | }, |
682 | 0 | wake_up_after: None, |
683 | | }, |
684 | 0 | stream, |
685 | | }) |
686 | 0 | } |
687 | | } |
688 | | |
689 | | pub(crate) struct ReadWriteAccess<'a> { |
690 | | read_write: read_write::ReadWrite<Duration>, |
691 | | stream: &'a mut StreamWrapper, |
692 | | } |
693 | | |
694 | | impl<'a> ops::Deref for ReadWriteAccess<'a> { |
695 | | type Target = read_write::ReadWrite<Duration>; |
696 | | |
697 | 0 | fn deref(&self) -> &Self::Target { |
698 | 0 | &self.read_write |
699 | 0 | } |
700 | | } |
701 | | |
702 | | impl<'a> ops::DerefMut for ReadWriteAccess<'a> { |
703 | 0 | fn deref_mut(&mut self) -> &mut Self::Target { |
704 | 0 | &mut self.read_write |
705 | 0 | } |
706 | | } |
707 | | |
708 | | impl<'a> Drop for ReadWriteAccess<'a> { |
709 | 0 | fn drop(&mut self) { |
710 | 0 | let mut lock = STATE.try_lock().unwrap(); |
711 | | |
712 | 0 | let stream_inner = lock |
713 | 0 | .streams |
714 | 0 | .get_mut(&(self.stream.connection_id, self.stream.stream_id)) |
715 | 0 | .unwrap(); |
716 | | |
717 | 0 | if (self.read_write.read_bytes != 0 |
718 | 0 | && self |
719 | 0 | .read_write |
720 | 0 | .expected_incoming_bytes |
721 | 0 | .map_or(false, |expected| { |
722 | 0 | expected >= self.read_write.incoming_buffer.len() |
723 | 0 | })) |
724 | 0 | || (self.read_write.write_bytes_queued != 0 |
725 | 0 | && self.read_write.write_bytes_queueable.is_some()) |
726 | 0 | { |
727 | 0 | self.read_write.wake_up_asap(); |
728 | 0 | } |
729 | | |
730 | 0 | self.stream.when_wake_up = self |
731 | 0 | .read_write |
732 | 0 | .wake_up_after |
733 | 0 | .map(Delay::new_at_monotonic_clock); |
734 | | |
735 | 0 | self.stream.read_buffer = mem::take(&mut self.read_write.incoming_buffer); |
736 | | |
737 | 0 | self.stream.inner_expected_incoming_bytes = self.read_write.expected_incoming_bytes; |
738 | | |
739 | 0 | if !self.read_write.write_buffers.is_empty() && stream_inner.reset.is_none() { |
740 | 0 | let mut io_vectors = Vec::with_capacity(self.read_write.write_buffers.len()); |
741 | 0 | let mut total_length = 0; |
742 | | |
743 | 0 | for buffer in &self.read_write.write_buffers { |
744 | 0 | io_vectors.push(bindings::StreamSendIoVector { |
745 | 0 | ptr: u32::try_from(buffer.as_ptr() as usize).unwrap(), |
746 | 0 | len: u32::try_from(buffer.len()).unwrap(), |
747 | 0 | }); |
748 | 0 | total_length += buffer.len(); |
749 | 0 | } |
750 | | |
751 | 0 | assert!(total_length <= self.stream.writable_bytes); |
752 | 0 | self.stream.writable_bytes -= total_length; |
753 | | |
754 | | // `unwrap()` is ok as there's no way that `buffer.len()` doesn't fit in a `u64`. |
755 | 0 | TOTAL_BYTES_SENT.fetch_add(u64::try_from(total_length).unwrap(), Ordering::Relaxed); |
756 | | |
757 | 0 | bindings::stream_send( |
758 | 0 | self.stream.connection_id, |
759 | 0 | self.stream.stream_id.unwrap_or(0), |
760 | 0 | u32::try_from(io_vectors.as_ptr() as usize).unwrap(), |
761 | 0 | u32::try_from(io_vectors.len()).unwrap(), |
762 | | ); |
763 | | |
764 | 0 | self.read_write.write_buffers.clear(); |
765 | 0 | } |
766 | | |
767 | 0 | if self.read_write.write_bytes_queueable.is_none() && !self.stream.write_closed { |
768 | 0 | if stream_inner.reset.is_none() && self.stream.write_closable { |
769 | 0 | bindings::stream_send_close( |
770 | 0 | self.stream.connection_id, |
771 | 0 | self.stream.stream_id.unwrap_or(0), |
772 | 0 | ); |
773 | 0 | } |
774 | | |
775 | 0 | self.stream.write_closed = true; |
776 | 0 | } |
777 | 0 | } |
778 | | } |
779 | | |
780 | | pub(crate) struct StreamWrapper { |
781 | | connection_id: u32, |
782 | | stream_id: Option<u32>, |
783 | | read_buffer: Vec<u8>, |
784 | | inner_expected_incoming_bytes: Option<usize>, |
785 | | /// `Some` if the remote has reset the stream and `update_stream` has since then been called. |
786 | | /// Contains the error message. |
787 | | is_reset: Option<String>, |
788 | | writable_bytes: usize, |
789 | | write_closable: bool, |
790 | | write_closed: bool, |
791 | | /// The stream should wake up after this delay. |
792 | | when_wake_up: Option<Delay>, |
793 | | } |
794 | | |
795 | | impl Drop for StreamWrapper { |
796 | 0 | fn drop(&mut self) { |
797 | 0 | let mut lock = STATE.try_lock().unwrap(); |
798 | 0 | let lock = &mut *lock; |
799 | | |
800 | 0 | let connection = lock.connections.get_mut(&self.connection_id).unwrap(); |
801 | 0 | let removed_stream = lock |
802 | 0 | .streams |
803 | 0 | .remove(&(self.connection_id, self.stream_id)) |
804 | 0 | .unwrap(); |
805 | | |
806 | 0 | let remove_connection = match &mut connection.inner { |
807 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
808 | 0 | if removed_stream.reset.is_none() { |
809 | 0 | bindings::reset_connection(self.connection_id); |
810 | 0 | } |
811 | | |
812 | 0 | debug_assert!(self.stream_id.is_none()); |
813 | 0 | true |
814 | | } |
815 | | ConnectionInner::MultiStreamWebRtc { |
816 | 0 | connection_handles_alive, |
817 | | .. |
818 | | } |
819 | | | ConnectionInner::MultiStreamUnknownHandshake { |
820 | 0 | connection_handles_alive, |
821 | | .. |
822 | | } => { |
823 | 0 | if removed_stream.reset.is_none() { |
824 | 0 | bindings::connection_stream_reset(self.connection_id, self.stream_id.unwrap()); |
825 | 0 | } |
826 | 0 | *connection_handles_alive -= 1; |
827 | 0 | let remove_connection = *connection_handles_alive == 0; |
828 | 0 | if remove_connection { |
829 | 0 | bindings::reset_connection(self.connection_id); |
830 | 0 | } |
831 | 0 | remove_connection |
832 | | } |
833 | | ConnectionInner::Reset { |
834 | 0 | connection_handles_alive, |
835 | | .. |
836 | | } => { |
837 | 0 | *connection_handles_alive -= 1; |
838 | 0 | *connection_handles_alive == 0 |
839 | | } |
840 | | }; |
841 | | |
842 | 0 | if remove_connection { |
843 | 0 | lock.connections.remove(&self.connection_id).unwrap(); |
844 | 0 | } |
845 | 0 | } |
846 | | } |
847 | | |
848 | | pub(crate) struct MultiStreamWrapper(u32); |
849 | | |
850 | | impl Drop for MultiStreamWrapper { |
851 | 0 | fn drop(&mut self) { |
852 | 0 | let mut lock = STATE.try_lock().unwrap(); |
853 | | |
854 | 0 | let connection = lock.connections.get_mut(&self.0).unwrap(); |
855 | 0 | let (remove_connection, reset_connection) = match &mut connection.inner { |
856 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
857 | 0 | unreachable!() |
858 | | } |
859 | | ConnectionInner::MultiStreamWebRtc { |
860 | 0 | connection_handles_alive, |
861 | | .. |
862 | | } |
863 | | | ConnectionInner::MultiStreamUnknownHandshake { |
864 | 0 | connection_handles_alive, |
865 | | .. |
866 | | } => { |
867 | 0 | *connection_handles_alive -= 1; |
868 | 0 | let v = *connection_handles_alive == 0; |
869 | 0 | (v, v) |
870 | | } |
871 | 0 | ConnectionInner::Reset { .. } => (true, false), |
872 | | }; |
873 | | |
874 | 0 | if remove_connection { |
875 | 0 | lock.connections.remove(&self.0).unwrap(); |
876 | 0 | } |
877 | 0 | if reset_connection { |
878 | 0 | bindings::reset_connection(self.0); |
879 | 0 | } |
880 | 0 | } |
881 | | } |
882 | | |
883 | | #[derive(Debug, derive_more::Display, derive_more::Error, Clone)] |
884 | | #[display("{message}")] |
885 | | pub(crate) struct StreamError { |
886 | | message: String, |
887 | | } |
888 | | |
889 | | static STATE: Mutex<NetworkState> = Mutex::new(NetworkState { |
890 | | next_connection_id: 0, |
891 | | connections: hashbrown::HashMap::with_hasher(FnvBuildHasher), |
892 | | streams: BTreeMap::new(), |
893 | | }); |
894 | | |
895 | | // TODO: we use a custom `FnvBuildHasher` because it's not possible to create `fnv::FnvBuildHasher` in a `const` context |
896 | | struct FnvBuildHasher; |
897 | | impl core::hash::BuildHasher for FnvBuildHasher { |
898 | | type Hasher = fnv::FnvHasher; |
899 | 0 | fn build_hasher(&self) -> fnv::FnvHasher { |
900 | 0 | fnv::FnvHasher::default() |
901 | 0 | } |
902 | | } |
903 | | |
904 | | /// All the connections and streams that are alive. |
905 | | /// |
906 | | /// Single-stream connections have one entry in `connections` and one entry in `streams` (with |
907 | | /// a `stream_id` always equal to `None`). |
908 | | /// Multi-stream connections have one entry in `connections` and zero or more entries in `streams`. |
909 | | struct NetworkState { |
910 | | next_connection_id: u32, |
911 | | connections: hashbrown::HashMap<u32, Connection, FnvBuildHasher>, |
912 | | streams: BTreeMap<(u32, Option<u32>), Stream>, |
913 | | } |
914 | | |
915 | | struct Connection { |
916 | | /// Type of connection and extra fields that depend on the type. |
917 | | inner: ConnectionInner, |
918 | | /// Event notified whenever one of the fields above is modified. |
919 | | something_happened: event_listener::Event, |
920 | | } |
921 | | |
922 | | enum ConnectionInner { |
923 | | SingleStreamMsNoiseYamux, |
924 | | MultiStreamUnknownHandshake { |
925 | | /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, |
926 | | /// but that haven't been reported through |
927 | | /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. |
928 | | opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, |
929 | | /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference |
930 | | /// this connection. If it switches from 1 to 0, the connection must be removed. |
931 | | connection_handles_alive: u32, |
932 | | }, |
933 | | MultiStreamWebRtc { |
934 | | /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, |
935 | | /// but that haven't been reported through |
936 | | /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. |
937 | | opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, |
938 | | /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference |
939 | | /// this connection. If it switches from 1 to 0, the connection must be removed. |
940 | | connection_handles_alive: u32, |
941 | | /// SHA256 hash of the TLS certificate used by the local node at the DTLS layer. |
942 | | local_tls_certificate_sha256: [u8; 32], |
943 | | }, |
944 | | /// [`bindings::connection_reset`] has been called |
945 | | Reset { |
946 | | /// Message given by the bindings to justify the closure. |
947 | | // TODO: why is this unused? shouldn't it be not unused? |
948 | | _message: String, |
949 | | /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference |
950 | | /// this connection. If it switches from 1 to 0, the connection must be removed. |
951 | | connection_handles_alive: u32, |
952 | | }, |
953 | | } |
954 | | |
955 | | struct Stream { |
956 | | /// `Some` if [`bindings::stream_reset`] has been called. Contains the error message. |
957 | | reset: Option<String>, |
958 | | /// Sum of the writable bytes reported through [`bindings::stream_writable_bytes`] that |
959 | | /// haven't been processed yet in a call to `update_stream`. |
960 | | writable_bytes_extra: usize, |
961 | | /// List of messages received through [`bindings::stream_message`]. Must never contain |
962 | | /// empty messages. |
963 | | messages_queue: VecDeque<Box<[u8]>>, |
964 | | /// Total size of all the messages stored in [`Stream::messages_queue`]. |
965 | | messages_queue_total_size: usize, |
966 | | /// Event notified whenever one of the fields above is modified, such as a new message being |
967 | | /// queued. |
968 | | something_happened: event_listener::Event, |
969 | | } |
970 | | |
971 | 0 | pub(crate) fn connection_multi_stream_set_handshake_info( |
972 | 0 | connection_id: u32, |
973 | 0 | handshake_ty: Box<[u8]>, |
974 | 0 | ) { |
975 | 0 | let (_, local_tls_certificate_sha256) = nom::Parser::parse( |
976 | 0 | &mut nom::sequence::preceded( |
977 | 0 | nom::bytes::streaming::tag::<_, _, nom::error::Error<&[u8]>>(&[0][..]), |
978 | 0 | nom::combinator::map(nom::bytes::streaming::take(32u32), |b| { |
979 | 0 | <&[u8; 32]>::try_from(b).unwrap() |
980 | 0 | }), |
981 | | ), |
982 | 0 | &handshake_ty[..], |
983 | | ) |
984 | 0 | .expect("invalid handshake type provided to connection_multi_stream_set_handshake_info"); |
985 | | |
986 | 0 | let mut lock = STATE.try_lock().unwrap(); |
987 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
988 | | |
989 | 0 | let (opened_substreams_to_pick_up, connection_handles_alive) = match &mut connection.inner { |
990 | | ConnectionInner::MultiStreamUnknownHandshake { |
991 | 0 | opened_substreams_to_pick_up, |
992 | 0 | connection_handles_alive, |
993 | 0 | } => ( |
994 | 0 | mem::take(opened_substreams_to_pick_up), |
995 | 0 | *connection_handles_alive, |
996 | 0 | ), |
997 | 0 | _ => unreachable!(), |
998 | | }; |
999 | | |
1000 | 0 | connection.inner = ConnectionInner::MultiStreamWebRtc { |
1001 | 0 | opened_substreams_to_pick_up, |
1002 | 0 | connection_handles_alive, |
1003 | 0 | local_tls_certificate_sha256: *local_tls_certificate_sha256, |
1004 | 0 | }; |
1005 | 0 | connection.something_happened.notify(usize::MAX); |
1006 | 0 | } |
1007 | | |
1008 | 0 | pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u32) { |
1009 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1010 | | |
1011 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1012 | | |
1013 | | // For single stream connections, the docs of this function mentions that `stream_id` can be |
1014 | | // any value. |
1015 | 0 | let actual_stream_id = match connection.inner { |
1016 | | ConnectionInner::MultiStreamWebRtc { .. } |
1017 | 0 | | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), |
1018 | 0 | ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, |
1019 | 0 | ConnectionInner::Reset { .. } => unreachable!(), |
1020 | | }; |
1021 | | |
1022 | 0 | let stream = lock |
1023 | 0 | .streams |
1024 | 0 | .get_mut(&(connection_id, actual_stream_id)) |
1025 | 0 | .unwrap(); |
1026 | 0 | debug_assert!(stream.reset.is_none()); |
1027 | | |
1028 | | // As documented, the number of writable bytes must never become exceedingly large (a few |
1029 | | // megabytes). As such, this can't overflow unless there is a bug on the JavaScript side. |
1030 | 0 | stream.writable_bytes_extra += usize::try_from(bytes).unwrap(); |
1031 | 0 | stream.something_happened.notify(usize::MAX); |
1032 | 0 | } |
1033 | | |
1034 | 0 | pub(crate) fn stream_message(connection_id: u32, stream_id: u32, message: Box<[u8]>) { |
1035 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1036 | | |
1037 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1038 | | |
1039 | | // For single stream connections, the docs of this function mentions that `stream_id` can be |
1040 | | // any value. |
1041 | 0 | let actual_stream_id = match connection.inner { |
1042 | | ConnectionInner::MultiStreamWebRtc { .. } |
1043 | 0 | | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), |
1044 | 0 | ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, |
1045 | 0 | ConnectionInner::Reset { .. } => unreachable!(), |
1046 | | }; |
1047 | | |
1048 | 0 | let stream = lock |
1049 | 0 | .streams |
1050 | 0 | .get_mut(&(connection_id, actual_stream_id)) |
1051 | 0 | .unwrap(); |
1052 | 0 | debug_assert!(stream.reset.is_none()); |
1053 | | |
1054 | 0 | TOTAL_BYTES_RECEIVED.fetch_add(u64::try_from(message.len()).unwrap(), Ordering::Relaxed); |
1055 | | |
1056 | | // Ignore empty message to avoid all sorts of problems. |
1057 | 0 | if message.is_empty() { |
1058 | 0 | return; |
1059 | 0 | } |
1060 | | |
1061 | | // There is unfortunately no way to instruct the browser to back-pressure connections to |
1062 | | // remotes. |
1063 | | // |
1064 | | // In order to avoid DoS attacks, we refuse to buffer more than a certain amount of data per |
1065 | | // connection. This limit is completely arbitrary, and this is in no way a robust solution |
1066 | | // because this limit isn't in sync with any other part of the code. In other words, it could |
1067 | | // be legitimate for the remote to buffer a large amount of data. |
1068 | | // |
1069 | | // This corner case is handled by discarding the messages that would go over the limit. While |
1070 | | // this is not a great solution, going over that limit can be considered as a fault from the |
1071 | | // remote, the same way as it would be a fault from the remote to forget to send some bytes, |
1072 | | // and thus should be handled in a similar way by the higher level code. |
1073 | | // |
1074 | | // A better way to handle this would be to kill the connection abruptly. However, this would |
1075 | | // add a lot of complex code in this module, and the effort is clearly not worth it for this |
1076 | | // niche situation. |
1077 | | // |
1078 | | // While this problem is specific to browsers (Deno and NodeJS have ways to back-pressure |
1079 | | // connections), we add this hack for all platforms, for consistency. If this limit is ever |
1080 | | // reached, we want to be sure to detect it, even when testing on NodeJS or Deno. |
1081 | | // |
1082 | | // See <https://github.com/smol-dot/smoldot/issues/109>. |
1083 | | // TODO: do this properly eventually ^ |
1084 | 0 | if stream.messages_queue_total_size >= 25 * 1024 * 1024 { |
1085 | 0 | return; |
1086 | 0 | } |
1087 | | |
1088 | 0 | stream.messages_queue_total_size += message.len(); |
1089 | 0 | stream.messages_queue.push_back(message); |
1090 | 0 | stream.something_happened.notify(usize::MAX); |
1091 | 0 | } |
1092 | | |
1093 | 0 | pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) { |
1094 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1095 | 0 | let lock = &mut *lock; |
1096 | | |
1097 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1098 | | if let ConnectionInner::MultiStreamWebRtc { |
1099 | 0 | opened_substreams_to_pick_up, |
1100 | | .. |
1101 | 0 | } = &mut connection.inner |
1102 | | { |
1103 | 0 | let _prev_value = lock.streams.insert( |
1104 | 0 | (connection_id, Some(stream_id)), |
1105 | 0 | Stream { |
1106 | 0 | reset: None, |
1107 | 0 | messages_queue: VecDeque::with_capacity(8), |
1108 | 0 | messages_queue_total_size: 0, |
1109 | 0 | something_happened: event_listener::Event::new(), |
1110 | 0 | writable_bytes_extra: 0, |
1111 | 0 | }, |
1112 | | ); |
1113 | | |
1114 | 0 | if _prev_value.is_some() { |
1115 | 0 | panic!("same stream_id used multiple times in connection_stream_opened") |
1116 | 0 | } |
1117 | | |
1118 | 0 | opened_substreams_to_pick_up.push_back(( |
1119 | 0 | stream_id, |
1120 | 0 | if outbound != 0 { |
1121 | 0 | SubstreamDirection::Outbound |
1122 | | } else { |
1123 | 0 | SubstreamDirection::Inbound |
1124 | | }, |
1125 | | )); |
1126 | | |
1127 | 0 | connection.something_happened.notify(usize::MAX); |
1128 | | } else { |
1129 | 0 | panic!() |
1130 | | } |
1131 | 0 | } |
1132 | | |
1133 | 0 | pub(crate) fn connection_reset(connection_id: u32, message: Box<[u8]>) { |
1134 | 0 | let message = str::from_utf8(&message) |
1135 | 0 | .unwrap_or_else(|_| panic!("non-UTF-8 message")) |
1136 | 0 | .to_owned(); |
1137 | | |
1138 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1139 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1140 | | |
1141 | 0 | let connection_handles_alive = match &connection.inner { |
1142 | 0 | ConnectionInner::SingleStreamMsNoiseYamux { .. } => 1, // TODO: I believe that this is correct but a bit confusing; might be helpful to refactor with an enum or something |
1143 | | ConnectionInner::MultiStreamWebRtc { |
1144 | 0 | connection_handles_alive, |
1145 | | .. |
1146 | | } |
1147 | | | ConnectionInner::MultiStreamUnknownHandshake { |
1148 | 0 | connection_handles_alive, |
1149 | | .. |
1150 | 0 | } => *connection_handles_alive, |
1151 | 0 | ConnectionInner::Reset { .. } => unreachable!(), |
1152 | | }; |
1153 | | |
1154 | 0 | connection.inner = ConnectionInner::Reset { |
1155 | 0 | connection_handles_alive, |
1156 | 0 | _message: message.clone(), |
1157 | 0 | }; |
1158 | | |
1159 | 0 | connection.something_happened.notify(usize::MAX); |
1160 | | |
1161 | 0 | for ((_, _), stream) in lock |
1162 | 0 | .streams |
1163 | 0 | .range_mut((connection_id, Some(u32::MIN))..=(connection_id, Some(u32::MAX))) |
1164 | 0 | { |
1165 | 0 | stream.reset = Some(message.clone()); |
1166 | 0 | stream.something_happened.notify(usize::MAX); |
1167 | 0 | } |
1168 | 0 | if let Some(stream) = lock.streams.get_mut(&(connection_id, None)) { |
1169 | 0 | stream.reset = Some(message); |
1170 | 0 | stream.something_happened.notify(usize::MAX); |
1171 | 0 | } |
1172 | 0 | } |
1173 | | |
1174 | 0 | pub(crate) fn stream_reset(connection_id: u32, stream_id: u32, message: Box<[u8]>) { |
1175 | 0 | let message: String = str::from_utf8(&message) |
1176 | 0 | .unwrap_or_else(|_| panic!("non-UTF-8 message")) |
1177 | 0 | .to_owned(); |
1178 | | |
1179 | | // Note that, as documented, it is illegal to call this function on single-stream substreams. |
1180 | | // We can thus assume that the `stream_id` is valid. |
1181 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1182 | 0 | let stream = lock |
1183 | 0 | .streams |
1184 | 0 | .get_mut(&(connection_id, Some(stream_id))) |
1185 | 0 | .unwrap(); |
1186 | 0 | stream.reset = Some(message); |
1187 | 0 | stream.something_happened.notify(usize::MAX); |
1188 | 0 | } |