/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/multi_stream.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | // TODO: needs docs |
19 | | |
20 | | use super::{ |
21 | | super::super::read_write::ReadWrite, substream, Config, Event, SubstreamId, SubstreamIdInner, |
22 | | }; |
23 | | use crate::{libp2p::connection::webrtc_framing, util}; |
24 | | |
25 | | use alloc::{collections::VecDeque, string::String, vec::Vec}; |
26 | | use core::{ |
27 | | fmt, |
28 | | hash::Hash, |
29 | | ops::{Add, Index, IndexMut, Sub}, |
30 | | time::Duration, |
31 | | }; |
32 | | use rand_chacha::rand_core::{RngCore as _, SeedableRng as _}; |
33 | | |
34 | | pub use substream::InboundTy; |
35 | | |
/// State machine of a fully-established connection where substreams are handled externally.
pub struct MultiStream<TNow, TSubId, TSubUd> {
    /// Events that should be yielded from [`MultiStream::pull_event`].
    pending_events: VecDeque<Event<TSubUd>>,

    /// List of all open substreams, both inbound and outbound.
    ///
    /// There are two substreams namespaces: "out substreams", used for API purposes when it comes
    /// to notifications and requests, and "in substreams", used for API purposes when it comes to
    /// raw data sent/received on a substream. When the user for example resets an "in substream",
    /// the "out substream" remains valid.
    in_substreams: hashbrown::HashMap<TSubId, Substream<TNow, TSubUd>, util::SipHasherBuild>,

    /// For each entry of `in_substreams`, maps the `id` field of the [`Substream`] (the "out
    /// substream" identifier) to the key of that entry in `in_substreams`.
    ///
    /// Entries are inserted by [`MultiStream::add_substream`] and removed when the substream is
    /// removed from `in_substreams`.
    out_in_substreams_map: hashbrown::HashMap<u32, TSubId, fnv::FnvBuildHasher>,

    /// Identifier to assign to the next [`Substream`] that gets created. Incremented every time
    /// an identifier is allocated.
    next_out_substream_id: u32,

    /// List of outgoing substreams that aren't opened yet.
    ///
    /// Every time an outgoing substream is opened, an item is pulled from this list.
    ///
    /// Does not include the ping substream.
    desired_out_substreams: VecDeque<Substream<TNow, TSubUd>>,

    /// Substream used for outgoing pings.
    ///
    /// Initially contains `None` as the substream for pings isn't opened yet.
    ///
    /// Because of the API of [`substream::Substream`] concerning pings, there is no need to
    /// handle situations where the substream fails to negotiate, as this is handled by making
    /// outgoing pings error. This substream is therefore constant.
    ping_substream: Option<TSubId>,
    /// When to start the next ping attempt.
    next_ping: TNow,
    /// Source of randomness to generate ping payloads.
    ///
    /// Note that we use ChaCha20 because the rest of the code base also uses ChaCha20. This avoids
    /// unnecessary code being included in the binary and reduces the binary size.
    ping_payload_randomness: rand_chacha::ChaCha20Rng,

