Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/multi_stream.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
// TODO: needs docs
19
20
use super::{
21
    super::super::read_write::ReadWrite, substream, Config, Event, SubstreamId, SubstreamIdInner,
22
};
23
use crate::{libp2p::connection::webrtc_framing, util};
24
25
use alloc::{collections::VecDeque, string::String, vec::Vec};
26
use core::{
27
    fmt,
28
    hash::Hash,
29
    ops::{Add, Index, IndexMut, Sub},
30
    time::Duration,
31
};
32
use rand_chacha::rand_core::{RngCore as _, SeedableRng as _};
33
34
pub use substream::InboundTy;
35
36
/// State machine of a fully-established connection where substreams are handled externally.
37
pub struct MultiStream<TNow, TSubId, TSubUd> {
38
    /// Events that should be yielded from [`MultiStream::pull_event`].
39
    pending_events: VecDeque<Event<TSubUd>>,
40
41
    /// List of all open substreams, both inbound and outbound.
42
    ///
43
    /// There are two substream namespaces: "out substreams", used for API purposes when it comes
44
    /// to notifications and requests, and "in substreams", used for API purposes when it comes to
45
    /// raw data sent/received on a substream. When the user for example resets an "in substream",
46
    /// the "out substream" remains valid.
47
    in_substreams: hashbrown::HashMap<TSubId, Substream<TNow, TSubUd>, util::SipHasherBuild>,
48
49
    out_in_substreams_map: hashbrown::HashMap<u32, TSubId, fnv::FnvBuildHasher>,
50
51
    next_out_substream_id: u32,
52
53
    /// List of outgoing substreams that aren't opened yet.
54
    ///
55
    /// Every time an outgoing substream is opened, an item is pulled from this list.
56
    ///
57
    /// Does not include the ping substream.
58
    desired_out_substreams: VecDeque<Substream<TNow, TSubUd>>,
59
60
    /// Substream used for outgoing pings.
61
    ///
62
    /// Initially contains `None` as the substream for pings isn't opened yet.
63
    ///
64
    /// Because of the API of [`substream::Substream`] concerning pings, there is no need to
65
    /// handle situations where the substream fails to negotiate, as this is handled by making
66
    /// outgoing pings error. This substream is therefore constant.
67
    ping_substream: Option<TSubId>,
68
    /// When to start the next ping attempt.
69
    next_ping: TNow,
70
    /// Source of randomness to generate ping payloads.
71
    ///
72
    /// Note that we use ChaCha20 because the rest of the code base also uses ChaCha20. This avoids
73
    /// unnecessary code being included in the binary and reduces the binary size.
74
    ping_payload_randomness: rand_chacha::ChaCha20Rng,
75
76
    /// See [`Config::max_inbound_substreams`].
77
    // TODO: not enforced at the moment
78
    _max_inbound_substreams: usize,
79
    /// See [`Config::max_protocol_name_len`].
80
    max_protocol_name_len: usize,
81
    /// See [`Config::ping_protocol`].
82
    ping_protocol: String,
83
    /// See [`Config::ping_interval`].
84
    ping_interval: Duration,
85
    /// See [`Config::ping_timeout`].
86
    ping_timeout: Duration,
87
}
88
89
struct Substream<TNow, TSubUd> {
90
    id: u32,
91
    /// Opaque data decided by the user. `None` if the substream doesn't exist on the API layer
92
    /// yet.
93
    user_data: Option<TSubUd>,
94
    /// Underlying state machine for the substream. Always `Some` while the substream is alive,
95
    /// and `None` if it has been reset.
96
    inner: Option<substream::Substream<TNow>>,
97
    /// State of the message frames.
98
    framing: webrtc_framing::WebRtcFraming,
99
}
100
101
const MAX_PENDING_EVENTS: usize = 4;
102
103
impl<TNow, TSubId, TSubUd> MultiStream<TNow, TSubId, TSubUd>
104
where
105
    TNow: Clone + Add<Duration, Output = TNow> + Sub<TNow, Output = Duration> + Ord,
106
    TSubId: Clone + PartialEq + Eq + Hash,
