Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/tests.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
#![cfg(test)]
19
20
use super::{
21
    Config, Event, InboundError, InboundTy, NotificationsOutErr, RequestError, SingleStream,
22
};
23
use crate::libp2p::read_write::ReadWrite;
24
use core::{cmp, mem, time::Duration};
25
26
struct TwoEstablished {
27
    alice: SingleStream<Duration, ()>,
28
    bob: SingleStream<Duration, ()>,
29
    alice_to_bob_buffer: Vec<u8>,
30
    bob_to_alice_buffer: Vec<u8>,
31
32
    alice_to_bob_buffer_size: usize,
33
    bob_to_alice_buffer_size: usize,
34
35
    /// Time that has elapsed since an unspecified epoch.
36
    now: Duration,
37
38
    /// Next time Alice or Bob needs to be polled.
39
    wake_up_after: Option<Duration>,
40
}
41
42
/// Performs a handshake between two peers, and returns the established connection objects.
43
7
fn perform_handshake(
44
7
    mut alice_to_bob_buffer_size: usize,
45
7
    mut bob_to_alice_buffer_size: usize,
46
7
    alice_config: Config<Duration>,
47
7
    bob_config: Config<Duration>,
48
7
) -> TwoEstablished {
49
7
    use super::super::{single_stream_handshake, NoiseKey};
50
7
51
7
    assert_ne!(alice_to_bob_buffer_size, 0);
52
7
    assert_ne!(bob_to_alice_buffer_size, 0);
53
54
7
    let alice_key = NoiseKey::new(&rand::random(), &rand::random());
55
7
    let bob_key = NoiseKey::new(&rand::random(), &rand::random());
56
7
57
7
    let mut alice =
58
7
        single_stream_handshake::Handshake::noise_yamux(&alice_key, &rand::random(), true);
59
7
    let mut bob = single_stream_handshake::Handshake::noise_yamux(&bob_key, &rand::random(), false);
60
7
61
7
    let mut alice_to_bob_buffer = Vec::new();
62
7
    let mut bob_to_alice_buffer = Vec::new();
63
64
35
    while !matches!(
65
42
        (&alice, &bob),
66
        (
67
            single_stream_handshake::Handshake::Success { .. },
68
            single_stream_handshake::Handshake::Success { .. }
69
        )
70
    ) {
71
35
        match alice {
72
0
            single_stream_handshake::Handshake::Success { .. } => {}
73
35
            single_stream_handshake::Handshake::Healthy(nego) => {
74
35
                let mut read_write = ReadWrite {
75
35
                    now: Duration::new(0, 0),
76
35
                    incoming_buffer: bob_to_alice_buffer,
77
35
                    expected_incoming_bytes: Some(0),
78
35
                    read_bytes: 0,
79
35
                    write_bytes_queued: alice_to_bob_buffer.len(),
80
35
                    write_bytes_queueable: Some(
81
35
                        alice_to_bob_buffer_size - alice_to_bob_buffer.len(),
82
35
                    ),
83
35
                    write_buffers: vec![mem::take(&mut alice_to_bob_buffer)],
84
35
                    wake_up_after: None,
85
35
                };
86
35
87
35
                alice = nego.read_write(&mut read_write).unwrap();
88
35
                bob_to_alice_buffer = read_write.incoming_buffer;
89
35
                alice_to_bob_buffer.extend(
90
35
                    read_write
91
35
                        .write_buffers
92
35
                        .drain(..)
93
105
                        .flat_map(|b| b.into_iter()),
94
35
                );
95
35
                bob_to_alice_buffer_size = cmp::max(
96
35
                    bob_to_alice_buffer_size,
97
35
                    read_write.expected_incoming_bytes.unwrap_or(0),
98
35
                );
99
35
            }
100
        }
101
102
35
        match bob {
103
14
            single_stream_handshake::Handshake::Success { .. } => {}
104
21
            single_stream_handshake::Handshake::Healthy(nego) => {
105
21
                let mut read_write = ReadWrite {
106
21
                    now: Duration::new(0, 0),
107
21
                    incoming_buffer: alice_to_bob_buffer,
108
21
                    expected_incoming_bytes: Some(0),
109
21
                    read_bytes: 0,
110
21
                    write_bytes_queued: bob_to_alice_buffer.len(),
111
21
                    write_bytes_queueable: Some(
112
21
                        bob_to_alice_buffer_size - bob_to_alice_buffer.len(),
113
21
                    ),
114
21
                    write_buffers: vec![mem::take(&mut bob_to_alice_buffer)],
115
21
                    wake_up_after: None,
116
21
                };
117
21
118
21
                bob = nego.read_write(&mut read_write).unwrap();
119
21
                alice_to_bob_buffer = read_write.incoming_buffer;
120
21
                bob_to_alice_buffer.extend(
121
21
                    read_write
122
21
                        .write_buffers
123
21
                        .drain(..)
124
119
                        .flat_map(|b| b.into_iter()),
125
21
                );
126
21
                alice_to_bob_buffer_size = cmp::max(
127
21
                    alice_to_bob_buffer_size,
128
21
                    read_write.expected_incoming_bytes.unwrap_or(0),
129
21
                );
130
21
            }
131
        }
132
    }
133
134
7
    let mut connections = TwoEstablished {
135
7
        alice: match alice {
136
7
            single_stream_handshake::Handshake::Success { connection, .. } => {
137
7
                connection.into_connection(alice_config)
138
            }
139
0
            _ => unreachable!(),
140
        },
141
7
        bob: match bob {
142
7
            single_stream_handshake::Handshake::Success { connection, .. } => {
143
7
                connection.into_connection(bob_config)
144
            }
145
0
            _ => unreachable!(),
146
        },
147
7
        alice_to_bob_buffer,
148
7
        bob_to_alice_buffer,
149
7
        alice_to_bob_buffer_size,
150
7
        bob_to_alice_buffer_size,
151
7
        now: Duration::new(0, 0),
152
7
        wake_up_after: None,
153
    };
154
155
21
    for _ in 0..2 {
156
14
        let (connections_update, event) = connections.run_until_event();
157
14
        connections = connections_update;
158
14
        match event {
159
7
            either::Left(Event::InboundNegotiated { id, .. }) => {
160
7
                connections.alice.accept_inbound(id, InboundTy::Ping, ());
161
7
            }
162
7
            either::Right(Event::InboundNegotiated { id, .. }) => {
163
7
                connections.bob.accept_inbound(id, InboundTy::Ping, ());
164
7
            }
165
0
            _ev => unreachable!("{:?}", _ev),
166
        }
167
    }
168
169
7
    connections
170
7
}
171
172
impl TwoEstablished {
173
2
    fn pass_time(&mut self, amount: Duration) {
174
2
        self.now += amount;
175
2
    }
176
177
34
    fn run_until_event(mut self) -> (Self, either::Either<Event<()>, Event<()>>) {
178
201
        loop {
179
201
            let mut alice_read_write = ReadWrite {
180
201
                now: self.now,
181
201
                incoming_buffer: self.bob_to_alice_buffer,
182
201
                expected_incoming_bytes: Some(0),
183
201
                read_bytes: 0,
184
201
                write_bytes_queued: self.alice_to_bob_buffer.len(),
185
201
                write_bytes_queueable: Some(
186
201
                    self.alice_to_bob_buffer_size - self.alice_to_bob_buffer.len(),
187
201
                ),
188
201
                write_buffers: vec![mem::take(&mut self.alice_to_bob_buffer)],
189
201
                wake_up_after: self.wake_up_after,
190
201
            };
191
201
192
201
            let (new_alice, alice_event) = self.alice.read_write(&mut alice_read_write).unwrap();
193
201
            self.bob_to_alice_buffer = alice_read_write.incoming_buffer;
194
201
            self.alice = new_alice;
195
201
            let alice_read_bytes = alice_read_write.read_bytes;
196
201
            let alice_written_bytes = alice_read_write.write_bytes_queued;
197
201
            self.alice_to_bob_buffer.extend(
198
201
                alice_read_write
199
201
                    .write_buffers
200
201
                    .drain(..)
201
333
                    .flat_map(|b| b.into_iter()),
202
201
            );
203
201
            self.bob_to_alice_buffer_size = cmp::max(
204
201
                self.bob_to_alice_buffer_size,
205
201
                alice_read_write.expected_incoming_bytes.unwrap_or(0),
206
201
            );
207
201
            self.wake_up_after = alice_read_write.wake_up_after;
208
209
201
            if let Some(
event13
) = alice_event {
210
13
                return (self, either::Left(event));
211
188
            }
212
188
213
188
            let mut bob_read_write = ReadWrite {
214
188
                now: self.now,
215
188
                incoming_buffer: self.alice_to_bob_buffer,
216
188
                expected_incoming_bytes: Some(0),
217
188
                read_bytes: 0,
218
188
                write_bytes_queued: self.bob_to_alice_buffer.len(),
219
188
                write_bytes_queueable: Some(
220
188
                    self.bob_to_alice_buffer_size - self.bob_to_alice_buffer.len(),
221
188
                ),
222
188
                write_buffers: vec![mem::take(&mut self.bob_to_alice_buffer)],
223
188
                wake_up_after: self.wake_up_after,
224
188
            };
225
188
226
188
            let (new_bob, bob_event) = self.bob.read_write(&mut bob_read_write).unwrap();
227
188
            self.alice_to_bob_buffer = bob_read_write.incoming_buffer;
228
188
            self.bob = new_bob;
229
188
            let bob_read_bytes = bob_read_write.read_bytes;
230
188
            let bob_written_bytes = bob_read_write.write_bytes_queued;
231
188
            self.bob_to_alice_buffer.extend(
232
188
                bob_read_write
233
188
                    .write_buffers
234
188
                    .drain(..)
235
332
                    .flat_map(|b| b.into_iter()),
236
188
            );
237
188
            self.alice_to_bob_buffer_size = cmp::max(
238
188
                self.alice_to_bob_buffer_size,
239
188
                bob_read_write.expected_incoming_bytes.unwrap_or(0),
240
188
            );
241
188
            self.wake_up_after = bob_read_write.wake_up_after;
242
243
188
            if let Some(
event21
) = bob_event {
244
21
                return (self, either::Right(event));
245
167
            }
246
167
247
167
            if bob_read_bytes != 0
248
135
                || bob_written_bytes != 0
249
98
                || alice_read_bytes != 0
250
75
                || alice_written_bytes != 0
251
            {
252
98
                continue;
253
69
            }
254
255
            // Nothing more will happen immediately. Advance time before looping again.
256
69
            if let Some(wake_up_after) = self.wake_up_after.take() {
257
69
                self.now = wake_up_after + Duration::new(0, 1); // TODO: adding 1 ns is a hack
258
69
            } else {
259
                // TODO: what to do here?! nothing more will happen
260
0
                panic!();
261
            }
262
        }
263
34
    }
264
}
265
266
#[test]
267
1
fn handshake_works() {
268
1
    fn test_with_buffer_sizes(size1: usize, size2: usize) {
269
1
        let config = Config {
270
1
            first_out_ping: Duration::new(0, 0),
271
1
            max_inbound_substreams: 64,
272
1
            substreams_capacity: 16,
273
1
            max_protocol_name_len: 128,
274
1
            ping_interval: Duration::from_secs(20),
275
1
            ping_protocol: "ping".to_owned(),
276
1
            ping_timeout: Duration::from_secs(20),
277
1
            randomness_seed: [0; 32],
278
1
        };
279
1
280
1
        perform_handshake(size1, size2, config.clone(), config);
281
1
    }
282
1
283
1
    test_with_buffer_sizes(256, 256);
284
1
    // TODO: doesn't work
285
1
    /*test_with_buffer_sizes(1, 1);
286
1
    test_with_buffer_sizes(1, 2048);
287
1
    test_with_buffer_sizes(2048, 1);*/
288
1
}
289
290
#[test]
291
#[ignore] // TODO: un-ignore
292
0
fn successful_request() {
293
0
    let config = Config {
294
0
        first_out_ping: Duration::new(60, 0),
295
0
        max_inbound_substreams: 64,
296
0
        substreams_capacity: 16,
297
0
        max_protocol_name_len: 128,
298
0
        ping_interval: Duration::from_secs(20),
299
0
        ping_protocol: "ping".to_owned(),
300
0
        ping_timeout: Duration::from_secs(20),
301
0
        randomness_seed: [0; 32],
302
0
    };
303
0
304
0
    let mut connections = perform_handshake(256, 256, config.clone(), config);
305
0
306
0
    let substream_id = connections.alice.add_request(
307
0
        "test-request-protocol".to_owned(),
308
0
        Some(b"request payload".to_vec()),
309
0
        Duration::from_secs(5),
310
0
        1024,
311
0
        (),
312
0
    );
313
0
314
0
    let (connections_update, event) = connections.run_until_event();
315
0
    connections = connections_update;
316
0
    match event {
317
0
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
318
0
            assert_eq!(protocol_name, "test-request-protocol");
319
0
            connections.bob.accept_inbound(
320
0
                id,
321
0
                InboundTy::Request {
322
0
                    request_max_size: Some(1024 * 1024),
323
0
                },
324
0
                (),
325
0
            );
326
        }
327
0
        _ev => unreachable!("{:?}", _ev),
328
    }