    /// See [`Config::max_inbound_substreams`].
    // TODO: not enforced at the moment
    _max_inbound_substreams: usize,
    /// See [`Config::max_protocol_name_len`].
    max_protocol_name_len: usize,
    /// See [`Config::ping_protocol`].
    ping_protocol: String,
    /// See [`Config::ping_interval`].
    ping_interval: Duration,
    /// See [`Config::ping_timeout`].
    ping_timeout: Duration,
}
88 | | |
/// State of a single substream tracked by [`MultiStream`], either already open or waiting in
/// the list of desired outgoing substreams.
struct Substream<TNow, TSubUd> {
    /// Identifier of the substream in the "out substreams" namespace. This is the value that is
    /// wrapped into the [`SubstreamId`] reported to the user.
    id: u32,
    /// Opaque data decided by the user. `None` if the substream doesn't exist on the API layer
    /// yet.
    user_data: Option<TSubUd>,
    /// Underlying state machine for the substream. Always `Some` while the substream is alive,
    /// and `None` if it has been reset.
    inner: Option<substream::Substream<TNow>>,
    /// State of the message frames.
    framing: webrtc_framing::WebRtcFraming,
}
100 | | |
/// Number of queued events starting from which [`MultiStream::substream_read_write`] stops
/// processing substream data until events have been pulled.
const MAX_PENDING_EVENTS: usize = 4;
102 | | |
103 | | impl<TNow, TSubId, TSubUd> MultiStream<TNow, TSubId, TSubUd> |
104 | | where |
105 | | TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord, |
106 | | TSubId: Clone + PartialEq + Eq + Hash, |
107 | | { |
108 | | /// Creates a new connection from the given configuration. |
109 | 0 | pub fn webrtc(config: Config<TNow>) -> MultiStream<TNow, TSubId, TSubUd> { |
110 | 0 | let mut randomness = rand_chacha::ChaCha20Rng::from_seed(config.randomness_seed); |
111 | 0 |
|
112 | 0 | MultiStream { |
113 | 0 | pending_events: { |
114 | 0 | // Note that the capacity is higher than `MAX_PENDING_EVENTS` because resetting |
115 | 0 | // substreams can unconditionally queue an event, and the API doesn't give the |
116 | 0 | // possibility to not reset a substream (as that would introduce too much |
117 | 0 | // complexity). For this reason, we reserve enough for the events that can happen |
118 | 0 | // by reading/writing substreams plus events that can happen by resetting |
119 | 0 | // substreams. |
120 | 0 | let cap = MAX_PENDING_EVENTS + config.substreams_capacity; |
121 | 0 | VecDeque::with_capacity(cap) |
122 | 0 | }, |
123 | 0 | in_substreams: hashbrown::HashMap::with_capacity_and_hasher( |
124 | 0 | config.substreams_capacity, |
125 | 0 | util::SipHasherBuild::new({ |
126 | 0 | let mut seed = [0; 16]; |
127 | 0 | randomness.fill_bytes(&mut seed); |
128 | 0 | seed |
129 | 0 | }), |
130 | 0 | ), |
131 | 0 | out_in_substreams_map: hashbrown::HashMap::with_capacity_and_hasher( |
132 | 0 | config.substreams_capacity, |
133 | 0 | Default::default(), |
134 | 0 | ), |
135 | 0 | next_out_substream_id: 0, |
136 | 0 | desired_out_substreams: VecDeque::with_capacity(config.substreams_capacity), |
137 | 0 | ping_substream: None, |
138 | 0 | next_ping: config.first_out_ping, |
139 | 0 | ping_payload_randomness: randomness, |
140 | 0 | _max_inbound_substreams: config.max_inbound_substreams, |
141 | 0 | max_protocol_name_len: config.max_protocol_name_len, |
142 | 0 | ping_protocol: config.ping_protocol, |
143 | 0 | ping_interval: config.ping_interval, |
144 | 0 | ping_timeout: config.ping_timeout, |
145 | 0 | } |
146 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE6webrtcBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE6webrtcCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE6webrtcBa_ |
147 | | |
/// Removes an event from the queue of events and returns it.
///
/// Events are returned in the order in which they were queued. Returns `None` if the queue
/// is empty.
///
/// This method should be called after [`MultiStream::substream_read_write`] or
/// [`MultiStream::reset_substream`] is called.
pub fn pull_event(&mut self) -> Option<Event<TSubUd>> {
    self.pending_events.pop_front()
}
155 | | |
/// Modifies the value that was initially passed through [`Config::max_protocol_name_len`].
///
/// The new value only applies to substreams opened after this function has been called: the
/// value is read when an inbound substream is registered through
/// [`MultiStream::add_substream`].
pub fn set_max_protocol_name_len(&mut self, new_value: usize) {
    self.max_protocol_name_len = new_value;
}
162 | | |
163 | | /// Returns the number of new outbound substreams that the state machine would like to see |
164 | | /// opened. |
165 | | /// |
166 | | /// This value doesn't change automatically over time but only after a call to |
167 | | /// [`MultiStream::substream_read_write`], [`MultiStream::add_substream`], |
168 | | /// [`MultiStream::reset_substream`], [`MultiStream::add_request`], or |
169 | | /// [`MultiStream::open_notifications_substream`]. |
170 | | /// |
171 | | /// Note that the user is expected to track the number of substreams that are currently being |
172 | | /// opened. For example, if this function returns 2 and there are already 2 substreams |
173 | | /// currently being opened, then there is no need to open any additional one. |
174 | 0 | pub fn desired_outbound_substreams(&self) -> u32 { |
175 | 0 | u32::try_from(self.desired_out_substreams.len()) |
176 | 0 | .unwrap_or(u32::MAX) |
177 | 0 | .saturating_add(if self.ping_substream.is_none() { 1 } else { 0 }) |
178 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE27desired_outbound_substreamsBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE27desired_outbound_substreamsCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE27desired_outbound_substreamsBa_ |
179 | | |
180 | | /// Notifies the state machine that a new substream has been opened. |
181 | | /// |
182 | | /// `outbound` indicates whether the substream has been opened by the remote (`false`) or |
183 | | /// locally (`true`). |
184 | | /// |
185 | | /// If `outbound` is `true`, then the value returned by |
186 | | /// [`MultiStream::desired_outbound_substreams`] will decrease by one. |
187 | | /// |
188 | | /// # Panic |
189 | | /// |
190 | | /// Panics if there already exists a substream with an identical identifier. |
191 | | /// |
192 | 0 | pub fn add_substream(&mut self, id: TSubId, outbound: bool) { |
193 | 0 | let substream = if !outbound { |
194 | 0 | let out_substream_id = self.next_out_substream_id; |
195 | 0 | self.next_out_substream_id += 1; |
196 | 0 |
|
197 | 0 | Substream { |
198 | 0 | id: out_substream_id, |
199 | 0 | inner: Some(substream::Substream::ingoing(self.max_protocol_name_len)), |
200 | 0 | user_data: None, |
201 | 0 | framing: webrtc_framing::WebRtcFraming::new(), |
202 | 0 | } |
203 | 0 | } else if self.ping_substream.is_none() { |
204 | 0 | let out_substream_id = self.next_out_substream_id; |
205 | 0 | self.next_out_substream_id += 1; |
206 | 0 |
|
207 | 0 | self.ping_substream = Some(id.clone()); |
208 | 0 |
|
209 | 0 | Substream { |
210 | 0 | id: out_substream_id, |
211 | 0 | inner: Some(substream::Substream::ping_out(self.ping_protocol.clone())), |
212 | 0 | user_data: None, |
213 | 0 | framing: webrtc_framing::WebRtcFraming::new(), |
214 | 0 | } |
215 | 0 | } else if let Some(desired) = self.desired_out_substreams.pop_front() { |
216 | 0 | desired |
217 | | } else { |
218 | | // TODO: reset the new substream |
219 | 0 | todo!() |
220 | | }; |
221 | | |
222 | 0 | let _prev_val = self.out_in_substreams_map.insert(substream.id, id.clone()); |
223 | 0 | debug_assert!(_prev_val.is_none()); |
224 | | |
225 | 0 | let previous_value = self.in_substreams.insert(id, substream); |
226 | 0 | if previous_value.is_some() { |
227 | | // There is already a substream with that identifier. This is forbidden by the API of |
228 | | // this function. |
229 | 0 | panic!() |
230 | 0 | } |
231 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE13add_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE13add_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE13add_substreamBa_ |
232 | | |
233 | | /// Immediately destroys the substream with the given identifier. |
234 | | /// |
235 | | /// The given identifier is now considered invalid by the state machine. |
236 | | /// |
237 | | /// # Panic |
238 | | /// |
239 | | /// Panics if there is no substream with that identifier. |
240 | | /// |
241 | 0 | pub fn reset_substream(&mut self, substream_id: &TSubId) { |
242 | 0 | let mut substream = self.in_substreams.remove(substream_id).unwrap(); |
243 | 0 | let _was_in = self.out_in_substreams_map.remove(&substream.id); |
244 | 0 | debug_assert!(_was_in.is_none()); |
245 | | |
246 | 0 | if Some(substream_id) == self.ping_substream.as_ref() { |
247 | 0 | self.ping_substream = None; |
248 | 0 | } |
249 | | |
250 | 0 | let maybe_event = substream.inner.unwrap().reset(); |
251 | 0 | if let Some(event) = maybe_event { |
252 | 0 | Self::on_substream_event( |
253 | 0 | &mut self.pending_events, |
254 | 0 | substream.id, |
255 | 0 | &mut substream.user_data, |
256 | 0 | event, |
257 | 0 | ); |
258 | 0 | } |
259 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE15reset_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE15reset_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE15reset_substreamBa_ |
260 | | |
/// Reads/writes data on the substream.
///
/// If the method returns [`SubstreamFate::Reset`], then the substream is now considered dead
/// according to the state machine and its identifier is now invalid. If the reading or
/// writing side of the substream was still open, then the user should reset that substream.
///
/// This method will refuse to accept data if too many events are already queued. Use
/// [`MultiStream::pull_event`] to empty the queue of events between calls to this method.
///
/// In the case of a WebRTC connection, the [`ReadWrite::expected_incoming_bytes`] and
/// [`ReadWrite::write_bytes_queueable`] must always be `Some`.
///
/// # Panic
///
/// Panics if there is no substream with that identifier.
/// Panics if this is a WebRTC connection, and the reading or writing side is closed.
///
#[must_use]
pub fn substream_read_write(
    &mut self,
    substream_id: &TSubId,
    read_write: &'_ mut ReadWrite<TNow>,
) -> SubstreamFate {
    let substream = self.in_substreams.get_mut(substream_id).unwrap();

    // In WebRTC, the reading and writing side is never closed.
    assert!(
        read_write.expected_incoming_bytes.is_some()
            && read_write.write_bytes_queueable.is_some()
    );

    // Reading/writing the ping substream is used to queue new outgoing pings.
    if Some(substream_id) == self.ping_substream.as_ref() {
        if read_write.now >= self.next_ping {
            let mut payload = [0u8; 32];
            self.ping_payload_randomness.fill_bytes(&mut payload);
            substream.inner.as_mut().unwrap().queue_ping(
                &payload,
                read_write.now.clone(),
                self.ping_timeout,
            );
            // The next ping is scheduled relative to the current time, not relative to the
            // previous deadline.
            self.next_ping = read_write.now.clone() + self.ping_interval;
        }

        // Ask to be called again when the next ping is due.
        read_write.wake_up_after(&self.next_ping);
    }

    // Don't process any more data before events are pulled.
    if self.pending_events.len() >= MAX_PENDING_EVENTS {
        return SubstreamFate::Continue;
    }

    // Now process the substream.
    // The state machine is taken out of `substream.inner` and consumed. On the success path,
    // the value it returns (`None` if the substream must be reset) is stored back. On a
    // framing error, the state machine is reset and `substream.inner` stays `None`.
    let event = match substream.framing.read_write(read_write) {
        Ok(mut framing) => {
            let (substream_update, event) =
                substream.inner.take().unwrap().read_write(&mut framing);
            substream.inner = substream_update;
            event
        }
        Err(_) => substream.inner.take().unwrap().reset(),
    };

    if let Some(event) = event {
        // An event has been produced; request an immediate wake-up.
        read_write.wake_up_asap();
        Self::on_substream_event(
            &mut self.pending_events,
            substream.id,
            &mut substream.user_data,
            event,
        )
    }

    // The substream is `None` if it needs to be reset.
    if substream.inner.is_none() {
        if Some(substream_id) == self.ping_substream.as_ref() {
            self.ping_substream = None;
        }
        // Remove the substream from both maps so that they stay in sync.
        self.out_in_substreams_map.remove(&substream.id);
        self.in_substreams.remove(substream_id);
        SubstreamFate::Reset
    } else {
        SubstreamFate::Continue
    }
}
346 | | |
347 | | /// Turns an event from the [`substream`] module into an [`Event`] and adds it to the queue. |
348 | 0 | fn on_substream_event( |
349 | 0 | pending_events: &mut VecDeque<Event<TSubUd>>, |
350 | 0 | substream_id: u32, |
351 | 0 | substream_user_data: &mut Option<TSubUd>, |
352 | 0 | event: substream::Event, |
353 | 0 | ) { |
354 | 0 | pending_events.push_back(match event { |
355 | | substream::Event::InboundError { |
356 | 0 | error, |
357 | 0 | was_accepted: false, |
358 | 0 | } => Event::InboundError(error), |
359 | | substream::Event::InboundError { |
360 | | was_accepted: true, .. |
361 | 0 | } => Event::InboundAcceptedCancel { |
362 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
363 | 0 | user_data: substream_user_data.take().unwrap(), |
364 | 0 | }, |
365 | 0 | substream::Event::InboundNegotiated(protocol_name) => Event::InboundNegotiated { |
366 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
367 | 0 | protocol_name, |
368 | 0 | }, |
369 | 0 | substream::Event::InboundNegotiatedCancel => Event::InboundAcceptedCancel { |
370 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
371 | 0 | user_data: substream_user_data.take().unwrap(), |
372 | 0 | }, |
373 | 0 | substream::Event::RequestIn { request } => Event::RequestIn { |
374 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
375 | 0 | request, |
376 | 0 | }, |
377 | 0 | substream::Event::Response { response } => Event::Response { |
378 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
379 | 0 | response, |
380 | 0 | user_data: substream_user_data.take().unwrap(), |
381 | 0 | }, |
382 | 0 | substream::Event::NotificationsInOpen { handshake } => Event::NotificationsInOpen { |
383 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
384 | 0 | handshake, |
385 | 0 | }, |
386 | 0 | substream::Event::NotificationsInOpenCancel => Event::NotificationsInOpenCancel { |
387 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
388 | 0 | }, |
389 | 0 | substream::Event::NotificationIn { notification } => Event::NotificationIn { |
390 | 0 | notification, |
391 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
392 | 0 | }, |
393 | 0 | substream::Event::NotificationsInClose { outcome } => Event::NotificationsInClose { |
394 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
395 | 0 | outcome, |
396 | 0 | user_data: substream_user_data.take().unwrap(), |
397 | 0 | }, |
398 | 0 | substream::Event::NotificationsOutResult { result } => Event::NotificationsOutResult { |
399 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
400 | 0 | result: match result { |
401 | 0 | Ok(r) => Ok(r), |
402 | 0 | Err(err) => Err((err, substream_user_data.take().unwrap())), |
403 | | }, |
404 | | }, |
405 | | substream::Event::NotificationsOutCloseDemanded => { |
406 | 0 | Event::NotificationsOutCloseDemanded { |
407 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
408 | 0 | } |
409 | | } |
410 | 0 | substream::Event::NotificationsOutReset => Event::NotificationsOutReset { |
411 | 0 | id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)), |
412 | 0 | user_data: substream_user_data.take().unwrap(), |
413 | 0 | }, |
414 | 0 | substream::Event::PingOutSuccess { ping_time } => Event::PingOutSuccess { ping_time }, |
415 | | substream::Event::PingOutError { .. } => { |
416 | | // Because ping events are automatically generated by the external API without any |
417 | | // guarantee, it is safe to merge multiple failed pings into one. |
418 | 0 | Event::PingOutFailed |
419 | | } |
420 | | }); |
421 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18on_substream_eventBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE18on_substream_eventCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18on_substream_eventBa_ |
422 | | |
423 | | /// Sends a request to the remote. |
424 | | /// |
425 | | /// This method only inserts the request into the connection object. The request will later |
426 | | /// be sent out through [`MultiStream::substream_read_write`]. |
427 | | /// |
428 | | /// Assuming that the remote is using the same implementation, an [`Event::RequestIn`] will |
429 | | /// be generated on its side. |
430 | | /// |
431 | | /// If `request` is `None`, then no request is sent to the remote at all. If `request` is |
432 | | /// `Some`, then a (potentially-empty) request is sent. If `Some(&[])` is provided, a |
433 | | /// length-prefix containing a 0 is sent to the remote. |
434 | | /// |
435 | | /// After the remote has sent back a response, an [`Event::Response`] event will be generated |
436 | | /// locally. The `user_data` parameter will be passed back. |
437 | | /// |
438 | | /// The timeout is the time between the moment the substream is opened and the moment the |
439 | | /// response is sent back. If the emitter doesn't send the request or if the receiver doesn't |
440 | | /// answer during this time window, the request is considered failed. |
441 | 0 | pub fn add_request( |
442 | 0 | &mut self, |
443 | 0 | protocol_name: String, |
444 | 0 | request: Option<Vec<u8>>, |
445 | 0 | timeout: TNow, |
446 | 0 | max_response_size: usize, |
447 | 0 | user_data: TSubUd, |
448 | 0 | ) -> SubstreamId { |
449 | 0 | let substream_id = self.next_out_substream_id; |
450 | 0 | self.next_out_substream_id += 1; |
451 | 0 |
|
452 | 0 | self.desired_out_substreams.push_back(Substream { |
453 | 0 | id: substream_id, |
454 | 0 | inner: Some(substream::Substream::request_out( |
455 | 0 | protocol_name, |
456 | 0 | timeout, |
457 | 0 | request, |
458 | 0 | max_response_size, |
459 | 0 | )), |
460 | 0 | user_data: Some(user_data), |
461 | 0 | framing: webrtc_framing::WebRtcFraming::new(), |
462 | 0 | }); |
463 | 0 |
|
464 | 0 | // TODO: ? do this? substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size |
465 | 0 |
|
466 | 0 | SubstreamId(SubstreamIdInner::MultiStream(substream_id)) |
467 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE11add_requestBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE11add_requestCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE11add_requestBa_ |
468 | | |
469 | | /// Returns the user data associated to a notifications substream. |
470 | | /// |
471 | | /// Returns `None` if the substream doesn't exist or isn't a notifications substream. |
472 | 0 | pub fn notifications_substream_user_data_mut( |
473 | 0 | &mut self, |
474 | 0 | id: SubstreamId, |
475 | 0 | ) -> Option<&mut TSubUd> { |
476 | 0 | let id = match id.0 { |
477 | 0 | SubstreamIdInner::MultiStream(id) => id, |
478 | 0 | _ => return None, |
479 | | }; |
480 | | |
481 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&id).unwrap(); |
482 | 0 |
|
483 | 0 | self.in_substreams |
484 | 0 | .get_mut(inner_substream_id) |
485 | 0 | .unwrap() |
486 | 0 | .user_data |
487 | 0 | .as_mut() |
488 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE37notifications_substream_user_data_mutBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE37notifications_substream_user_data_mutBa_ |
489 | | |
490 | | /// Opens a outgoing substream with the given protocol, destined for a stream of |
491 | | /// notifications. |
492 | | /// |
493 | | /// The remote must first accept (or reject) the substream before notifications can be sent |
494 | | /// on it. |
495 | | /// |
496 | | /// This method only inserts the opening handshake into the connection object. The handshake |
497 | | /// will later be sent out through [`MultiStream::substream_read_write`]. |
498 | | /// |
499 | | /// Assuming that the remote is using the same implementation, an |
500 | | /// [`Event::NotificationsInOpen`] will be generated on its side. |
501 | | /// |
502 | 0 | pub fn open_notifications_substream( |
503 | 0 | &mut self, |
504 | 0 | protocol_name: String, |
505 | 0 | max_handshake_size: usize, |
506 | 0 | handshake: Vec<u8>, |
507 | 0 | timeout: TNow, |
508 | 0 | user_data: TSubUd, |
509 | 0 | ) -> SubstreamId { |
510 | 0 | let substream_id = self.next_out_substream_id; |
511 | 0 | self.next_out_substream_id += 1; |
512 | 0 |
|
513 | 0 | self.desired_out_substreams.push_back(Substream { |
514 | 0 | id: substream_id, |
515 | 0 | inner: Some(substream::Substream::notifications_out( |
516 | 0 | timeout, |
517 | 0 | protocol_name, |
518 | 0 | handshake, |
519 | 0 | max_handshake_size, |
520 | 0 | )), |
521 | 0 | user_data: Some(user_data), |
522 | 0 | framing: webrtc_framing::WebRtcFraming::new(), |
523 | 0 | }); |
524 | 0 |
|
525 | 0 | SubstreamId(SubstreamIdInner::MultiStream(substream_id)) |
526 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28open_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE28open_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28open_notifications_substreamBa_ |
527 | | |
528 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol |
529 | | /// name and indicate the type of the protocol. |
530 | | /// |
531 | | /// # Panic |
532 | | /// |
533 | | /// Panics if the substream is not in the correct state. |
534 | | /// |
535 | 0 | pub fn accept_inbound(&mut self, substream_id: SubstreamId, ty: InboundTy, user_data: TSubUd) { |
536 | 0 | let substream_id = match substream_id.0 { |
537 | 0 | SubstreamIdInner::MultiStream(id) => id, |
538 | 0 | _ => panic!(), |
539 | | }; |
540 | | |
541 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
542 | 0 |
|
543 | 0 | let substream = self.in_substreams.get_mut(inner_substream_id).unwrap(); |
544 | 0 | substream.inner.as_mut().unwrap().accept_inbound(ty); |
545 | 0 | debug_assert!(substream.user_data.is_none()); |
546 | 0 | substream.user_data = Some(user_data); |
547 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14accept_inboundBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE14accept_inboundCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14accept_inboundBa_ |
548 | | |
549 | | /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the |
550 | | /// protocol name as not supported. |
551 | | /// |
552 | | /// # Panic |
553 | | /// |
554 | | /// Panics if the substream is not in the correct state. |
555 | | /// |
556 | 0 | pub fn reject_inbound(&mut self, substream_id: SubstreamId) { |
557 | 0 | let substream_id = match substream_id.0 { |
558 | 0 | SubstreamIdInner::MultiStream(id) => id, |
559 | 0 | _ => panic!(), |
560 | | }; |
561 | | |
562 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
563 | 0 |
|
564 | 0 | self.in_substreams |
565 | 0 | .get_mut(inner_substream_id) |
566 | 0 | .unwrap() |
567 | 0 | .inner |
568 | 0 | .as_mut() |
569 | 0 | .unwrap() |
570 | 0 | .reject_inbound(); |
571 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14reject_inboundBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE14reject_inboundCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14reject_inboundBa_ |
572 | | |
573 | | /// Accepts an inbound notifications protocol. Must be called in response to a |
574 | | /// [`Event::NotificationsInOpen`]. |
575 | | /// |
576 | | /// # Panic |
577 | | /// |
578 | | /// Panics if the substream id is not valid or the substream is of the wrong type. |
579 | | /// |
580 | 0 | pub fn accept_in_notifications_substream( |
581 | 0 | &mut self, |
582 | 0 | substream_id: SubstreamId, |
583 | 0 | handshake: Vec<u8>, |
584 | 0 | max_notification_size: usize, |
585 | 0 | ) { |
586 | 0 | let substream_id = match substream_id.0 { |
587 | 0 | SubstreamIdInner::MultiStream(id) => id, |
588 | 0 | _ => panic!(), |
589 | | }; |
590 | | |
591 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
592 | 0 |
|
593 | 0 | self.in_substreams |
594 | 0 | .get_mut(inner_substream_id) |
595 | 0 | .unwrap() |
596 | 0 | .inner |
597 | 0 | .as_mut() |
598 | 0 | .unwrap() |
599 | 0 | .accept_in_notifications_substream(handshake, max_notification_size); |
600 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33accept_in_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE33accept_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33accept_in_notifications_substreamBa_ |
601 | | |
602 | | /// Rejects an inbound notifications protocol. Must be called in response to a |
603 | | /// [`Event::NotificationsInOpen`]. |
604 | | /// |
605 | | /// # Panic |
606 | | /// |
607 | | /// Panics if the substream id is not valid or the substream is of the wrong type. |
608 | | /// |
609 | 0 | pub fn reject_in_notifications_substream(&mut self, substream_id: SubstreamId) { |
610 | 0 | let substream_id = match substream_id.0 { |
611 | 0 | SubstreamIdInner::MultiStream(id) => id, |
612 | 0 | _ => panic!(), |
613 | | }; |
614 | | |
615 | | // TODO: can panic if pending event hasn't been processed |
616 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
617 | 0 |
|
618 | 0 | self.in_substreams |
619 | 0 | .get_mut(inner_substream_id) |
620 | 0 | .unwrap() |
621 | 0 | .inner |
622 | 0 | .as_mut() |
623 | 0 | .unwrap() |
624 | 0 | .reject_in_notifications_substream(); |
625 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33reject_in_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE33reject_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33reject_in_notifications_substreamBa_ |
626 | | |
627 | | /// Queues a notification to be written out on the given substream. |
628 | | /// |
629 | | /// # About back-pressure |
630 | | /// |
631 | | /// This method unconditionally queues up data. You must be aware that the remote, however, |
632 | | /// can decide to delay indefinitely the sending of that data, which can potentially lead to |
633 | | /// an unbounded increase in memory. |
634 | | /// |
635 | | /// As such, you are encouraged to call this method only if the amount of queued data (as |
636 | | /// determined by calling [`MultiStream::notification_substream_queued_bytes`]) is below a |
637 | | /// certain threshold. If above, the notification should be silently discarded. |
638 | | /// |
639 | | /// # Panic |
640 | | /// |
641 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
642 | | /// notifications substream isn't in the appropriate state. |
643 | | /// |
644 | 0 | pub fn write_notification_unbounded( |
645 | 0 | &mut self, |
646 | 0 | substream_id: SubstreamId, |
647 | 0 | notification: Vec<u8>, |
648 | 0 | ) { |
649 | 0 | let substream_id = match substream_id.0 { |
650 | 0 | SubstreamIdInner::MultiStream(id) => id, |
651 | 0 | _ => panic!(), |
652 | | }; |
653 | | |
654 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
655 | 0 |
|
656 | 0 | self.in_substreams |
657 | 0 | .get_mut(inner_substream_id) |
658 | 0 | .unwrap() |
659 | 0 | .inner |
660 | 0 | .as_mut() |
661 | 0 | .unwrap() |
662 | 0 | .write_notification_unbounded(notification); |
663 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28write_notification_unboundedBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE28write_notification_unboundedCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28write_notification_unboundedBa_ |
664 | | |
665 | | /// Returns the number of bytes waiting to be sent out on that substream. |
666 | | /// |
667 | | /// See the documentation of [`MultiStream::write_notification_unbounded`] for context. |
668 | | /// |
669 | | /// # Panic |
670 | | /// |
671 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
672 | | /// notifications substream isn't in the appropriate state. |
673 | | /// |
674 | 0 | pub fn notification_substream_queued_bytes(&self, substream_id: SubstreamId) -> usize { |
675 | 0 | let substream_id = match substream_id.0 { |
676 | 0 | SubstreamIdInner::MultiStream(id) => id, |
677 | 0 | _ => panic!(), |
678 | | }; |
679 | | |
680 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
681 | 0 |
|
682 | 0 | self.in_substreams |
683 | 0 | .get(inner_substream_id) |
684 | 0 | .unwrap() |
685 | 0 | .inner |
686 | 0 | .as_ref() |
687 | 0 | .unwrap() |
688 | 0 | .notification_substream_queued_bytes() |
689 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE35notification_substream_queued_bytesBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE35notification_substream_queued_bytesBa_ |
690 | | |
691 | | /// Closes a notifications substream opened after a successful |
692 | | /// [`Event::NotificationsOutResult`]. |
693 | | /// |
694 | | /// This can be done even when in the negotiation phase, in other words before the remote has |
695 | | /// accepted/refused the substream. |
696 | | /// |
697 | | /// # Panic |
698 | | /// |
699 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
700 | | /// notifications substream isn't in the appropriate state. |
701 | | /// |
702 | 0 | pub fn close_out_notifications_substream(&mut self, substream_id: SubstreamId) { |
703 | 0 | let substream_id = match substream_id.0 { |
704 | 0 | SubstreamIdInner::MultiStream(id) => id, |
705 | 0 | _ => panic!(), |
706 | | }; |
707 | | |
708 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
709 | 0 |
|
710 | 0 | self.in_substreams |
711 | 0 | .get_mut(inner_substream_id) |
712 | 0 | .unwrap() |
713 | 0 | .inner |
714 | 0 | .as_mut() |
715 | 0 | .unwrap() |
716 | 0 | .close_out_notifications_substream(); |
717 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33close_out_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE33close_out_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33close_out_notifications_substreamBa_ |
718 | | |
719 | | /// Closes a notifications substream that was accepted using |
720 | | /// [`MultiStream::accept_in_notifications_substream`]. |
721 | | /// |
722 | | /// # Panic |
723 | | /// |
724 | | /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the |
725 | | /// notifications substream isn't in the appropriate state. |
726 | | /// |
727 | 0 | pub fn close_in_notifications_substream(&mut self, substream_id: SubstreamId, timeout: TNow) { |
728 | 0 | let substream_id = match substream_id.0 { |
729 | 0 | SubstreamIdInner::MultiStream(id) => id, |
730 | 0 | _ => panic!(), |
731 | | }; |
732 | | |
733 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
734 | 0 |
|
735 | 0 | self.in_substreams |
736 | 0 | .get_mut(inner_substream_id) |
737 | 0 | .unwrap() |
738 | 0 | .inner |
739 | 0 | .as_mut() |
740 | 0 | .unwrap() |
741 | 0 | .close_in_notifications_substream(timeout); |
742 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE32close_in_notifications_substreamBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE32close_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE32close_in_notifications_substreamBa_ |
743 | | |
744 | | /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`]. |
745 | | /// |
746 | | /// Returns an error if the [`SubstreamId`] is invalid. |
747 | 0 | pub fn respond_in_request( |
748 | 0 | &mut self, |
749 | 0 | substream_id: SubstreamId, |
750 | 0 | response: Result<Vec<u8>, ()>, |
751 | 0 | ) -> Result<(), substream::RespondInRequestError> { |
752 | 0 | let substream_id = match substream_id.0 { |
753 | 0 | SubstreamIdInner::MultiStream(id) => id, |
754 | 0 | _ => return Err(substream::RespondInRequestError::SubstreamClosed), |
755 | | }; |
756 | | |
757 | 0 | let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
758 | 0 |
|
759 | 0 | self.in_substreams |
760 | 0 | .get_mut(inner_substream_id) |
761 | 0 | .ok_or(substream::RespondInRequestError::SubstreamClosed)? |
762 | | .inner |
763 | 0 | .as_mut() |
764 | 0 | .unwrap() |
765 | 0 | .respond_in_request(response) |
766 | 0 | } Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18respond_in_requestBa_ Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE18respond_in_requestCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18respond_in_requestBa_ |
767 | | } |
768 | | |
769 | | impl<TNow, TSubId, TSubUd> Index<SubstreamId> for MultiStream<TNow, TSubId, TSubUd> |
770 | | where |
771 | | TSubId: Clone + PartialEq + Eq + Hash, |
772 | | { |
773 | | type Output = TSubUd; |
774 | | |
775 | 0 | fn index(&self, substream_id: SubstreamId) -> &Self::Output { |
776 | 0 | let substream_id = match substream_id.0 { |
777 | 0 | SubstreamIdInner::MultiStream(id) => id, |
778 | 0 | _ => panic!(), |
779 | | }; |
780 | | |
781 | 0 | let inner_sub_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
782 | 0 |
|
783 | 0 | self.in_substreams |
784 | 0 | .get(inner_sub_id) |
785 | 0 | .unwrap() |
786 | 0 | .user_data |
787 | 0 | .as_ref() |
788 | 0 | .unwrap() |
789 | 0 | } Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streams_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB7_11SubstreamIdE5indexBd_ Unexecuted instantiation: _RNvXs_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB4_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1F_6option6OptionNtNtBa_10collection11SubstreamIdEEINtNtNtB1F_3ops5index5IndexNtB6_11SubstreamIdE5indexCsDDUKWWCHAU_18smoldot_light_wasm Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streams_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB7_11SubstreamIdE5indexBd_ |
790 | | } |
791 | | |
792 | | impl<TNow, TSubId, TSubUd> IndexMut<SubstreamId> for MultiStream<TNow, TSubId, TSubUd> |
793 | | where |
794 | | TSubId: Clone + PartialEq + Eq + Hash, |
795 | | { |
796 | 0 | fn index_mut(&mut self, substream_id: SubstreamId) -> &mut Self::Output { |
797 | 0 | let substream_id = match substream_id.0 { |
798 | 0 | SubstreamIdInner::MultiStream(id) => id, |
799 | 0 | _ => panic!(), |
800 | | }; |
801 | | |
802 | 0 | let inner_sub_id = self.out_in_substreams_map.get(&substream_id).unwrap(); |
803 | 0 |
|
804 | 0 | self.in_substreams |
805 | 0 | .get_mut(inner_sub_id) |
806 | 0 | .unwrap() |
807 | 0 | .user_data |
808 | 0 | .as_mut() |
809 | 0 | .unwrap() |
810 | 0 | } Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streams0_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mutBd_ Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streams0_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mutBd_ |
811 | | } |
812 | | |
813 | | impl<TNow, TSubId, TSubUd> fmt::Debug for MultiStream<TNow, TSubId, TSubUd> { |
814 | 0 | fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { |
815 | 0 | f.debug_tuple("Established").finish() |
816 | 0 | } Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streams1_0pppEINtB5_11MultiStreampppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_ Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streams1_0pppEINtB5_11MultiStreampppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_ |
817 | | } |
818 | | |
/// Indicates what happens to a substream: either it stays open, or it is killed.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SubstreamFate {
    /// The substream stays open.
    Continue,
    /// The substream is now considered dead and is no longer part of the state machine.
    /// The identifier of the substream is no longer valid.
    Reset,
}