107
{
108
    /// Creates a new connection from the given configuration.
109
0
    pub fn webrtc(config: Config<TNow>) -> MultiStream<TNow, TSubId, TSubUd> {
110
0
        let mut randomness = rand_chacha::ChaCha20Rng::from_seed(config.randomness_seed);
111
0
112
0
        MultiStream {
113
0
            pending_events: {
114
0
                // Note that the capacity is higher than `MAX_PENDING_EVENTS` because resetting
115
0
                // substreams can unconditionally queue an event, and the API doesn't give the
116
0
                // possibility to not reset a substream (as that would introduce too much
117
0
                // complexity). For this reason, we reserve enough for the events that can happen
118
0
                // by reading/writing substreams plus events that can happen by resetting
119
0
                // substreams.
120
0
                let cap = MAX_PENDING_EVENTS + config.substreams_capacity;
121
0
                VecDeque::with_capacity(cap)
122
0
            },
123
0
            in_substreams: hashbrown::HashMap::with_capacity_and_hasher(
124
0
                config.substreams_capacity,
125
0
                util::SipHasherBuild::new({
126
0
                    let mut seed = [0; 16];
127
0
                    randomness.fill_bytes(&mut seed);
128
0
                    seed
129
0
                }),
130
0
            ),
131
0
            out_in_substreams_map: hashbrown::HashMap::with_capacity_and_hasher(
132
0
                config.substreams_capacity,
133
0
                Default::default(),
134
0
            ),
135
0
            next_out_substream_id: 0,
136
0
            desired_out_substreams: VecDeque::with_capacity(config.substreams_capacity),
137
0
            ping_substream: None,
138
0
            next_ping: config.first_out_ping,
139
0
            ping_payload_randomness: randomness,
140
0
            _max_inbound_substreams: config.max_inbound_substreams,
141
0
            max_protocol_name_len: config.max_protocol_name_len,
142
0
            ping_protocol: config.ping_protocol,
143
0
            ping_interval: config.ping_interval,
144
0
            ping_timeout: config.ping_timeout,
145
0
        }
146
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE6webrtcBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE6webrtcCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE6webrtcBa_
147
148
    /// Removes an event from the queue of events and returns it.
149
    ///
150
    /// This method should be called after [`MultiStream::substream_read_write`] or
151
    /// [`MultiStream::reset_substream`] is called.
152
0
    pub fn pull_event(&mut self) -> Option<Event<TSubUd>> {
153
0
        self.pending_events.pop_front()
154
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE10pull_eventBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE10pull_eventCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE10pull_eventBa_
155
156
    /// Modifies the value that was initially passed through [`Config::max_protocol_name_len`].
157
    ///
158
    /// The new value only applies to substreams opened after this function has been called.
159
0
    pub fn set_max_protocol_name_len(&mut self, new_value: usize) {
160
0
        self.max_protocol_name_len = new_value;
161
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE25set_max_protocol_name_lenBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE25set_max_protocol_name_lenCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE25set_max_protocol_name_lenBa_
162
163
    /// Returns the number of new outbound substreams that the state machine would like to see
164
    /// opened.
165
    ///
166
    /// This value doesn't change automatically over time but only after a call to
167
    /// [`MultiStream::substream_read_write`], [`MultiStream::add_substream`],
168
    /// [`MultiStream::reset_substream`], [`MultiStream::add_request`], or
169
    /// [`MultiStream::open_notifications_substream`].
170
    ///
171
    /// Note that the user is expected to track the number of substreams that are currently being
172
    /// opened. For example, if this function returns 2 and there are already 2 substreams
173
    /// currently being opened, then there is no need to open any additional one.
174
0
    pub fn desired_outbound_substreams(&self) -> u32 {
175
0
        u32::try_from(self.desired_out_substreams.len())
176
0
            .unwrap_or(u32::MAX)
177
0
            .saturating_add(if self.ping_substream.is_none() { 1 } else { 0 })
178
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE27desired_outbound_substreamsBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE27desired_outbound_substreamsCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE27desired_outbound_substreamsBa_
179
180
    /// Notifies the state machine that a new substream has been opened.
181
    ///
182
    /// `outbound` indicates whether the substream has been opened by the remote (`false`) or
183
    /// locally (`true`).
184
    ///
185
    /// If `outbound` is `true`, then the value returned by
186
    /// [`MultiStream::desired_outbound_substreams`] will decrease by one.
187
    ///
188
    /// # Panic
189
    ///
190
    /// Panics if there already exists a substream with an identical identifier.
191
    ///
192
0
    pub fn add_substream(&mut self, id: TSubId, outbound: bool) {
193
0
        let substream = if !outbound {
194
0
            let out_substream_id = self.next_out_substream_id;
195
0
            self.next_out_substream_id += 1;
196
0
197
0
            Substream {
198
0
                id: out_substream_id,
199
0
                inner: Some(substream::Substream::ingoing(self.max_protocol_name_len)),
200
0
                user_data: None,
201
0
                framing: webrtc_framing::WebRtcFraming::new(),
202
0
            }
203
0
        } else if self.ping_substream.is_none() {
204
0
            let out_substream_id = self.next_out_substream_id;
205
0
            self.next_out_substream_id += 1;
206
0
207
0
            self.ping_substream = Some(id.clone());
208
0
209
0
            Substream {
210
0
                id: out_substream_id,
211
0
                inner: Some(substream::Substream::ping_out(self.ping_protocol.clone())),
212
0
                user_data: None,
213
0
                framing: webrtc_framing::WebRtcFraming::new(),
214
0
            }
215
0
        } else if let Some(desired) = self.desired_out_substreams.pop_front() {
216
0
            desired
217
        } else {
218
            // TODO: reset the new substream
219
0
            todo!()
220
        };
221
222
0
        let _prev_val = self.out_in_substreams_map.insert(substream.id, id.clone());
223
0
        debug_assert!(_prev_val.is_none());
224
225
0
        let previous_value = self.in_substreams.insert(id, substream);
226
0
        if previous_value.is_some() {
227
            // There is already a substream with that identifier. This is forbidden by the API of
228
            // this function.
229
0
            panic!()
230
0
        }
231
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE13add_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE13add_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE13add_substreamBa_
232
233
    /// Immediately destroys the substream with the given identifier.
234
    ///
235
    /// The given identifier is now considered invalid by the state machine.
236
    ///
237
    /// # Panic
238
    ///
239
    /// Panics if there is no substream with that identifier.
240
    ///
241
0
    pub fn reset_substream(&mut self, substream_id: &TSubId) {
242
0
        let mut substream = self.in_substreams.remove(substream_id).unwrap();
243
0
        let _was_in = self.out_in_substreams_map.remove(&substream.id);
244
0
        debug_assert!(_was_in.is_none());
245
246
0
        if Some(substream_id) == self.ping_substream.as_ref() {
247
0
            self.ping_substream = None;
248
0
        }
249
250
0
        let maybe_event = substream.inner.unwrap().reset();
251
0
        if let Some(event) = maybe_event {
252
0
            Self::on_substream_event(
253
0
                &mut self.pending_events,
254
0
                substream.id,
255
0
                &mut substream.user_data,
256
0
                event,
257
0
            );
258
0
        }
259
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE15reset_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE15reset_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE15reset_substreamBa_
260
261
    /// Reads/writes data on the substream.
262
    ///
263
    /// If the method returns [`SubstreamFate::Reset`], then the substream is now considered dead
264
    /// according to the state machine and its identifier is now invalid. If the reading or
265
    /// writing side of the substream was still open, then the user should reset that substream.
266
    ///
267
    /// This method will refuse to accept data if too many events are already queued. Use
268
    /// [`MultiStream::pull_event`] to empty the queue of events between calls to this method.
269
    ///
270
    /// In the case of a WebRTC connection, the [`ReadWrite::expected_incoming_bytes`] and
271
    /// [`ReadWrite::write_bytes_queueable`] must always be `Some`.
272
    ///
273
    /// # Panic
274
    ///
275
    /// Panics if there is no substream with that identifier.
276
    /// Panics if this is a WebRTC connection, and the reading or writing side is closed.
277
    ///
278
    #[must_use]
279
0
    pub fn substream_read_write(
280
0
        &mut self,
281
0
        substream_id: &TSubId,
282
0
        read_write: &'_ mut ReadWrite<TNow>,
283
0
    ) -> SubstreamFate {
284
0
        let substream = self.in_substreams.get_mut(substream_id).unwrap();
285
0
286
0
        // In WebRTC, the reading and writing side is never closed.
287
0
        assert!(
288
0
            read_write.expected_incoming_bytes.is_some()
289
0
                && read_write.write_bytes_queueable.is_some()
290
        );
291
292
        // Reading/writing the ping substream is used to queue new outgoing pings.
293
0
        if Some(substream_id) == self.ping_substream.as_ref() {
294
0
            if read_write.now >= self.next_ping {
295
0
                let mut payload = [0u8; 32];
296
0
                self.ping_payload_randomness.fill_bytes(&mut payload);
297
0
                substream.inner.as_mut().unwrap().queue_ping(
298
0
                    &payload,
299
0
                    read_write.now.clone(),
300
0
                    self.ping_timeout,
301
0
                );
302
0
                self.next_ping = read_write.now.clone() + self.ping_interval;
303
0
            }
304
305
0
            read_write.wake_up_after(&self.next_ping);
306
0
        }
307
308
        // Don't process any more data before events are pulled.
309
0
        if self.pending_events.len() >= MAX_PENDING_EVENTS {
310
0
            return SubstreamFate::Continue;
311
0
        }
312
313
        // Now process the substream.
314
0
        let event = match substream.framing.read_write(read_write) {
315
0
            Ok(mut framing) => {
316
0
                let (substream_update, event) =
317
0
                    substream.inner.take().unwrap().read_write(&mut framing);
318
0
                substream.inner = substream_update;
319
0
                event
320
            }
321
0
            Err(_) => substream.inner.take().unwrap().reset(),
322
        };
323
324
0
        if let Some(event) = event {
325
0
            read_write.wake_up_asap();
326
0
            Self::on_substream_event(
327
0
                &mut self.pending_events,
328
0
                substream.id,
329
0
                &mut substream.user_data,
330
0
                event,
331
0
            )
332
0
        }
333
334
        // The substream is `None` if it needs to be reset.
335
0
        if substream.inner.is_none() {
336
0
            if Some(substream_id) == self.ping_substream.as_ref() {
337
0
                self.ping_substream = None;
338
0
            }
339
0
            self.out_in_substreams_map.remove(&substream.id);
340
0
            self.in_substreams.remove(substream_id);
341
0
            SubstreamFate::Reset
342
        } else {
343
0
            SubstreamFate::Continue
344
        }
345
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE20substream_read_writeBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE20substream_read_writeCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE20substream_read_writeBa_
346
347
    /// Turns an event from the [`substream`] module into an [`Event`] and adds it to the queue.
348
0
    fn on_substream_event(
349
0
        pending_events: &mut VecDeque<Event<TSubUd>>,
350
0
        substream_id: u32,
351
0
        substream_user_data: &mut Option<TSubUd>,
352
0
        event: substream::Event,
353
0
    ) {
354
0
        pending_events.push_back(match event {
355
            substream::Event::InboundError {
356
0
                error,
357
0
                was_accepted: false,
358
0
            } => Event::InboundError(error),
359
            substream::Event::InboundError {
360
                was_accepted: true, ..
361
0
            } => Event::InboundAcceptedCancel {
362
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
363
0
                user_data: substream_user_data.take().unwrap(),
364
0
            },
365
0
            substream::Event::InboundNegotiated(protocol_name) => Event::InboundNegotiated {
366
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
367
0
                protocol_name,
368
0
            },
369
0
            substream::Event::InboundNegotiatedCancel => Event::InboundAcceptedCancel {
370
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
371
0
                user_data: substream_user_data.take().unwrap(),
372
0
            },
373
0
            substream::Event::RequestIn { request } => Event::RequestIn {
374
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
375
0
                request,
376
0
            },
377
0
            substream::Event::Response { response } => Event::Response {
378
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
379
0
                response,
380
0
                user_data: substream_user_data.take().unwrap(),
381
0
            },
382
0
            substream::Event::NotificationsInOpen { handshake } => Event::NotificationsInOpen {
383
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
384
0
                handshake,
385
0
            },
386
0
            substream::Event::NotificationsInOpenCancel => Event::NotificationsInOpenCancel {
387
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
388
0
            },
389
0
            substream::Event::NotificationIn { notification } => Event::NotificationIn {
390
0
                notification,
391
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
392
0
            },
393
0
            substream::Event::NotificationsInClose { outcome } => Event::NotificationsInClose {
394
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
395
0
                outcome,
396
0
                user_data: substream_user_data.take().unwrap(),
397
0
            },
398
0
            substream::Event::NotificationsOutResult { result } => Event::NotificationsOutResult {
399
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
400
0
                result: match result {
401
0
                    Ok(r) => Ok(r),
402
0
                    Err(err) => Err((err, substream_user_data.take().unwrap())),
403
                },
404
            },
405
            substream::Event::NotificationsOutCloseDemanded => {
406
0
                Event::NotificationsOutCloseDemanded {
407
0
                    id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
408
0
                }
409
            }
410
0
            substream::Event::NotificationsOutReset => Event::NotificationsOutReset {
411
0
                id: SubstreamId(SubstreamIdInner::MultiStream(substream_id)),
412
0
                user_data: substream_user_data.take().unwrap(),
413
0
            },
414
0
            substream::Event::PingOutSuccess { ping_time } => Event::PingOutSuccess { ping_time },
415
            substream::Event::PingOutError { .. } => {
416
                // Because ping events are automatically generated by the external API without any
417
                // guarantee, it is safe to merge multiple failed pings into one.
418
0
                Event::PingOutFailed
419
            }
420
        });
421
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18on_substream_eventBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE18on_substream_eventCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18on_substream_eventBa_
422
423
    /// Sends a request to the remote.
424
    ///
425
    /// This method only inserts the request into the connection object. The request will later
426
    /// be sent out through [`MultiStream::substream_read_write`].
427
    ///
428
    /// Assuming that the remote is using the same implementation, an [`Event::RequestIn`] will
429
    /// be generated on its side.
430
    ///
431
    /// If `request` is `None`, then no request is sent to the remote at all. If `request` is
432
    /// `Some`, then a (potentially-empty) request is sent. If `Some(&[])` is provided, a
433
    /// length-prefix containing a 0 is sent to the remote.
434
    ///
435
    /// After the remote has sent back a response, an [`Event::Response`] event will be generated
436
    /// locally. The `user_data` parameter will be passed back.
437
    ///
438
    /// The timeout is the time between the moment the substream is opened and the moment the
439
    /// response is sent back. If the emitter doesn't send the request or if the receiver doesn't
440
    /// answer during this time window, the request is considered failed.
441
0
    pub fn add_request(
442
0
        &mut self,
443
0
        protocol_name: String,
444
0
        request: Option<Vec<u8>>,
445
0
        timeout: TNow,
446
0
        max_response_size: usize,
447
0
        user_data: TSubUd,
448
0
    ) -> SubstreamId {
449
0
        let substream_id = self.next_out_substream_id;
450
0
        self.next_out_substream_id += 1;
451
0
452
0
        self.desired_out_substreams.push_back(Substream {
453
0
            id: substream_id,
454
0
            inner: Some(substream::Substream::request_out(
455
0
                protocol_name,
456
0
                timeout,
457
0
                request,
458
0
                max_response_size,
459
0
            )),
460
0
            user_data: Some(user_data),
461
0
            framing: webrtc_framing::WebRtcFraming::new(),
462
0
        });
463
0
464
0
        // TODO: ? do this? substream.reserve_window(128 * 1024 * 1024 + 128); // TODO: proper max size
465
0
466
0
        SubstreamId(SubstreamIdInner::MultiStream(substream_id))
467
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE11add_requestBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE11add_requestCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE11add_requestBa_
468
469
    /// Returns the user data associated to a notifications substream.
470
    ///
471
    /// Returns `None` if the substream doesn't exist or isn't a notifications substream.
472
0
    pub fn notifications_substream_user_data_mut(
473
0
        &mut self,
474
0
        id: SubstreamId,
475
0
    ) -> Option<&mut TSubUd> {
476
0
        let id = match id.0 {
477
0
            SubstreamIdInner::MultiStream(id) => id,
478
0
            _ => return None,
479
        };
480
481
0
        let inner_substream_id = self.out_in_substreams_map.get(&id).unwrap();
482
0
483
0
        self.in_substreams
484
0
            .get_mut(inner_substream_id)
485
0
            .unwrap()
486
0
            .user_data
487
0
            .as_mut()
488
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE37notifications_substream_user_data_mutBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE37notifications_substream_user_data_mutBa_
489
490
    /// Opens an outgoing substream with the given protocol, destined for a stream of
491
    /// notifications.
492
    ///
493
    /// The remote must first accept (or reject) the substream before notifications can be sent
494
    /// on it.
495
    ///
496
    /// This method only inserts the opening handshake into the connection object. The handshake
497
    /// will later be sent out through [`MultiStream::substream_read_write`].
498
    ///
499
    /// Assuming that the remote is using the same implementation, an
500
    /// [`Event::NotificationsInOpen`] will be generated on its side.
501
    ///
502
0
    pub fn open_notifications_substream(
503
0
        &mut self,
504
0
        protocol_name: String,
505
0
        max_handshake_size: usize,
506
0
        handshake: Vec<u8>,
507
0
        timeout: TNow,
508
0
        user_data: TSubUd,
509
0
    ) -> SubstreamId {
510
0
        let substream_id = self.next_out_substream_id;
511
0
        self.next_out_substream_id += 1;
512
0
513
0
        self.desired_out_substreams.push_back(Substream {
514
0
            id: substream_id,
515
0
            inner: Some(substream::Substream::notifications_out(
516
0
                timeout,
517
0
                protocol_name,
518
0
                handshake,
519
0
                max_handshake_size,
520
0
            )),
521
0
            user_data: Some(user_data),
522
0
            framing: webrtc_framing::WebRtcFraming::new(),
523
0
        });
524
0
525
0
        SubstreamId(SubstreamIdInner::MultiStream(substream_id))
526
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28open_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE28open_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28open_notifications_substreamBa_
527
528
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to accept the protocol
529
    /// name and indicate the type of the protocol.
530
    ///
531
    /// # Panic
532
    ///
533
    /// Panics if the substream is not in the correct state.
534
    ///
535
0
    pub fn accept_inbound(&mut self, substream_id: SubstreamId, ty: InboundTy, user_data: TSubUd) {
536
0
        let substream_id = match substream_id.0 {
537
0
            SubstreamIdInner::MultiStream(id) => id,
538
0
            _ => panic!(),
539
        };
540
541
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
542
0
543
0
        let substream = self.in_substreams.get_mut(inner_substream_id).unwrap();
544
0
        substream.inner.as_mut().unwrap().accept_inbound(ty);
545
0
        debug_assert!(substream.user_data.is_none());
546
0
        substream.user_data = Some(user_data);
547
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14accept_inboundBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE14accept_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14accept_inboundBa_
548
549
    /// Call after an [`Event::InboundNegotiated`] has been emitted in order to reject the
550
    /// protocol name as not supported.
551
    ///
552
    /// # Panic
553
    ///
554
    /// Panics if the substream is not in the correct state.
555
    ///
556
0
    pub fn reject_inbound(&mut self, substream_id: SubstreamId) {
557
0
        let substream_id = match substream_id.0 {
558
0
            SubstreamIdInner::MultiStream(id) => id,
559
0
            _ => panic!(),
560
        };
561
562
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
563
0
564
0
        self.in_substreams
565
0
            .get_mut(inner_substream_id)
566
0
            .unwrap()
567
0
            .inner
568
0
            .as_mut()
569
0
            .unwrap()
570
0
            .reject_inbound();
571
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14reject_inboundBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE14reject_inboundCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE14reject_inboundBa_
572
573
    /// Accepts an inbound notifications protocol. Must be called in response to a
574
    /// [`Event::NotificationsInOpen`].
575
    ///
576
    /// # Panic
577
    ///
578
    /// Panics if the substream id is not valid or the substream is of the wrong type.
579
    ///
580
0
    pub fn accept_in_notifications_substream(
581
0
        &mut self,
582
0
        substream_id: SubstreamId,
583
0
        handshake: Vec<u8>,
584
0
        max_notification_size: usize,
585
0
    ) {
586
0
        let substream_id = match substream_id.0 {
587
0
            SubstreamIdInner::MultiStream(id) => id,
588
0
            _ => panic!(),
589
        };
590
591
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
592
0
593
0
        self.in_substreams
594
0
            .get_mut(inner_substream_id)
595
0
            .unwrap()
596
0
            .inner
597
0
            .as_mut()
598
0
            .unwrap()
599
0
            .accept_in_notifications_substream(handshake, max_notification_size);
600
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33accept_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE33accept_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33accept_in_notifications_substreamBa_
601
602
    /// Rejects an inbound notifications protocol. Must be called in response to a
603
    /// [`Event::NotificationsInOpen`].
604
    ///
605
    /// # Panic
606
    ///
607
    /// Panics if the substream id is not valid or the substream is of the wrong type.
608
    ///
609
0
    pub fn reject_in_notifications_substream(&mut self, substream_id: SubstreamId) {
610
0
        let substream_id = match substream_id.0 {
611
0
            SubstreamIdInner::MultiStream(id) => id,
612
0
            _ => panic!(),
613
        };
614
615
        // TODO: can panic if pending event hasn't been processed
616
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
617
0
618
0
        self.in_substreams
619
0
            .get_mut(inner_substream_id)
620
0
            .unwrap()
621
0
            .inner
622
0
            .as_mut()
623
0
            .unwrap()
624
0
            .reject_in_notifications_substream();
625
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33reject_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE33reject_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33reject_in_notifications_substreamBa_
626
627
    /// Queues a notification to be written out on the given substream.
628
    ///
629
    /// # About back-pressure
630
    ///
631
    /// This method unconditionally queues up data. You must be aware that the remote, however,
632
    /// can decide to delay indefinitely the sending of that data, which can potentially lead to
633
    /// an unbounded increase in memory.
634
    ///
635
    /// As such, you are encouraged to call this method only if the amount of queued data (as
636
    /// determined by calling [`MultiStream::notification_substream_queued_bytes`]) is below a
637
    /// certain threshold. If above, the notification should be silently discarded.
638
    ///
639
    /// # Panic
640
    ///
641
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
642
    /// notifications substream isn't in the appropriate state.
643
    ///
644
0
    pub fn write_notification_unbounded(
645
0
        &mut self,
646
0
        substream_id: SubstreamId,
647
0
        notification: Vec<u8>,
648
0
    ) {
649
0
        let substream_id = match substream_id.0 {
650
0
            SubstreamIdInner::MultiStream(id) => id,
651
0
            _ => panic!(),
652
        };
653
654
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
655
0
656
0
        self.in_substreams
657
0
            .get_mut(inner_substream_id)
658
0
            .unwrap()
659
0
            .inner
660
0
            .as_mut()
661
0
            .unwrap()
662
0
            .write_notification_unbounded(notification);
663
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28write_notification_unboundedBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE28write_notification_unboundedCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE28write_notification_unboundedBa_
664
665
    /// Returns the number of bytes waiting to be sent out on that substream.
666
    ///
667
    /// See the documentation of [`MultiStream::write_notification_unbounded`] for context.
668
    ///
669
    /// # Panic
670
    ///
671
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
672
    /// notifications substream isn't in the appropriate state.
673
    ///
674
0
    pub fn notification_substream_queued_bytes(&self, substream_id: SubstreamId) -> usize {
675
0
        let substream_id = match substream_id.0 {
676
0
            SubstreamIdInner::MultiStream(id) => id,
677
0
            _ => panic!(),
678
        };
679
680
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
681
0
682
0
        self.in_substreams
683
0
            .get(inner_substream_id)
684
0
            .unwrap()
685
0
            .inner
686
0
            .as_ref()
687
0
            .unwrap()
688
0
            .notification_substream_queued_bytes()
689
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE35notification_substream_queued_bytesBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE35notification_substream_queued_bytesBa_
690
691
    /// Closes a notifications substream opened after a successful
692
    /// [`Event::NotificationsOutResult`].
693
    ///
694
    /// This can be done even when in the negotiation phase, in other words before the remote has
695
    /// accepted/refused the substream.
696
    ///
697
    /// # Panic
698
    ///
699
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
700
    /// notifications substream isn't in the appropriate state.
701
    ///
702
0
    pub fn close_out_notifications_substream(&mut self, substream_id: SubstreamId) {
703
0
        let substream_id = match substream_id.0 {
704
0
            SubstreamIdInner::MultiStream(id) => id,
705
0
            _ => panic!(),
706
        };
707
708
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
709
0
710
0
        self.in_substreams
711
0
            .get_mut(inner_substream_id)
712
0
            .unwrap()
713
0
            .inner
714
0
            .as_mut()
715
0
            .unwrap()
716
0
            .close_out_notifications_substream();
717
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33close_out_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE33close_out_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE33close_out_notifications_substreamBa_
718
719
    /// Closes a notifications substream that was accepted using
720
    /// [`MultiStream::accept_in_notifications_substream`].
721
    ///
722
    /// # Panic
723
    ///
724
    /// Panics if the [`SubstreamId`] doesn't correspond to a notifications substream, or if the
725
    /// notifications substream isn't in the appropriate state.
726
    ///
727
0
    pub fn close_in_notifications_substream(&mut self, substream_id: SubstreamId, timeout: TNow) {
728
0
        let substream_id = match substream_id.0 {
729
0
            SubstreamIdInner::MultiStream(id) => id,
730
0
            _ => panic!(),
731
        };
732
733
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
734
0
735
0
        self.in_substreams
736
0
            .get_mut(inner_substream_id)
737
0
            .unwrap()
738
0
            .inner
739
0
            .as_mut()
740
0
            .unwrap()
741
0
            .close_in_notifications_substream(timeout);
742
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE32close_in_notifications_substreamBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE32close_in_notifications_substreamCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE32close_in_notifications_substreamBa_
743
744
    /// Responds to an incoming request. Must be called in response to a [`Event::RequestIn`].
745
    ///
746
    /// Returns an error if the [`SubstreamId`] is invalid.
747
0
    pub fn respond_in_request(
748
0
        &mut self,
749
0
        substream_id: SubstreamId,
750
0
        response: Result<Vec<u8>, ()>,
751
0
    ) -> Result<(), substream::RespondInRequestError> {
752
0
        let substream_id = match substream_id.0 {
753
0
            SubstreamIdInner::MultiStream(id) => id,
754
0
            _ => return Err(substream::RespondInRequestError::SubstreamClosed),
755
        };
756
757
0
        let inner_substream_id = self.out_in_substreams_map.get(&substream_id).unwrap();
758
0
759
0
        self.in_substreams
760
0
            .get_mut(inner_substream_id)
761
0
            .ok_or(substream::RespondInRequestError::SubstreamClosed)?
762
            .inner
763
0
            .as_mut()
764
0
            .unwrap()
765
0
            .respond_in_request(response)
766
0
    }
Unexecuted instantiation: _RNvMNtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18respond_in_requestBa_
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1D_6option6OptionNtNtB8_10collection11SubstreamIdEE18respond_in_requestCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvMNtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB2_11MultiStreampppE18respond_in_requestBa_
767
}
768
769
impl<TNow, TSubId, TSubUd> Index<SubstreamId> for MultiStream<TNow, TSubId, TSubUd>
770
where
771
    TSubId: Clone + PartialEq + Eq + Hash,
772
{
773
    type Output = TSubUd;
774
775
0
    fn index(&self, substream_id: SubstreamId) -> &Self::Output {
776
0
        let substream_id = match substream_id.0 {
777
0
            SubstreamIdInner::MultiStream(id) => id,
778
0
            _ => panic!(),
779
        };
780
781
0
        let inner_sub_id = self.out_in_substreams_map.get(&substream_id).unwrap();
782
0
783
0
        self.in_substreams
784
0
            .get(inner_sub_id)
785
0
            .unwrap()
786
0
            .user_data
787
0
            .as_ref()
788
0
            .unwrap()
789
0
    }
Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streams_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB7_11SubstreamIdE5indexBd_
Unexecuted instantiation: _RNvXs_NtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streamINtB4_11MultiStreamNtNtCsaYZPK01V26L_4core4time8DurationjINtNtB1F_6option6OptionNtNtBa_10collection11SubstreamIdEEINtNtNtB1F_3ops5index5IndexNtB6_11SubstreamIdE5indexCsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streams_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index5IndexNtB7_11SubstreamIdE5indexBd_
790
}
791
792
impl<TNow, TSubId, TSubUd> IndexMut<SubstreamId> for MultiStream<TNow, TSubId, TSubUd>
793
where
794
    TSubId: Clone + PartialEq + Eq + Hash,
795
{
796
0
    fn index_mut(&mut self, substream_id: SubstreamId) -> &mut Self::Output {
797
0
        let substream_id = match substream_id.0 {
798
0
            SubstreamIdInner::MultiStream(id) => id,
799
0
            _ => panic!(),
800
        };
801
802
0
        let inner_sub_id = self.out_in_substreams_map.get(&substream_id).unwrap();
803
0
804
0
        self.in_substreams
805
0
            .get_mut(inner_sub_id)
806
0
            .unwrap()
807
0
            .user_data
808
0
            .as_mut()
809
0
            .unwrap()
810
0
    }
Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streams0_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mutBd_
Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streams0_0pppEINtB5_11MultiStreampppEINtNtNtCsaYZPK01V26L_4core3ops5index8IndexMutNtB7_11SubstreamIdE9index_mutBd_
811
}
812
813
impl<TNow, TSubId, TSubUd> fmt::Debug for MultiStream<TNow, TSubId, TSubUd> {
814
0
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
815
0
        f.debug_tuple("Established").finish()
816
0
    }
Unexecuted instantiation: _RNvXININtNtNtNtCsN16ciHI6Qf_7smoldot6libp2p10connection11established12multi_streams1_0pppEINtB5_11MultiStreampppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_
Unexecuted instantiation: _RNvXININtNtNtNtCseuYC0Zibziv_7smoldot6libp2p10connection11established12multi_streams1_0pppEINtB5_11MultiStreampppENtNtCsaYZPK01V26L_4core3fmt5Debug3fmtBd_
817
}
818
819
/// Outcome for a substream: either it stays open, or it has been killed.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum SubstreamFate {
    /// The substream stays open.
    Continue,
    /// The substream is now considered dead and has been removed from the state machine.
    /// Its identifier is no longer valid.
    Reset,
}