329
330
0
    let (connections_update, event) = connections.run_until_event();
331
0
    connections = connections_update;
332
0
    match event {
333
0
        either::Right(Event::RequestIn { id, request }) => {
334
0
            assert_eq!(request, b"request payload");
335
0
            connections
336
0
                .bob
337
0
                .respond_in_request(id, Ok(b"response payload".to_vec()))
338
0
                .unwrap();
339
        }
340
0
        _ev => unreachable!("{:?}", _ev),
341
    }
342
343
0
    let (_, event) = connections.run_until_event();
344
0
    match event {
345
0
        either::Left(Event::Response { id, response, .. }) => {
346
0
            assert_eq!(id, substream_id);
347
0
            assert_eq!(response.unwrap(), b"response payload".to_vec());
348
        }
349
0
        _ev => unreachable!("{:?}", _ev),
350
    }
351
0
}
352
353
#[test]
354
1
fn refused_request() {
355
1
    let config = Config {
356
1
        first_out_ping: Duration::new(60, 0),
357
1
        max_inbound_substreams: 64,
358
1
        substreams_capacity: 16,
359
1
        max_protocol_name_len: 128,
360
1
        ping_interval: Duration::from_secs(20),
361
1
        ping_protocol: "ping".to_owned(),
362
1
        ping_timeout: Duration::from_secs(20),
363
1
        randomness_seed: [0; 32],
364
1
    };
365
1
366
1
    let mut connections = perform_handshake(256, 256, config.clone(), config);
367
1
368
1
    let substream_id = connections.alice.add_request(
369
1
        "test-request-protocol".to_owned(),
370
1
        Some(b"request payload".to_vec()),
371
1
        Duration::from_secs(5),
372
1
        1024,
373
1
        (),
374
1
    );
375
1
376
1
    let (connections_update, event) = connections.run_until_event();
377
1
    connections = connections_update;
378
1
    match event {
379
1
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
380
1
            assert_eq!(protocol_name, "test-request-protocol");
381
1
            connections.bob.accept_inbound(
382
1
                id,
383
1
                InboundTy::Request {
384
1
                    request_max_size: Some(1024 * 1024),
385
1
                },
386
1
                (),
387
1
            );
388
        }
389
0
        _ev => unreachable!("{:?}", _ev),
390
    }
