/__w/smoldot/smoldot/repo/wasm-node/rust/src/platform.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use crate::{bindings, timers::Delay}; |
19 | | |
20 | | use futures_lite::future::FutureExt as _; |
21 | | |
22 | | use smoldot_light::platform::{read_write, SubstreamDirection}; |
23 | | |
24 | | use alloc::{ |
25 | | borrow::{Cow, ToOwned as _}, |
26 | | boxed::Box, |
27 | | collections::{BTreeMap, VecDeque}, |
28 | | format, |
29 | | string::{String, ToString as _}, |
30 | | vec::Vec, |
31 | | }; |
32 | | use async_lock::Mutex; |
33 | | use core::{ |
34 | | fmt::{self, Write as _}, |
35 | | future, iter, mem, |
36 | | net::{IpAddr, Ipv4Addr, Ipv6Addr}, |
37 | | ops, pin, str, |
38 | | sync::atomic::{AtomicU32, AtomicU64, Ordering}, |
39 | | task, |
40 | | time::Duration, |
41 | | }; |
42 | | |
43 | | /// Total number of bytes that all the connections created through [`PlatformRef`] combined have |
44 | | /// received. |
45 | | pub static TOTAL_BYTES_RECEIVED: AtomicU64 = AtomicU64::new(0); |
46 | | /// Total number of bytes that all the connections created through [`PlatformRef`] combined have |
47 | | /// sent. |
48 | | pub static TOTAL_BYTES_SENT: AtomicU64 = AtomicU64::new(0); |
49 | | /// Total number of microseconds that all the tasks have spent executing. A `u64` will overflow |
50 | | /// after 584 542 years. |
51 | | pub static TOTAL_CPU_USAGE_US: AtomicU64 = AtomicU64::new(0); |
52 | | |
53 | | pub(crate) const PLATFORM_REF: PlatformRef = PlatformRef {}; |
54 | | |
55 | | /// Log level above which log entries aren't emitted. |
56 | | pub static MAX_LOG_LEVEL: AtomicU32 = AtomicU32::new(0); |
57 | | |
58 | | #[derive(Debug, Copy, Clone)] |
59 | | pub(crate) struct PlatformRef {} |
60 | | |
61 | | // TODO: this trait implementation was written before GATs were stable in Rust; now that the associated types have lifetimes, it should be possible to considerably simplify this code |
62 | | impl smoldot_light::platform::PlatformRef for PlatformRef { |
63 | | type Delay = Delay; |
64 | | type Instant = Duration; |
65 | | type MultiStream = MultiStreamWrapper; // Entry in the ̀`CONNECTIONS` map. |
66 | | type Stream = StreamWrapper; // Entry in the ̀`STREAMS` map and a read buffer. |
67 | | type StreamConnectFuture = future::Ready<Self::Stream>; |
68 | | type ReadWriteAccess<'a> = ReadWriteAccess<'a>; |
69 | | type StreamErrorRef<'a> = StreamError; |
70 | | type MultiStreamConnectFuture = pin::Pin< |
71 | | Box< |
72 | | dyn future::Future< |
73 | | Output = smoldot_light::platform::MultiStreamWebRtcConnection< |
74 | | Self::MultiStream, |
75 | | >, |
76 | | > + Send, |
77 | | >, |
78 | | >; |
79 | | type StreamUpdateFuture<'a> = pin::Pin<Box<dyn future::Future<Output = ()> + Send + 'a>>; |
80 | | type NextSubstreamFuture<'a> = pin::Pin< |
81 | | Box< |
82 | | dyn future::Future< |
83 | | Output = Option<(Self::Stream, smoldot_light::platform::SubstreamDirection)>, |
84 | | > + Send |
85 | | + 'a, |
86 | | >, |
87 | | >; |
88 | | |
89 | 0 | fn now_from_unix_epoch(&self) -> Duration { |
90 | 0 | let microseconds = unsafe { bindings::unix_timestamp_us() }; |
91 | 0 | Duration::from_micros(microseconds) |
92 | 0 | } |
93 | | |
94 | 0 | fn now(&self) -> Self::Instant { |
95 | 0 | let microseconds = unsafe { bindings::monotonic_clock_us() }; |
96 | 0 | Duration::from_micros(microseconds) |
97 | 0 | } |
98 | | |
99 | 0 | fn fill_random_bytes(&self, buffer: &mut [u8]) { |
100 | 0 | unsafe { |
101 | 0 | bindings::random_get( |
102 | 0 | u32::try_from(buffer.as_mut_ptr() as usize).unwrap(), |
103 | 0 | u32::try_from(buffer.len()).unwrap(), |
104 | 0 | ) |
105 | 0 | } |
106 | 0 | } |
107 | | |
108 | 0 | fn sleep(&self, duration: Duration) -> Self::Delay { |
109 | 0 | Delay::new(duration) |
110 | 0 | } |
111 | | |
112 | 0 | fn sleep_until(&self, when: Self::Instant) -> Self::Delay { |
113 | 0 | Delay::new_at_monotonic_clock(when) |
114 | 0 | } |
115 | | |
116 | 0 | fn spawn_task( |
117 | 0 | &self, |
118 | 0 | task_name: Cow<str>, |
119 | 0 | task: impl future::Future<Output = ()> + Send + 'static, |
120 | 0 | ) { |
121 | 0 | // The code below processes tasks that have names. |
122 | 0 | #[pin_project::pin_project] Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvMs2_B1b_INtB1b_6ClientBP_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB1b_7ChainIdEEsa_0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtB1b_15runtime_service14run_backgroundBP_E0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtNtB1b_15network_service5tasks29single_stream_connection_taskBP_E0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtNtB1b_15network_service5tasks35webrtc_multi_stream_connection_taskBP_E0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCINvNtNtB1b_16json_rpc_service10background3runBP_E0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNCNvMNtB1b_15runtime_serviceINtB2H_14RuntimeServiceBP_E31send_message_or_restart_service00E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNCNvMNtB1b_20transactions_serviceINtB2H_19TransactionsServiceBP_E18send_to_background00E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvB9_9add_chains0_0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_12sync_serviceINtB2F_11SyncServiceBP_E3new0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_15network_serviceINtB2F_14NetworkServiceBP_E3new0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_15network_serviceINtB2F_14NetworkServiceBP_E9add_chains_0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvMNtB1b_20transactions_serviceINtB2F_19TransactionsServiceBP_E3new0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterNCNvNtB9_4init4init0E7projectB9_ Unexecuted instantiation: _RNvMNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__INtB4_13FutureAdapterpE11project_refB9_ Unexecuted instantiation: _RINvNvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB7_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__24___assert_not_repr_packedpEB9_ Unexecuted instantiation: _RNvXININvNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtBa_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_task1__s3_0pEINtB7_13FutureAdapterpENtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4dropBc_ |
123 | 0 | struct FutureAdapter<F> { |
124 | 0 | name: String, |
125 | 0 | #[pin] |
126 | 0 | future: F, |
127 | 0 | } |
128 | 0 |
|
129 | 0 | impl<F: future::Future> future::Future for FutureAdapter<F> { |
130 | 0 | type Output = F::Output; |
131 | 0 | fn poll(self: pin::Pin<&mut Self>, cx: &mut task::Context) -> task::Poll<Self::Output> { |
132 | 0 | let this = self.project(); |
133 | 0 | unsafe { |
134 | 0 | bindings::current_task_entered( |
135 | 0 | u32::try_from(this.name.as_bytes().as_ptr() as usize).unwrap(), |
136 | 0 | u32::try_from(this.name.as_bytes().len()).unwrap(), |
137 | 0 | ) |
138 | 0 | } |
139 | 0 |
|
140 | 0 | let before_polling = unsafe { bindings::monotonic_clock_us() }; |
141 | 0 | let out = this.future.poll(cx); |
142 | 0 | let poll_duration = Duration::from_micros( |
143 | 0 | unsafe { bindings::monotonic_clock_us() } - before_polling, |
144 | 0 | ); |
145 | 0 | TOTAL_CPU_USAGE_US.fetch_add( |
146 | 0 | u64::try_from(poll_duration.as_micros()).unwrap_or(u64::MAX), |
147 | 0 | Ordering::Relaxed, |
148 | 0 | ); |
149 | 0 |
|
150 | 0 | unsafe { |
151 | 0 | bindings::current_task_exit(); |
152 | 0 | } |
153 | 0 |
|
154 | 0 | // Print a warning if polling the task takes a long time. |
155 | 0 | // It has been noticed that sometimes in Firefox polling a task takes a 16ms + a |
156 | 0 | // small amount. This most likely indicates that Firefox does something like |
157 | 0 | // freezing the JS/Wasm execution before resuming it at the next frame, thus adding |
158 | 0 | // 16ms to the execution time. |
159 | 0 | // For this reason, the threshold above which a task takes too long must be |
160 | 0 | // above 16ms no matter what. |
161 | 0 | if poll_duration.as_millis() >= 20 { |
162 | 0 | smoldot_light::platform::PlatformRef::log( |
163 | 0 | &PLATFORM_REF, |
164 | 0 | smoldot_light::platform::LogLevel::Debug, |
165 | 0 | "smoldot", |
166 | 0 | "task-too-long-time", |
167 | 0 | [ |
168 | 0 | ("name", this.name as &dyn fmt::Display), |
169 | 0 | ( |
170 | 0 | "poll_duration_ms", |
171 | 0 | &poll_duration.as_millis() as &dyn fmt::Display, |
172 | 0 | ), |
173 | 0 | ] |
174 | 0 | .into_iter(), |
175 | 0 | ); |
176 | 0 | } |
177 | 0 | if poll_duration.as_millis() >= 150 { |
178 | 0 | smoldot_light::platform::PlatformRef::log( |
179 | 0 | &PLATFORM_REF, |
180 | 0 | smoldot_light::platform::LogLevel::Warn, |
181 | 0 | "smoldot", |
182 | 0 | &format!( |
183 | 0 | "The task named `{}` has occupied the CPU for an \ |
184 | 0 | unreasonable amount of time ({}ms).", |
185 | 0 | this.name, |
186 | 0 | poll_duration.as_millis(), |
187 | 0 | ), |
188 | 0 | iter::empty(), |
189 | 0 | ); |
190 | 0 | } |
191 | 0 |
|
192 | 0 | out |
193 | 0 | } Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvMs2_B19_INtB19_6ClientBN_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB19_7ChainIdEEsa_0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtB19_15runtime_service14run_backgroundBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB19_15network_service5tasks29single_stream_connection_taskBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB19_15network_service5tasks35webrtc_multi_stream_connection_taskBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCINvNtNtB19_16json_rpc_service10background3runBN_E0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB19_15runtime_serviceINtB2C_14RuntimeServiceBN_E31send_message_or_restart_service00ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNCNvMNtB19_20transactions_serviceINtB2C_19TransactionsServiceBN_E18send_to_background00ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvB7_9add_chains0_0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_12sync_serviceINtB2A_11SyncServiceBN_E3new0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_15network_serviceINtB2A_14NetworkServiceBN_E3new0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_15network_serviceINtB2A_14NetworkServiceBN_E9add_chains_0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvMNtB19_20transactions_serviceINtB2A_19TransactionsServiceBN_E3new0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ Unexecuted instantiation: _RNvXNvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskINtB2_13FutureAdapterNCNvNtB7_4init4init0ENtNtNtCsaYZPK01V26L_4core6future6future6Future4pollB7_ |
194 | 0 | } |
195 | 0 |
|
196 | 0 | let task = FutureAdapter { |
197 | 0 | name: task_name.into_owned(), |
198 | 0 | future: task, |
199 | 0 | }; |
200 | 0 |
|
201 | 0 | let (runnable, task) = async_task::spawn(task, |runnable| { |
202 | 0 | super::TASKS_QUEUE.push(runnable); |
203 | 0 | unsafe { |
204 | 0 | bindings::advance_execution_ready(); |
205 | 0 | } |
206 | 0 | }); Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B19_INtB19_6ClientBN_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB19_7ChainIdEEsa_0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB19_15runtime_service14run_backgroundBN_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB19_15network_service5tasks29single_stream_connection_taskBN_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB19_15network_service5tasks35webrtc_multi_stream_connection_taskBN_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB19_16json_rpc_service10background3runBN_E0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB19_15runtime_serviceINtB2h_14RuntimeServiceBN_E31send_message_or_restart_service00E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB19_20transactions_serviceINtB2h_19TransactionsServiceBN_E18send_to_background00E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB7_9add_chains0_0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_12sync_serviceINtB2f_11SyncServiceBN_E3new0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_15network_serviceINtB2f_14NetworkServiceBN_E3new0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_15network_serviceINtB2f_14NetworkServiceBN_E9add_chains_0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB19_20transactions_serviceINtB2f_19TransactionsServiceBN_E3new0E0B7_ Unexecuted instantiation: _RNCINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB5_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB7_4init4init0E0B7_ |
207 | 0 |
|
208 | 0 | task.detach(); |
209 | 0 | runnable.schedule(); |
210 | 0 | } Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvMs2_B17_INtB17_6ClientBL_E9add_chainINtNtNtCsdZExvAaxgia_5alloc3vec9into_iter8IntoIterNtB17_7ChainIdEEsa_0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtB17_15runtime_service14run_backgroundBL_E0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB17_15network_service5tasks29single_stream_connection_taskBL_E0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB17_15network_service5tasks35webrtc_multi_stream_connection_taskBL_E0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCINvNtNtB17_16json_rpc_service10background3runBL_E0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB17_15runtime_serviceINtB2f_14RuntimeServiceBL_E31send_message_or_restart_service00EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNCNvMNtB17_20transactions_serviceINtB2f_19TransactionsServiceBL_E18send_to_background00EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvB5_9add_chains0_0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_12sync_serviceINtB2d_11SyncServiceBL_E3new0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_15network_serviceINtB2d_14NetworkServiceBL_E3new0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_15network_serviceINtB2d_14NetworkServiceBL_E9add_chains_0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvMNtB17_20transactions_serviceINtB2d_19TransactionsServiceBL_E3new0EB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef10spawn_taskNCNvNtB5_4init4init0EB5_ |
211 | | |
212 | 0 | fn log<'a>( |
213 | 0 | &self, |
214 | 0 | log_level: smoldot_light::platform::LogLevel, |
215 | 0 | log_target: &'a str, |
216 | 0 | message: &'a str, |
217 | 0 | key_values: impl Iterator<Item = (&'a str, &'a dyn fmt::Display)>, |
218 | 0 | ) { |
219 | 0 | let log_level = match log_level { |
220 | 0 | smoldot_light::platform::LogLevel::Error => 1, |
221 | 0 | smoldot_light::platform::LogLevel::Warn => 2, |
222 | 0 | smoldot_light::platform::LogLevel::Info => 3, |
223 | 0 | smoldot_light::platform::LogLevel::Debug => 4, |
224 | 0 | smoldot_light::platform::LogLevel::Trace => 5, |
225 | | }; |
226 | | |
227 | 0 | if log_level > MAX_LOG_LEVEL.load(Ordering::Relaxed) { |
228 | 0 | return; |
229 | 0 | } |
230 | 0 |
|
231 | 0 | let mut key_values = key_values.peekable(); |
232 | 0 |
|
233 | 0 | if key_values.peek().is_none() { |
234 | | unsafe { |
235 | 0 | bindings::log( |
236 | 0 | log_level, |
237 | 0 | u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(), |
238 | 0 | u32::try_from(log_target.as_bytes().len()).unwrap(), |
239 | 0 | u32::try_from(message.as_bytes().as_ptr() as usize).unwrap(), |
240 | 0 | u32::try_from(message.as_bytes().len()).unwrap(), |
241 | 0 | ) |
242 | | } |
243 | | } else { |
244 | 0 | let mut message_build = String::with_capacity(128); |
245 | 0 | message_build.push_str(message); |
246 | 0 | let mut first = true; |
247 | 0 | for (key, value) in key_values { |
248 | 0 | if first { |
249 | 0 | let _ = write!(message_build, "; "); |
250 | 0 | first = false; |
251 | 0 | } else { |
252 | 0 | let _ = write!(message_build, ", "); |
253 | 0 | } |
254 | 0 | let _ = write!(message_build, "{}={}", key, value); |
255 | | } |
256 | | |
257 | | unsafe { |
258 | 0 | bindings::log( |
259 | 0 | log_level, |
260 | 0 | u32::try_from(log_target.as_bytes().as_ptr() as usize).unwrap(), |
261 | 0 | u32::try_from(log_target.as_bytes().len()).unwrap(), |
262 | 0 | u32::try_from(message_build.as_bytes().as_ptr() as usize).unwrap(), |
263 | 0 | u32::try_from(message_build.as_bytes().len()).unwrap(), |
264 | 0 | ) |
265 | | } |
266 | | } |
267 | 0 | } Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtCsaYZPK01V26L_4core5array4iter8IntoIterTReRDNtNtB27_3fmt7DisplayEL_EKj2_EEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter7sources5empty5EmptyTReRDNtNtB29_3fmt7DisplayEL_EEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEEEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_IB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEIB21_B2S_INtNtB2X_5empty5EmptyB3l_EEEEB5_ Unexecuted instantiation: _RINvXNtCsDDUKWWCHAU_18smoldot_light_wasm8platformNtB3_11PlatformRefNtNtCsih6EgvAwZF2_13smoldot_light8platform11PlatformRef3logINtNtNtNtCsaYZPK01V26L_4core4iter8adapters5chain5ChainINtNtNtB27_7sources4once4OnceTReRDNtNtB29_3fmt7DisplayEL_EEINtNtB2X_5empty5EmptyB3l_EEEB5_ |
268 | | |
269 | 0 | fn client_name(&self) -> Cow<str> { |
270 | 0 | env!("CARGO_PKG_NAME").into() |
271 | 0 | } |
272 | | |
273 | 0 | fn client_version(&self) -> Cow<str> { |
274 | 0 | env!("CARGO_PKG_VERSION").into() |
275 | 0 | } |
276 | | |
277 | 0 | fn supports_connection_type( |
278 | 0 | &self, |
279 | 0 | connection_type: smoldot_light::platform::ConnectionType, |
280 | 0 | ) -> bool { |
281 | 0 | let ty = match connection_type { |
282 | 0 | smoldot_light::platform::ConnectionType::TcpIpv4 => 0, |
283 | 0 | smoldot_light::platform::ConnectionType::TcpIpv6 => 1, |
284 | 0 | smoldot_light::platform::ConnectionType::TcpDns => 2, |
285 | | smoldot_light::platform::ConnectionType::WebSocketIpv4 { |
286 | | remote_is_localhost: true, |
287 | | .. |
288 | | } |
289 | | | smoldot_light::platform::ConnectionType::WebSocketIpv6 { |
290 | | remote_is_localhost: true, |
291 | | .. |
292 | | } |
293 | | | smoldot_light::platform::ConnectionType::WebSocketDns { |
294 | | secure: false, |
295 | | remote_is_localhost: true, |
296 | 0 | } => 7, |
297 | 0 | smoldot_light::platform::ConnectionType::WebSocketIpv4 { .. } => 4, |
298 | 0 | smoldot_light::platform::ConnectionType::WebSocketIpv6 { .. } => 5, |
299 | 0 | smoldot_light::platform::ConnectionType::WebSocketDns { secure: false, .. } => 6, |
300 | 0 | smoldot_light::platform::ConnectionType::WebSocketDns { secure: true, .. } => 14, |
301 | 0 | smoldot_light::platform::ConnectionType::WebRtcIpv4 => 16, |
302 | 0 | smoldot_light::platform::ConnectionType::WebRtcIpv6 => 17, |
303 | | }; |
304 | | |
305 | 0 | unsafe { bindings::connection_type_supported(ty) != 0 } |
306 | 0 | } |
307 | | |
308 | 0 | fn connect_stream( |
309 | 0 | &self, |
310 | 0 | address: smoldot_light::platform::Address, |
311 | 0 | ) -> Self::StreamConnectFuture { |
312 | 0 | let mut lock = STATE.try_lock().unwrap(); |
313 | 0 |
|
314 | 0 | let connection_id = lock.next_connection_id; |
315 | 0 | lock.next_connection_id += 1; |
316 | | |
317 | 0 | let encoded_address: Vec<u8> = match address { |
318 | | smoldot_light::platform::Address::TcpIp { |
319 | 0 | ip: IpAddr::V4(ip), |
320 | 0 | port, |
321 | 0 | } => iter::once(0u8) |
322 | 0 | .chain(port.to_be_bytes()) |
323 | 0 | .chain(Ipv4Addr::from(ip).to_string().bytes()) |
324 | 0 | .collect(), |
325 | | smoldot_light::platform::Address::TcpIp { |
326 | 0 | ip: IpAddr::V6(ip), |
327 | 0 | port, |
328 | 0 | } => iter::once(1u8) |
329 | 0 | .chain(port.to_be_bytes()) |
330 | 0 | .chain(Ipv6Addr::from(ip).to_string().bytes()) |
331 | 0 | .collect(), |
332 | 0 | smoldot_light::platform::Address::TcpDns { hostname, port } => iter::once(2u8) |
333 | 0 | .chain(port.to_be_bytes()) |
334 | 0 | .chain(hostname.as_bytes().iter().copied()) |
335 | 0 | .collect(), |
336 | | smoldot_light::platform::Address::WebSocketIp { |
337 | 0 | ip: IpAddr::V4(ip), |
338 | 0 | port, |
339 | 0 | } => iter::once(4u8) |
340 | 0 | .chain(port.to_be_bytes()) |
341 | 0 | .chain(Ipv4Addr::from(ip).to_string().bytes()) |
342 | 0 | .collect(), |
343 | | smoldot_light::platform::Address::WebSocketIp { |
344 | 0 | ip: IpAddr::V6(ip), |
345 | 0 | port, |
346 | 0 | } => iter::once(5u8) |
347 | 0 | .chain(port.to_be_bytes()) |
348 | 0 | .chain(Ipv6Addr::from(ip).to_string().bytes()) |
349 | 0 | .collect(), |
350 | | smoldot_light::platform::Address::WebSocketDns { |
351 | 0 | hostname, |
352 | 0 | port, |
353 | 0 | secure: false, |
354 | 0 | } => iter::once(6u8) |
355 | 0 | .chain(port.to_be_bytes()) |
356 | 0 | .chain(hostname.as_bytes().iter().copied()) |
357 | 0 | .collect(), |
358 | | smoldot_light::platform::Address::WebSocketDns { |
359 | 0 | hostname, |
360 | 0 | port, |
361 | 0 | secure: true, |
362 | 0 | } => iter::once(14u8) |
363 | 0 | .chain(port.to_be_bytes()) |
364 | 0 | .chain(hostname.as_bytes().iter().copied()) |
365 | 0 | .collect(), |
366 | | }; |
367 | | |
368 | 0 | let write_closable = match address { |
369 | | smoldot_light::platform::Address::TcpIp { .. } |
370 | 0 | | smoldot_light::platform::Address::TcpDns { .. } => true, |
371 | | smoldot_light::platform::Address::WebSocketIp { .. } |
372 | 0 | | smoldot_light::platform::Address::WebSocketDns { .. } => false, |
373 | | }; |
374 | | |
375 | | unsafe { |
376 | 0 | bindings::connection_new( |
377 | 0 | connection_id, |
378 | 0 | u32::try_from(encoded_address.as_ptr() as usize).unwrap(), |
379 | 0 | u32::try_from(encoded_address.len()).unwrap(), |
380 | 0 | ) |
381 | 0 | } |
382 | 0 |
|
383 | 0 | let _prev_value = lock.connections.insert( |
384 | 0 | connection_id, |
385 | 0 | Connection { |
386 | 0 | inner: ConnectionInner::SingleStreamMsNoiseYamux, |
387 | 0 | something_happened: event_listener::Event::new(), |
388 | 0 | }, |
389 | 0 | ); |
390 | 0 | debug_assert!(_prev_value.is_none()); |
391 | | |
392 | 0 | let _prev_value = lock.streams.insert( |
393 | 0 | (connection_id, None), |
394 | 0 | Stream { |
395 | 0 | reset: None, |
396 | 0 | messages_queue: VecDeque::with_capacity(8), |
397 | 0 | messages_queue_total_size: 0, |
398 | 0 | something_happened: event_listener::Event::new(), |
399 | 0 | writable_bytes_extra: 0, |
400 | 0 | }, |
401 | 0 | ); |
402 | 0 | debug_assert!(_prev_value.is_none()); |
403 | | |
404 | 0 | future::ready(StreamWrapper { |
405 | 0 | connection_id, |
406 | 0 | stream_id: None, |
407 | 0 | read_buffer: Vec::new(), |
408 | 0 | inner_expected_incoming_bytes: Some(1), |
409 | 0 | is_reset: None, |
410 | 0 | writable_bytes: 0, |
411 | 0 | write_closable, |
412 | 0 | write_closed: false, |
413 | 0 | when_wake_up: None, |
414 | 0 | }) |
415 | 0 | } |
416 | | |
417 | 0 | fn connect_multistream( |
418 | 0 | &self, |
419 | 0 | address: smoldot_light::platform::MultiStreamAddress, |
420 | 0 | ) -> Self::MultiStreamConnectFuture { |
421 | 0 | let mut lock = STATE.try_lock().unwrap(); |
422 | 0 |
|
423 | 0 | let connection_id = lock.next_connection_id; |
424 | 0 | lock.next_connection_id += 1; |
425 | | |
426 | 0 | let encoded_address: Vec<u8> = match address { |
427 | | smoldot_light::platform::MultiStreamAddress::WebRtc { |
428 | 0 | ip: IpAddr::V4(ip), |
429 | 0 | port, |
430 | 0 | remote_certificate_sha256, |
431 | 0 | } => iter::once(16u8) |
432 | 0 | .chain(port.to_be_bytes()) |
433 | 0 | .chain(remote_certificate_sha256.iter().copied()) |
434 | 0 | .chain(Ipv4Addr::from(ip).to_string().bytes()) |
435 | 0 | .collect(), |
436 | | smoldot_light::platform::MultiStreamAddress::WebRtc { |
437 | 0 | ip: IpAddr::V6(ip), |
438 | 0 | port, |
439 | 0 | remote_certificate_sha256, |
440 | 0 | } => iter::once(17u8) |
441 | 0 | .chain(port.to_be_bytes()) |
442 | 0 | .chain(remote_certificate_sha256.iter().copied()) |
443 | 0 | .chain(Ipv6Addr::from(ip).to_string().bytes()) |
444 | 0 | .collect(), |
445 | | }; |
446 | | |
447 | | unsafe { |
448 | 0 | bindings::connection_new( |
449 | 0 | connection_id, |
450 | 0 | u32::try_from(encoded_address.as_ptr() as usize).unwrap(), |
451 | 0 | u32::try_from(encoded_address.len()).unwrap(), |
452 | 0 | ) |
453 | 0 | } |
454 | 0 |
|
455 | 0 | let _prev_value = lock.connections.insert( |
456 | 0 | connection_id, |
457 | 0 | Connection { |
458 | 0 | inner: ConnectionInner::MultiStreamUnknownHandshake { |
459 | 0 | opened_substreams_to_pick_up: VecDeque::with_capacity(0), |
460 | 0 | connection_handles_alive: 1, |
461 | 0 | }, |
462 | 0 | something_happened: event_listener::Event::new(), |
463 | 0 | }, |
464 | 0 | ); |
465 | 0 | debug_assert!(_prev_value.is_none()); |
466 | | |
467 | 0 | Box::pin(async move { |
468 | | // Wait until the connection state is no longer "unknown handshake". |
469 | 0 | let mut lock = loop { |
470 | 0 | let something_happened = { |
471 | 0 | let mut lock = STATE.try_lock().unwrap(); |
472 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
473 | | |
474 | 0 | if matches!( |
475 | 0 | connection.inner, |
476 | | ConnectionInner::Reset { .. } | ConnectionInner::MultiStreamWebRtc { .. } |
477 | | ) { |
478 | 0 | break lock; |
479 | 0 | } |
480 | 0 |
|
481 | 0 | connection.something_happened.listen() |
482 | 0 | }; |
483 | 0 |
|
484 | 0 | something_happened.await |
485 | | }; |
486 | 0 | let lock = &mut *lock; |
487 | 0 |
|
488 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
489 | 0 |
|
490 | 0 | match &mut connection.inner { |
491 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } |
492 | | | ConnectionInner::MultiStreamUnknownHandshake { .. } => { |
493 | 0 | unreachable!() |
494 | | } |
495 | | ConnectionInner::MultiStreamWebRtc { |
496 | 0 | local_tls_certificate_sha256, |
497 | 0 | .. |
498 | 0 | } => smoldot_light::platform::MultiStreamWebRtcConnection { |
499 | 0 | connection: MultiStreamWrapper(connection_id), |
500 | 0 | local_tls_certificate_sha256: *local_tls_certificate_sha256, |
501 | 0 | }, |
502 | | ConnectionInner::Reset { .. } => { |
503 | | // If the connection was already reset, we proceed anyway but provide a fake |
504 | | // certificate hash. This has absolutely no consequence. |
505 | 0 | smoldot_light::platform::MultiStreamWebRtcConnection { |
506 | 0 | connection: MultiStreamWrapper(connection_id), |
507 | 0 | local_tls_certificate_sha256: [0; 32], |
508 | 0 | } |
509 | | } |
510 | | } |
511 | 0 | }) |
512 | 0 | } |
513 | | |
514 | 0 | fn next_substream<'a>( |
515 | 0 | &self, |
516 | 0 | MultiStreamWrapper(connection_id): &'a mut Self::MultiStream, |
517 | 0 | ) -> Self::NextSubstreamFuture<'a> { |
518 | 0 | let connection_id = *connection_id; |
519 | 0 |
|
520 | 0 | Box::pin(async move { |
521 | 0 | let (stream_id, direction) = loop { |
522 | 0 | let something_happened = { |
523 | 0 | let mut lock = STATE.try_lock().unwrap(); |
524 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
525 | 0 |
|
526 | 0 | match &mut connection.inner { |
527 | 0 | ConnectionInner::Reset { .. } => return None, |
528 | | ConnectionInner::MultiStreamWebRtc { |
529 | 0 | opened_substreams_to_pick_up, |
530 | 0 | connection_handles_alive, |
531 | | .. |
532 | | } |
533 | | | ConnectionInner::MultiStreamUnknownHandshake { |
534 | 0 | opened_substreams_to_pick_up, |
535 | 0 | connection_handles_alive, |
536 | | .. |
537 | | } => { |
538 | 0 | if let Some((substream, direction)) = |
539 | 0 | opened_substreams_to_pick_up.pop_front() |
540 | | { |
541 | 0 | *connection_handles_alive += 1; |
542 | 0 | break (substream, direction); |
543 | 0 | } |
544 | | } |
545 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
546 | 0 | unreachable!() |
547 | | } |
548 | | } |
549 | | |
550 | 0 | connection.something_happened.listen() |
551 | 0 | }; |
552 | 0 |
|
553 | 0 | something_happened.await |
554 | | }; |
555 | | |
556 | 0 | Some(( |
557 | 0 | StreamWrapper { |
558 | 0 | connection_id, |
559 | 0 | stream_id: Some(stream_id), |
560 | 0 | read_buffer: Vec::new(), |
561 | 0 | inner_expected_incoming_bytes: Some(1), |
562 | 0 | is_reset: None, |
563 | 0 | writable_bytes: 0, |
564 | 0 | write_closable: false, // Note: this is currently hardcoded for WebRTC. |
565 | 0 | write_closed: false, |
566 | 0 | when_wake_up: None, |
567 | 0 | }, |
568 | 0 | direction, |
569 | 0 | )) |
570 | 0 | }) |
571 | 0 | } |
572 | | |
573 | 0 | fn open_out_substream(&self, MultiStreamWrapper(connection_id): &mut Self::MultiStream) { |
574 | 0 | match STATE |
575 | 0 | .try_lock() |
576 | 0 | .unwrap() |
577 | 0 | .connections |
578 | 0 | .get(connection_id) |
579 | 0 | .unwrap() |
580 | 0 | .inner |
581 | | { |
582 | | ConnectionInner::MultiStreamWebRtc { .. } |
583 | | | ConnectionInner::MultiStreamUnknownHandshake { .. } => unsafe { |
584 | 0 | bindings::connection_stream_open(*connection_id) |
585 | | }, |
586 | 0 | ConnectionInner::Reset { .. } => {} |
587 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
588 | 0 | unreachable!() |
589 | | } |
590 | | } |
591 | 0 | } |
592 | | |
593 | 0 | fn wait_read_write_again<'a>( |
594 | 0 | &self, |
595 | 0 | stream: pin::Pin<&'a mut Self::Stream>, |
596 | 0 | ) -> Self::StreamUpdateFuture<'a> { |
597 | 0 | Box::pin(async move { |
598 | 0 | let stream = stream.get_mut(); |
599 | 0 |
|
600 | 0 | if stream.is_reset.is_some() { |
601 | 0 | future::pending::<()>().await; |
602 | 0 | } |
603 | | |
604 | | loop { |
605 | 0 | let listener = { |
606 | 0 | let mut lock = STATE.try_lock().unwrap(); |
607 | 0 | let stream_inner = lock |
608 | 0 | .streams |
609 | 0 | .get_mut(&(stream.connection_id, stream.stream_id)) |
610 | 0 | .unwrap(); |
611 | | |
612 | 0 | if let Some(msg) = &stream_inner.reset { |
613 | 0 | stream.is_reset = Some(msg.clone()); |
614 | 0 | return; |
615 | 0 | } |
616 | 0 |
|
617 | 0 | let mut shall_return = false; |
618 | 0 |
|
619 | 0 | // Move the buffers from `STATE` into `read_buffer`. |
620 | 0 | if !stream_inner.messages_queue.is_empty() { |
621 | 0 | stream |
622 | 0 | .read_buffer |
623 | 0 | .reserve(stream_inner.messages_queue_total_size); |
624 | | |
625 | 0 | while let Some(msg) = stream_inner.messages_queue.pop_front() { |
626 | 0 | stream_inner.messages_queue_total_size -= msg.len(); |
627 | 0 | // TODO: could be optimized by reworking the bindings |
628 | 0 | stream.read_buffer.extend_from_slice(&msg); |
629 | 0 | if stream |
630 | 0 | .inner_expected_incoming_bytes |
631 | 0 | .map_or(false, |expected| expected <= stream.read_buffer.len()) |
632 | | { |
633 | 0 | shall_return = true; |
634 | 0 | break; |
635 | 0 | } |
636 | | } |
637 | 0 | } |
638 | | |
639 | 0 | if stream_inner.writable_bytes_extra != 0 { |
640 | 0 | // As documented, the number of writable bytes must never become |
641 | 0 | // exceedingly large (a few megabytes). As such, this can't overflow |
642 | 0 | // unless there is a bug on the JavaScript side. |
643 | 0 | stream.writable_bytes += stream_inner.writable_bytes_extra; |
644 | 0 | stream_inner.writable_bytes_extra = 0; |
645 | 0 | shall_return = true; |
646 | 0 | } |
647 | | |
648 | 0 | if shall_return { |
649 | 0 | return; |
650 | 0 | } |
651 | 0 |
|
652 | 0 | stream_inner.something_happened.listen() |
653 | | }; |
654 | | |
655 | 0 | let timer_stop = async move { |
656 | 0 | listener.await; |
657 | 0 | false |
658 | 0 | } |
659 | 0 | .or(async { |
660 | 0 | if let Some(when_wake_up) = stream.when_wake_up.as_mut() { |
661 | 0 | when_wake_up.await; |
662 | 0 | stream.when_wake_up = None; |
663 | 0 | true |
664 | | } else { |
665 | 0 | future::pending().await |
666 | | } |
667 | 0 | }) |
668 | 0 | .await; |
669 | | |
670 | 0 | if timer_stop { |
671 | 0 | return; |
672 | 0 | } |
673 | | } |
674 | 0 | }) |
675 | 0 | } |
676 | | |
677 | 0 | fn read_write_access<'a>( |
678 | 0 | &self, |
679 | 0 | stream: pin::Pin<&'a mut Self::Stream>, |
680 | 0 | ) -> Result<Self::ReadWriteAccess<'a>, Self::StreamErrorRef<'a>> { |
681 | 0 | let stream = stream.get_mut(); |
682 | | |
683 | 0 | if let Some(message) = &stream.is_reset { |
684 | 0 | return Err(StreamError { |
685 | 0 | message: message.clone(), |
686 | 0 | }); |
687 | 0 | } |
688 | 0 |
|
689 | 0 | Ok(ReadWriteAccess { |
690 | 0 | read_write: read_write::ReadWrite { |
691 | 0 | now: unsafe { Duration::from_micros(bindings::monotonic_clock_us()) }, |
692 | 0 | incoming_buffer: mem::take(&mut stream.read_buffer), |
693 | 0 | expected_incoming_bytes: Some(0), |
694 | 0 | read_bytes: 0, |
695 | 0 | write_buffers: Vec::new(), |
696 | 0 | write_bytes_queued: 0, |
697 | 0 | write_bytes_queueable: if !stream.write_closed { |
698 | 0 | Some(stream.writable_bytes) |
699 | | } else { |
700 | 0 | None |
701 | | }, |
702 | 0 | wake_up_after: None, |
703 | 0 | }, |
704 | 0 | stream, |
705 | | }) |
706 | 0 | } |
707 | | } |
708 | | |
709 | | pub(crate) struct ReadWriteAccess<'a> { |
710 | | read_write: read_write::ReadWrite<Duration>, |
711 | | stream: &'a mut StreamWrapper, |
712 | | } |
713 | | |
714 | | impl<'a> ops::Deref for ReadWriteAccess<'a> { |
715 | | type Target = read_write::ReadWrite<Duration>; |
716 | | |
717 | 0 | fn deref(&self) -> &Self::Target { |
718 | 0 | &self.read_write |
719 | 0 | } |
720 | | } |
721 | | |
722 | | impl<'a> ops::DerefMut for ReadWriteAccess<'a> { |
723 | 0 | fn deref_mut(&mut self) -> &mut Self::Target { |
724 | 0 | &mut self.read_write |
725 | 0 | } |
726 | | } |
727 | | |
728 | | impl<'a> Drop for ReadWriteAccess<'a> { |
729 | 0 | fn drop(&mut self) { |
730 | 0 | let mut lock = STATE.try_lock().unwrap(); |
731 | 0 |
|
732 | 0 | let stream_inner = lock |
733 | 0 | .streams |
734 | 0 | .get_mut(&(self.stream.connection_id, self.stream.stream_id)) |
735 | 0 | .unwrap(); |
736 | 0 |
|
737 | 0 | if (self.read_write.read_bytes != 0 |
738 | 0 | && self |
739 | 0 | .read_write |
740 | 0 | .expected_incoming_bytes |
741 | 0 | .map_or(false, |expected| { |
742 | 0 | expected >= self.read_write.incoming_buffer.len() |
743 | 0 | })) |
744 | 0 | || (self.read_write.write_bytes_queued != 0 |
745 | 0 | && self.read_write.write_bytes_queueable.is_some()) |
746 | 0 | { |
747 | 0 | self.read_write.wake_up_asap(); |
748 | 0 | } |
749 | | |
750 | 0 | self.stream.when_wake_up = self |
751 | 0 | .read_write |
752 | 0 | .wake_up_after |
753 | 0 | .map(Delay::new_at_monotonic_clock); |
754 | 0 |
|
755 | 0 | self.stream.read_buffer = mem::take(&mut self.read_write.incoming_buffer); |
756 | 0 |
|
757 | 0 | self.stream.inner_expected_incoming_bytes = self.read_write.expected_incoming_bytes; |
758 | 0 |
|
759 | 0 | if !self.read_write.write_buffers.is_empty() && stream_inner.reset.is_none() { |
760 | 0 | let mut io_vectors = Vec::with_capacity(self.read_write.write_buffers.len()); |
761 | 0 | let mut total_length = 0; |
762 | | |
763 | 0 | for buffer in &self.read_write.write_buffers { |
764 | 0 | io_vectors.push(bindings::StreamSendIoVector { |
765 | 0 | ptr: u32::try_from(buffer.as_ptr() as usize).unwrap(), |
766 | 0 | len: u32::try_from(buffer.len()).unwrap(), |
767 | 0 | }); |
768 | 0 | total_length += buffer.len(); |
769 | 0 | } |
770 | | |
771 | 0 | assert!(total_length <= self.stream.writable_bytes); |
772 | 0 | self.stream.writable_bytes -= total_length; |
773 | 0 |
|
774 | 0 | // `unwrap()` is ok as there's no way that `buffer.len()` doesn't fit in a `u64`. |
775 | 0 | TOTAL_BYTES_SENT.fetch_add(u64::try_from(total_length).unwrap(), Ordering::Relaxed); |
776 | 0 |
|
777 | 0 | unsafe { |
778 | 0 | bindings::stream_send( |
779 | 0 | self.stream.connection_id, |
780 | 0 | self.stream.stream_id.unwrap_or(0), |
781 | 0 | u32::try_from(io_vectors.as_ptr() as usize).unwrap(), |
782 | 0 | u32::try_from(io_vectors.len()).unwrap(), |
783 | 0 | ); |
784 | 0 | } |
785 | 0 |
|
786 | 0 | self.read_write.write_buffers.clear(); |
787 | 0 | } |
788 | | |
789 | 0 | if self.read_write.write_bytes_queueable.is_none() && !self.stream.write_closed { |
790 | 0 | if stream_inner.reset.is_none() && self.stream.write_closable { |
791 | 0 | unsafe { |
792 | 0 | bindings::stream_send_close( |
793 | 0 | self.stream.connection_id, |
794 | 0 | self.stream.stream_id.unwrap_or(0), |
795 | 0 | ); |
796 | 0 | } |
797 | 0 | } |
798 | | |
799 | 0 | self.stream.write_closed = true; |
800 | 0 | } |
801 | 0 | } |
802 | | } |
803 | | |
804 | | pub(crate) struct StreamWrapper { |
805 | | connection_id: u32, |
806 | | stream_id: Option<u32>, |
807 | | read_buffer: Vec<u8>, |
808 | | inner_expected_incoming_bytes: Option<usize>, |
809 | | /// `Some` if the remote has reset the stream and `update_stream` has since then been called. |
810 | | /// Contains the error message. |
811 | | is_reset: Option<String>, |
812 | | writable_bytes: usize, |
813 | | write_closable: bool, |
814 | | write_closed: bool, |
815 | | /// The stream should wake up after this delay. |
816 | | when_wake_up: Option<Delay>, |
817 | | } |
818 | | |
819 | | impl Drop for StreamWrapper { |
820 | 0 | fn drop(&mut self) { |
821 | 0 | let mut lock = STATE.try_lock().unwrap(); |
822 | 0 | let lock = &mut *lock; |
823 | 0 |
|
824 | 0 | let connection = lock.connections.get_mut(&self.connection_id).unwrap(); |
825 | 0 | let removed_stream = lock |
826 | 0 | .streams |
827 | 0 | .remove(&(self.connection_id, self.stream_id)) |
828 | 0 | .unwrap(); |
829 | | |
830 | 0 | let remove_connection = match &mut connection.inner { |
831 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
832 | 0 | if removed_stream.reset.is_none() { |
833 | 0 | unsafe { |
834 | 0 | bindings::reset_connection(self.connection_id); |
835 | 0 | } |
836 | 0 | } |
837 | | |
838 | 0 | debug_assert!(self.stream_id.is_none()); |
839 | 0 | true |
840 | | } |
841 | | ConnectionInner::MultiStreamWebRtc { |
842 | 0 | connection_handles_alive, |
843 | | .. |
844 | | } |
845 | | | ConnectionInner::MultiStreamUnknownHandshake { |
846 | 0 | connection_handles_alive, |
847 | | .. |
848 | | } => { |
849 | 0 | if removed_stream.reset.is_none() { |
850 | | unsafe { |
851 | 0 | bindings::connection_stream_reset( |
852 | 0 | self.connection_id, |
853 | 0 | self.stream_id.unwrap(), |
854 | 0 | ) |
855 | | } |
856 | 0 | } |
857 | 0 | *connection_handles_alive -= 1; |
858 | 0 | let remove_connection = *connection_handles_alive == 0; |
859 | 0 | if remove_connection { |
860 | 0 | unsafe { |
861 | 0 | bindings::reset_connection(self.connection_id); |
862 | 0 | } |
863 | 0 | } |
864 | 0 | remove_connection |
865 | | } |
866 | | ConnectionInner::Reset { |
867 | 0 | connection_handles_alive, |
868 | 0 | .. |
869 | 0 | } => { |
870 | 0 | *connection_handles_alive -= 1; |
871 | 0 | *connection_handles_alive == 0 |
872 | | } |
873 | | }; |
874 | | |
875 | 0 | if remove_connection { |
876 | 0 | lock.connections.remove(&self.connection_id).unwrap(); |
877 | 0 | } |
878 | 0 | } |
879 | | } |
880 | | |
881 | | pub(crate) struct MultiStreamWrapper(u32); |
882 | | |
883 | | impl Drop for MultiStreamWrapper { |
884 | 0 | fn drop(&mut self) { |
885 | 0 | let mut lock = STATE.try_lock().unwrap(); |
886 | 0 |
|
887 | 0 | let connection = lock.connections.get_mut(&self.0).unwrap(); |
888 | 0 | let (remove_connection, reset_connection) = match &mut connection.inner { |
889 | | ConnectionInner::SingleStreamMsNoiseYamux { .. } => { |
890 | 0 | unreachable!() |
891 | | } |
892 | | ConnectionInner::MultiStreamWebRtc { |
893 | 0 | connection_handles_alive, |
894 | | .. |
895 | | } |
896 | | | ConnectionInner::MultiStreamUnknownHandshake { |
897 | 0 | connection_handles_alive, |
898 | | .. |
899 | | } => { |
900 | 0 | *connection_handles_alive -= 1; |
901 | 0 | let v = *connection_handles_alive == 0; |
902 | 0 | (v, v) |
903 | | } |
904 | 0 | ConnectionInner::Reset { .. } => (true, false), |
905 | | }; |
906 | | |
907 | 0 | if remove_connection { |
908 | 0 | lock.connections.remove(&self.0).unwrap(); |
909 | 0 | } |
910 | 0 | if reset_connection { |
911 | 0 | unsafe { |
912 | 0 | bindings::reset_connection(self.0); |
913 | 0 | } |
914 | 0 | } |
915 | 0 | } |
916 | | } |
917 | | |
918 | 0 | #[derive(Debug, derive_more::Display, Clone)] |
919 | | #[display(fmt = "{message}")] |
920 | | pub(crate) struct StreamError { |
921 | | message: String, |
922 | | } |
923 | | |
924 | | static STATE: Mutex<NetworkState> = Mutex::new(NetworkState { |
925 | | next_connection_id: 0, |
926 | | connections: hashbrown::HashMap::with_hasher(FnvBuildHasher), |
927 | | streams: BTreeMap::new(), |
928 | | }); |
929 | | |
930 | | // TODO: we use a custom `FnvBuildHasher` because it's not possible to create `fnv::FnvBuildHasher` in a `const` context |
931 | | struct FnvBuildHasher; |
932 | | impl core::hash::BuildHasher for FnvBuildHasher { |
933 | | type Hasher = fnv::FnvHasher; |
934 | 0 | fn build_hasher(&self) -> fnv::FnvHasher { |
935 | 0 | fnv::FnvHasher::default() |
936 | 0 | } |
937 | | } |
938 | | |
939 | | /// All the connections and streams that are alive. |
940 | | /// |
941 | | /// Single-stream connections have one entry in `connections` and one entry in `streams` (with |
942 | | /// a `stream_id` always equal to `None`). |
943 | | /// Multi-stream connections have one entry in `connections` and zero or more entries in `streams`. |
944 | | struct NetworkState { |
945 | | next_connection_id: u32, |
946 | | connections: hashbrown::HashMap<u32, Connection, FnvBuildHasher>, |
947 | | streams: BTreeMap<(u32, Option<u32>), Stream>, |
948 | | } |
949 | | |
950 | | struct Connection { |
951 | | /// Type of connection and extra fields that depend on the type. |
952 | | inner: ConnectionInner, |
953 | | /// Event notified whenever one of the fields above is modified. |
954 | | something_happened: event_listener::Event, |
955 | | } |
956 | | |
957 | | enum ConnectionInner { |
958 | | SingleStreamMsNoiseYamux, |
959 | | MultiStreamUnknownHandshake { |
960 | | /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, |
961 | | /// but that haven't been reported through |
962 | | /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. |
963 | | opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, |
964 | | /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference |
965 | | /// this connection. If it switches from 1 to 0, the connection must be removed. |
966 | | connection_handles_alive: u32, |
967 | | }, |
968 | | MultiStreamWebRtc { |
969 | | /// List of substreams that the host (i.e. JavaScript side) has reported have been opened, |
970 | | /// but that haven't been reported through |
971 | | /// [`smoldot_light::platform::PlatformRef::next_substream`] yet. |
972 | | opened_substreams_to_pick_up: VecDeque<(u32, SubstreamDirection)>, |
973 | | /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference |
974 | | /// this connection. If it switches from 1 to 0, the connection must be removed. |
975 | | connection_handles_alive: u32, |
976 | | /// SHA256 hash of the TLS certificate used by the local node at the DTLS layer. |
977 | | local_tls_certificate_sha256: [u8; 32], |
978 | | }, |
979 | | /// [`bindings::connection_reset`] has been called |
980 | | Reset { |
981 | | /// Message given by the bindings to justify the closure. |
982 | | // TODO: why is this unused? shouldn't it be not unused? |
983 | | _message: String, |
984 | | /// Number of objects (connections and streams) in the [`PlatformRef`] API that reference |
985 | | /// this connection. If it switches from 1 to 0, the connection must be removed. |
986 | | connection_handles_alive: u32, |
987 | | }, |
988 | | } |
989 | | |
990 | | struct Stream { |
991 | | /// `Some` if [`bindings::stream_reset`] has been called. Contains the error message. |
992 | | reset: Option<String>, |
993 | | /// Sum of the writable bytes reported through [`bindings::stream_writable_bytes`] that |
994 | | /// haven't been processed yet in a call to `update_stream`. |
995 | | writable_bytes_extra: usize, |
996 | | /// List of messages received through [`bindings::stream_message`]. Must never contain |
997 | | /// empty messages. |
998 | | messages_queue: VecDeque<Box<[u8]>>, |
999 | | /// Total size of all the messages stored in [`Stream::messages_queue`]. |
1000 | | messages_queue_total_size: usize, |
1001 | | /// Event notified whenever one of the fields above is modified, such as a new message being |
1002 | | /// queued. |
1003 | | something_happened: event_listener::Event, |
1004 | | } |
1005 | | |
1006 | 0 | pub(crate) fn connection_multi_stream_set_handshake_info( |
1007 | 0 | connection_id: u32, |
1008 | 0 | handshake_ty: Vec<u8>, |
1009 | 0 | ) { |
1010 | 0 | let (_, local_tls_certificate_sha256) = nom::sequence::preceded( |
1011 | 0 | nom::bytes::streaming::tag::<_, _, nom::error::Error<&[u8]>>(&[0]), |
1012 | 0 | nom::combinator::map(nom::bytes::streaming::take(32u32), |b| { |
1013 | 0 | <&[u8; 32]>::try_from(b).unwrap() |
1014 | 0 | }), |
1015 | 0 | )(&handshake_ty[..]) |
1016 | 0 | .expect("invalid handshake type provided to connection_multi_stream_set_handshake_info"); |
1017 | 0 |
|
1018 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1019 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1020 | | |
1021 | 0 | let (opened_substreams_to_pick_up, connection_handles_alive) = match &mut connection.inner { |
1022 | | ConnectionInner::MultiStreamUnknownHandshake { |
1023 | 0 | opened_substreams_to_pick_up, |
1024 | 0 | connection_handles_alive, |
1025 | 0 | } => ( |
1026 | 0 | mem::take(opened_substreams_to_pick_up), |
1027 | 0 | *connection_handles_alive, |
1028 | 0 | ), |
1029 | 0 | _ => unreachable!(), |
1030 | | }; |
1031 | | |
1032 | 0 | connection.inner = ConnectionInner::MultiStreamWebRtc { |
1033 | 0 | opened_substreams_to_pick_up, |
1034 | 0 | connection_handles_alive, |
1035 | 0 | local_tls_certificate_sha256: *local_tls_certificate_sha256, |
1036 | 0 | }; |
1037 | 0 | connection.something_happened.notify(usize::MAX); |
1038 | 0 | } |
1039 | | |
1040 | 0 | pub(crate) fn stream_writable_bytes(connection_id: u32, stream_id: u32, bytes: u32) { |
1041 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1042 | 0 |
|
1043 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1044 | | |
1045 | | // For single stream connections, the docs of this function mentions that `stream_id` can be |
1046 | | // any value. |
1047 | 0 | let actual_stream_id = match connection.inner { |
1048 | | ConnectionInner::MultiStreamWebRtc { .. } |
1049 | 0 | | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), |
1050 | 0 | ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, |
1051 | 0 | ConnectionInner::Reset { .. } => unreachable!(), |
1052 | | }; |
1053 | | |
1054 | 0 | let stream = lock |
1055 | 0 | .streams |
1056 | 0 | .get_mut(&(connection_id, actual_stream_id)) |
1057 | 0 | .unwrap(); |
1058 | 0 | debug_assert!(stream.reset.is_none()); |
1059 | | |
1060 | | // As documented, the number of writable bytes must never become exceedingly large (a few |
1061 | | // megabytes). As such, this can't overflow unless there is a bug on the JavaScript side. |
1062 | 0 | stream.writable_bytes_extra += usize::try_from(bytes).unwrap(); |
1063 | 0 | stream.something_happened.notify(usize::MAX); |
1064 | 0 | } |
1065 | | |
1066 | 0 | pub(crate) fn stream_message(connection_id: u32, stream_id: u32, message: Vec<u8>) { |
1067 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1068 | 0 |
|
1069 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1070 | | |
1071 | | // For single stream connections, the docs of this function mentions that `stream_id` can be |
1072 | | // any value. |
1073 | 0 | let actual_stream_id = match connection.inner { |
1074 | | ConnectionInner::MultiStreamWebRtc { .. } |
1075 | 0 | | ConnectionInner::MultiStreamUnknownHandshake { .. } => Some(stream_id), |
1076 | 0 | ConnectionInner::SingleStreamMsNoiseYamux { .. } => None, |
1077 | 0 | ConnectionInner::Reset { .. } => unreachable!(), |
1078 | | }; |
1079 | | |
1080 | 0 | let stream = lock |
1081 | 0 | .streams |
1082 | 0 | .get_mut(&(connection_id, actual_stream_id)) |
1083 | 0 | .unwrap(); |
1084 | 0 | debug_assert!(stream.reset.is_none()); |
1085 | | |
1086 | 0 | TOTAL_BYTES_RECEIVED.fetch_add(u64::try_from(message.len()).unwrap(), Ordering::Relaxed); |
1087 | 0 |
|
1088 | 0 | // Ignore empty message to avoid all sorts of problems. |
1089 | 0 | if message.is_empty() { |
1090 | 0 | return; |
1091 | 0 | } |
1092 | 0 |
|
1093 | 0 | // There is unfortunately no way to instruct the browser to back-pressure connections to |
1094 | 0 | // remotes. |
1095 | 0 | // |
1096 | 0 | // In order to avoid DoS attacks, we refuse to buffer more than a certain amount of data per |
1097 | 0 | // connection. This limit is completely arbitrary, and this is in no way a robust solution |
1098 | 0 | // because this limit isn't in sync with any other part of the code. In other words, it could |
1099 | 0 | // be legitimate for the remote to buffer a large amount of data. |
1100 | 0 | // |
1101 | 0 | // This corner case is handled by discarding the messages that would go over the limit. While |
1102 | 0 | // this is not a great solution, going over that limit can be considered as a fault from the |
1103 | 0 | // remote, the same way as it would be a fault from the remote to forget to send some bytes, |
1104 | 0 | // and thus should be handled in a similar way by the higher level code. |
1105 | 0 | // |
1106 | 0 | // A better way to handle this would be to kill the connection abruptly. However, this would |
1107 | 0 | // add a lot of complex code in this module, and the effort is clearly not worth it for this |
1108 | 0 | // niche situation. |
1109 | 0 | // |
1110 | 0 | // While this problem is specific to browsers (Deno and NodeJS have ways to back-pressure |
1111 | 0 | // connections), we add this hack for all platforms, for consistency. If this limit is ever |
1112 | 0 | // reached, we want to be sure to detect it, even when testing on NodeJS or Deno. |
1113 | 0 | // |
1114 | 0 | // See <https://github.com/smol-dot/smoldot/issues/109>. |
1115 | 0 | // TODO: do this properly eventually ^ |
1116 | 0 | if stream.messages_queue_total_size >= 25 * 1024 * 1024 { |
1117 | 0 | return; |
1118 | 0 | } |
1119 | 0 |
|
1120 | 0 | stream.messages_queue_total_size += message.len(); |
1121 | 0 | stream.messages_queue.push_back(message.into_boxed_slice()); |
1122 | 0 | stream.something_happened.notify(usize::MAX); |
1123 | 0 | } |
1124 | | |
1125 | 0 | pub(crate) fn connection_stream_opened(connection_id: u32, stream_id: u32, outbound: u32) { |
1126 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1127 | 0 | let lock = &mut *lock; |
1128 | 0 |
|
1129 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1130 | | if let ConnectionInner::MultiStreamWebRtc { |
1131 | 0 | opened_substreams_to_pick_up, |
1132 | | .. |
1133 | 0 | } = &mut connection.inner |
1134 | | { |
1135 | 0 | let _prev_value = lock.streams.insert( |
1136 | 0 | (connection_id, Some(stream_id)), |
1137 | 0 | Stream { |
1138 | 0 | reset: None, |
1139 | 0 | messages_queue: VecDeque::with_capacity(8), |
1140 | 0 | messages_queue_total_size: 0, |
1141 | 0 | something_happened: event_listener::Event::new(), |
1142 | 0 | writable_bytes_extra: 0, |
1143 | 0 | }, |
1144 | 0 | ); |
1145 | 0 |
|
1146 | 0 | if _prev_value.is_some() { |
1147 | 0 | panic!("same stream_id used multiple times in connection_stream_opened") |
1148 | 0 | } |
1149 | 0 |
|
1150 | 0 | opened_substreams_to_pick_up.push_back(( |
1151 | 0 | stream_id, |
1152 | 0 | if outbound != 0 { |
1153 | 0 | SubstreamDirection::Outbound |
1154 | | } else { |
1155 | 0 | SubstreamDirection::Inbound |
1156 | | }, |
1157 | | )); |
1158 | | |
1159 | 0 | connection.something_happened.notify(usize::MAX); |
1160 | | } else { |
1161 | 0 | panic!() |
1162 | | } |
1163 | 0 | } |
1164 | | |
1165 | 0 | pub(crate) fn connection_reset(connection_id: u32, message: Vec<u8>) { |
1166 | 0 | let message = str::from_utf8(&message) |
1167 | 0 | .unwrap_or_else(|_| panic!("non-UTF-8 message")) |
1168 | 0 | .to_owned(); |
1169 | 0 |
|
1170 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1171 | 0 | let connection = lock.connections.get_mut(&connection_id).unwrap(); |
1172 | | |
1173 | 0 | let connection_handles_alive = match &connection.inner { |
1174 | 0 | ConnectionInner::SingleStreamMsNoiseYamux { .. } => 1, // TODO: I believe that this is correct but a bit confusing; might be helpful to refactor with an enum or something |
1175 | | ConnectionInner::MultiStreamWebRtc { |
1176 | 0 | connection_handles_alive, |
1177 | | .. |
1178 | | } |
1179 | | | ConnectionInner::MultiStreamUnknownHandshake { |
1180 | 0 | connection_handles_alive, |
1181 | | .. |
1182 | 0 | } => *connection_handles_alive, |
1183 | 0 | ConnectionInner::Reset { .. } => unreachable!(), |
1184 | | }; |
1185 | | |
1186 | 0 | connection.inner = ConnectionInner::Reset { |
1187 | 0 | connection_handles_alive, |
1188 | 0 | _message: message.clone(), |
1189 | 0 | }; |
1190 | 0 |
|
1191 | 0 | connection.something_happened.notify(usize::MAX); |
1192 | | |
1193 | 0 | for ((_, _), stream) in lock |
1194 | 0 | .streams |
1195 | 0 | .range_mut((connection_id, Some(u32::MIN))..=(connection_id, Some(u32::MAX))) |
1196 | 0 | { |
1197 | 0 | stream.reset = Some(message.clone()); |
1198 | 0 | stream.something_happened.notify(usize::MAX); |
1199 | 0 | } |
1200 | 0 | if let Some(stream) = lock.streams.get_mut(&(connection_id, None)) { |
1201 | 0 | stream.reset = Some(message); |
1202 | 0 | stream.something_happened.notify(usize::MAX); |
1203 | 0 | } |
1204 | 0 | } |
1205 | | |
1206 | 0 | pub(crate) fn stream_reset(connection_id: u32, stream_id: u32, message: Vec<u8>) { |
1207 | 0 | let message: String = str::from_utf8(&message) |
1208 | 0 | .unwrap_or_else(|_| panic!("non-UTF-8 message")) |
1209 | 0 | .to_owned(); |
1210 | 0 |
|
1211 | 0 | // Note that, as documented, it is illegal to call this function on single-stream substreams. |
1212 | 0 | // We can thus assume that the `stream_id` is valid. |
1213 | 0 | let mut lock = STATE.try_lock().unwrap(); |
1214 | 0 | let stream = lock |
1215 | 0 | .streams |
1216 | 0 | .get_mut(&(connection_id, Some(stream_id))) |
1217 | 0 | .unwrap(); |
1218 | 0 | stream.reset = Some(message); |
1219 | 0 | stream.something_happened.notify(usize::MAX); |
1220 | 0 | } |