391
392
1
    let (connections_update, event) = connections.run_until_event();
393
1
    connections = connections_update;
394
1
    match event {
395
1
        either::Right(Event::RequestIn { id, request }) => {
396
1
            assert_eq!(request, b"request payload");
397
1
            connections.bob.respond_in_request(id, Err(())).unwrap();
398
        }
399
0
        _ev => unreachable!("{:?}", _ev),
400
    }
401
402
1
    let (_, event) = connections.run_until_event();
403
1
    match event {
404
1
        either::Left(Event::Response { id, response, .. }) => {
405
1
            assert_eq!(id, substream_id);
406
1
            assert!(
matches!0
(response, Err(RequestError::SubstreamClosed)));
407
        }
408
0
        _ev => unreachable!("{:?}", _ev),
409
    }
410
1
}
411
412
#[test]
413
1
fn request_protocol_not_supported() {
414
1
    let alice_config = Config {
415
1
        first_out_ping: Duration::new(60, 0),
416
1
        max_inbound_substreams: 64,
417
1
        substreams_capacity: 16,
418
1
        max_protocol_name_len: 128,
419
1
        ping_interval: Duration::from_secs(20),
420
1
        ping_protocol: "ping".to_owned(),
421
1
        ping_timeout: Duration::from_secs(20),
422
1
        randomness_seed: [0; 32],
423
1
    };
424
1
425
1
    let bob_config = Config {
426
1
        ..alice_config.clone()
427
1
    };
428
1
429
1
    let mut connections = perform_handshake(256, 256, alice_config, bob_config);
430
1
431
1
    let substream_id = connections.alice.add_request(
432
1
        "test-request-protocol".to_owned(),
433
1
        Some(b"request payload".to_vec()),
434
1
        Duration::from_secs(5),
435
1
        1024,
436
1
        (),
437
1
    );
438
1
439
1
    let (connections_update, event) = connections.run_until_event();
440
1
    connections = connections_update;
441
1
    match event {
442
1
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
443
1
            assert_eq!(protocol_name, "test-request-protocol");
444
1
            connections.bob.reject_inbound(id);
445
        }
446
0
        _ev => unreachable!("{:?}", _ev),
447
    }
448
449
1
    let (_, event) = connections.run_until_event();
450
1
    match event {
451
1
        either::Left(Event::Response { id, response, .. }) => {
452
1
            assert_eq!(id, substream_id);
453
1
            assert!(
matches!0
(response, Err(RequestError::ProtocolNotAvailable)));
454
        }
455
0
        either::Right(Event::InboundError(InboundError::NegotiationError(_))) => {}
456
0
        _ev => unreachable!("{:?}", _ev),
457
    }
458
1
}
459
460
#[test]
461
1
fn request_timeout() {
462
1
    let config = Config {
463
1
        first_out_ping: Duration::new(60, 0),
464
1
        max_inbound_substreams: 64,
465
1
        substreams_capacity: 16,
466
1
        max_protocol_name_len: 128,
467
1
        ping_interval: Duration::from_secs(20),
468
1
        ping_protocol: "ping".to_owned(),
469
1
        ping_timeout: Duration::from_secs(20),
470
1
        randomness_seed: [0; 32],
471
1
    };
472
1
473
1
    let mut connections = perform_handshake(256, 256, config.clone(), config);
474
1
475
1
    let substream_id = connections.alice.add_request(
476
1
        "test-request-protocol".to_owned(),
477
1
        Some(b"request payload".to_vec()),
478
1
        Duration::from_secs(5),
479
1
        1024,
480
1
        (),
481
1
    );
482
1
483
1
    let (connections_update, event) = connections.run_until_event();
484
1
    connections = connections_update;
485
1
    match event {
486
1
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
487
1
            assert_eq!(protocol_name, "test-request-protocol");
488
1
            connections.bob.accept_inbound(
489
1
                id,
490
1
                InboundTy::Request {
491
1
                    request_max_size: Some(1024 * 1024),
492
1
                },
493
1
                (),
494
1
            );
495
        }
496
0
        _ev => unreachable!("{:?}", _ev),
497
    }
498
499
1
    let (connections_update, event) = connections.run_until_event();
500
1
    connections = connections_update;
501
1
    match event {
502
1
        either::Right(Event::RequestIn { request, .. }) => {
503
1
            assert_eq!(request, b"request payload");
504
            // Don't answer.
505
        }
506
0
        _ev => unreachable!("{:?}", _ev),
507
    }
508
509
1
    connections.pass_time(Duration::from_secs(6));
510
1
511
1
    let (_, event) = connections.run_until_event();
512
1
    match event {
513
1
        either::Left(Event::Response { id, response, .. }) => {
514
1
            assert_eq!(id, substream_id);
515
1
            assert!(
matches!0
(response, Err(RequestError::Timeout)));
516
        }
517
0
        _ev => unreachable!("{:?}", _ev),
518
    }
519
1
}
520
521
#[test]
522
1
fn outbound_substream_works() {
523
1
    let config = Config {
524
1
        first_out_ping: Duration::new(60, 0),
525
1
        max_inbound_substreams: 64,
526
1
        substreams_capacity: 16,
527
1
        max_protocol_name_len: 128,
528
1
        ping_interval: Duration::from_secs(20),
529
1
        ping_protocol: "ping".to_owned(),
530
1
        ping_timeout: Duration::from_secs(20),
531
1
        randomness_seed: [0; 32],
532
1
    };
533
1
534
1
    let mut connections = perform_handshake(256, 256, config.clone(), config);
535
1
536
1
    let substream_id = connections.alice.open_notifications_substream(
537
1
        "test-notif-protocol".to_owned(),
538
1
        b"hello".to_vec(),
539
1
        1024,
540
1
        connections.now + Duration::from_secs(5),
541
1
        (),
542
1
    );
543
1
544
1
    let (connections_update, event) = connections.run_until_event();
545
1
    connections = connections_update;
546
1
    match event {
547
1
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
548
1
            assert_eq!(protocol_name, "test-notif-protocol");
549
1
            connections.bob.accept_inbound(
550
1
                id,
551
1
                InboundTy::Notifications {
552
1
                    max_handshake_size: 1024,
553
1
                },
554
1
                (),
555
1
            );
556
        }
557
0
        _ev => unreachable!("{:?}", _ev),
558
    }
559
560
1
    let (connections_update, event) = connections.run_until_event();
561
1
    connections = connections_update;
562
1
    match event {
563
1
        either::Right(Event::NotificationsInOpen { id, handshake }) => {
564
1
            assert_eq!(handshake, b"hello");
565
1
            connections
566
1
                .bob
567
1
                .accept_in_notifications_substream(id, b"hello back".to_vec(), 4 * 1024);
568
        }
569
0
        _ev => unreachable!("{:?}", _ev),
570
    }
571
572
1
    let notifications_to_send = vec![
573
1
        b"notif 1".to_vec(),
574
1
        b"notif 2".to_vec(),
575
1
        b"notif 3".to_vec(),
576
1
    ];
577
1
    let mut notifications_to_receive = notifications_to_send.clone();
578
1
579
1
    let (connections_update, event) = connections.run_until_event();
580
1
    connections = connections_update;
581
1
    match event {
582
        either::Left(Event::NotificationsOutResult {
583
1
            id,
584
1
            result: Ok(handshake),
585
1
        }) => {
586
1
            assert_eq!(id, substream_id);
587
1
            assert_eq!(handshake, b"hello back");
588
4
            for 
notif3
in notifications_to_send {
589
3
                connections.alice.write_notification_unbounded(id, notif);
590
3
            }
591
        }
592
0
        _ev => unreachable!("{:?}", _ev),
593
    }
594
595
4
    while !notifications_to_receive.is_empty() {
596
3
        let (connections_update, event) = connections.run_until_event();
597
3
        connections = connections_update;
598
3
        match event {
599
3
            either::Right(Event::NotificationIn { notification, .. }) => {
600
3
                let pos = notifications_to_receive
601
3
                    .iter()
602
3
                    .position(|n| *n == notification)
603
3
                    .unwrap();
604
3
                notifications_to_receive.remove(pos);
605
3
            }
606
0
            _ev => unreachable!("{:?}", _ev),
607
        }
608
    }
609
1
}
610
611
#[test]
612
1
fn outbound_substream_open_timeout() {
613
1
    let config = Config {
614
1
        first_out_ping: Duration::new(60, 0),
615
1
        max_inbound_substreams: 64,
616
1
        substreams_capacity: 16,
617
1
        max_protocol_name_len: 128,
618
1
        ping_interval: Duration::from_secs(20),
619
1
        ping_protocol: "ping".to_owned(),
620
1
        ping_timeout: Duration::from_secs(20),
621
1
        randomness_seed: [0; 32],
622
1
    };
623
1
624
1
    let mut connections = perform_handshake(256, 256, config.clone(), config);
625
1
626
1
    let substream_id = connections.alice.open_notifications_substream(
627
1
        "test-notif-protocol".to_owned(),
628
1
        b"hello".to_vec(),
629
1
        1024,
630
1
        connections.now + Duration::from_secs(5),
631
1
        (),
632
1
    );
633
1
634
1
    let (connections_update, event) = connections.run_until_event();
635
1
    connections = connections_update;
636
1
    match event {
637
1
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
638
1
            assert_eq!(protocol_name, "test-notif-protocol");
639
1
            connections.bob.accept_inbound(
640
1
                id,
641
1
                InboundTy::Notifications {
642
1
                    max_handshake_size: 1024,
643
1
                },
644
1
                (),
645
1
            );
646
        }
647
0
        _ev => unreachable!("{:?}", _ev),
648
    }
649
650
1
    let (connections_update, event) = connections.run_until_event();
651
1
    connections = connections_update;
652
1
    match event {
653
1
        either::Right(Event::NotificationsInOpen { handshake, .. }) => {
654
1
            assert_eq!(handshake, b"hello");
655
            // Don't answer.
656
        }
657
0
        _ev => unreachable!("{:?}", _ev),
658
    }
659
660
1
    connections.pass_time(Duration::from_secs(10));
661
1
662
1
    let (_, event) = connections.run_until_event();
663
1
    match event {
664
1
        either::Left(Event::NotificationsOutResult { id, result, .. }) => {
665
1
            assert_eq!(id, substream_id);
666
1
            assert!(
matches!0
(result, Err((NotificationsOutErr::Timeout, _))));
667
        }
668
0
        _ev => unreachable!("{:?}", _ev),
669
    }
670
1
}
671
672
#[test]
673
1
fn outbound_substream_refuse() {
674
1
    let config = Config {
675
1
        first_out_ping: Duration::new(60, 0),
676
1
        max_inbound_substreams: 64,
677
1
        substreams_capacity: 16,
678
1
        max_protocol_name_len: 128,
679
1
        ping_interval: Duration::from_secs(20),
680
1
        ping_protocol: "ping".to_owned(),
681
1
        ping_timeout: Duration::from_secs(20),
682
1
        randomness_seed: [0; 32],
683
1
    };
684
1
685
1
    let mut connections = perform_handshake(256, 256, config.clone(), config);
686
1
687
1
    let substream_id = connections.alice.open_notifications_substream(
688
1
        "test-notif-protocol".to_owned(),
689
1
        b"hello".to_vec(),
690
1
        1024,
691
1
        connections.now + Duration::from_secs(5),
692
1
        (),
693
1
    );
694
1
695
1
    let (connections_update, event) = connections.run_until_event();
696
1
    connections = connections_update;
697
1
    match event {
698
1
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
699
1
            assert_eq!(protocol_name, "test-notif-protocol");
700
1
            connections.bob.accept_inbound(
701
1
                id,
702
1
                InboundTy::Notifications {
703
1
                    max_handshake_size: 1024,
704
1
                },
705
1
                (),
706
1
            );
707
        }
708
0
        _ev => unreachable!("{:?}", _ev),
709
    }
710
711
1
    let (connections_update, event) = connections.run_until_event();
712
1
    connections = connections_update;
713
1
    match event {
714
1
        either::Right(Event::NotificationsInOpen { id, handshake }) => {
715
1
            assert_eq!(handshake, b"hello");
716
1
            connections.bob.reject_in_notifications_substream(id);
717
        }
718
0
        _ev => unreachable!("{:?}", _ev),
719
    }
720
721
1
    let (_, event) = connections.run_until_event();
722
1
    match event {
723
        either::Left(Event::NotificationsOutResult {
724
1
            id,
725
1
            result: Err((NotificationsOutErr::RefusedHandshake, _)),
726
1
            ..
727
1
        }) => {
728
1
            assert_eq!(id, substream_id);
729
        }
730
0
        _ev => unreachable!("{:?}", _ev),
731
    }
732
1
}
733
734
#[test]
735
#[ignore] // TODO: un-ignore
736
0
fn outbound_substream_close_demanded() {
737
0
    let config = Config {
738
0
        first_out_ping: Duration::new(60, 0),
739
0
        max_inbound_substreams: 64,
740
0
        substreams_capacity: 16,
741
0
        max_protocol_name_len: 128,
742
0
        ping_interval: Duration::from_secs(20),
743
0
        ping_protocol: "ping".to_owned(),
744
0
        ping_timeout: Duration::from_secs(20),
745
0
        randomness_seed: [0; 32],
746
0
    };
747
0
748
0
    let mut connections = perform_handshake(256, 256, config.clone(), config);
749
0
750
0
    let substream_id = connections.alice.open_notifications_substream(
751
0
        "test-notif-protocol".to_owned(),
752
0
        b"hello".to_vec(),
753
0
        1024,
754
0
        connections.now + Duration::from_secs(5),
755
0
        (),
756
0
    );
757
0
758
0
    let (connections_update, event) = connections.run_until_event();
759
0
    connections = connections_update;
760
0
    match event {
761
0
        either::Right(Event::InboundNegotiated { id, protocol_name }) => {
762
0
            assert_eq!(protocol_name, "test-notif-protocol");
763
0
            connections.bob.accept_inbound(
764
0
                id,
765
0
                InboundTy::Notifications {
766
0
                    max_handshake_size: 1024,
767
0
                },
768
0
                (),
769
0
            );
770
        }
771
0
        _ev => unreachable!("{:?}", _ev),
772
    }
773
774
0
    let (connections_update, event) = connections.run_until_event();
775
0
    connections = connections_update;
776
0
    match event {
777
0
        either::Right(Event::NotificationsInOpen { id, handshake }) => {
778
0
            assert_eq!(handshake, b"hello");
779
0
            connections
780
0
                .bob
781
0
                .accept_in_notifications_substream(id, b"hello back".to_vec(), 4 * 1024);
782
        }
783
0
        _ev => unreachable!("{:?}", _ev),
784
    }
785
786
0
    let (connections_update, event) = connections.run_until_event();
787
0
    connections = connections_update;
788
0
    match event {
789
        either::Left(Event::NotificationsOutResult {
790
0
            id,
791
0
            result: Ok(handshake),
792
0
        }) => {
793
0
            assert_eq!(id, substream_id);
794
0
            assert_eq!(handshake, b"hello back");
795
0
            connections
796
0
                .alice
797
0
                .write_notification_unbounded(id, b"notif".to_vec());
798
        }
799
0
        _ev => unreachable!("{:?}", _ev),
800
    }
801
802
0
    let (connections_update, event) = connections.run_until_event();
803
0
    connections = connections_update;
804
0
    match event {
805
0
        either::Right(Event::NotificationIn { id, notification }) => {
806
0
            assert_eq!(notification, b"notif");
807
0
            connections
808
0
                .bob
809
0
                .close_in_notifications_substream(id, Duration::from_secs(100))
810
        }
811
0
        _ev => unreachable!("{:?}", _ev),
812
    }
813
814
0
    let (connections_update, event) = connections.run_until_event();
815
0
    connections = connections_update;
816
0
    match event {
817
0
        either::Left(Event::NotificationsOutCloseDemanded { id }) => {
818
0
            connections.alice.close_out_notifications_substream(id);
819
0
        }
820
0
        _ev => unreachable!("{:?}", _ev),
821
    }
822
823
0
    let (_, event) = connections.run_until_event();
824
0
    match event {
825
        either::Right(Event::NotificationsInClose {
826
            outcome: Ok(()), ..
827
0
        }) => {}
828
0
        _ev => unreachable!("{:?}", _ev),
829
    }
830
0
}
831
832
// TODO: more tests