/__w/smoldot/smoldot/repo/lib/src/libp2p/connection/established/tests.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | #![cfg(test)] |
19 | | |
20 | | use super::{ |
21 | | Config, Event, InboundError, InboundTy, NotificationsOutErr, RequestError, SingleStream, |
22 | | }; |
23 | | use crate::libp2p::read_write::ReadWrite; |
24 | | use core::{cmp, mem, time::Duration}; |
25 | | |
26 | | struct TwoEstablished { |
27 | | alice: SingleStream<Duration, ()>, |
28 | | bob: SingleStream<Duration, ()>, |
29 | | alice_to_bob_buffer: Vec<u8>, |
30 | | bob_to_alice_buffer: Vec<u8>, |
31 | | |
32 | | alice_to_bob_buffer_size: usize, |
33 | | bob_to_alice_buffer_size: usize, |
34 | | |
35 | | /// Time that has elapsed since an unspecified epoch. |
36 | | now: Duration, |
37 | | |
38 | | /// Next time Alice or Bob needs to be polled. |
39 | | wake_up_after: Option<Duration>, |
40 | | } |
41 | | |
42 | | /// Performs a handshake between two peers, and returns the established connection objects. |
43 | 7 | fn perform_handshake( |
44 | 7 | mut alice_to_bob_buffer_size: usize, |
45 | 7 | mut bob_to_alice_buffer_size: usize, |
46 | 7 | alice_config: Config<Duration>, |
47 | 7 | bob_config: Config<Duration>, |
48 | 7 | ) -> TwoEstablished { |
49 | 7 | use super::super::{single_stream_handshake, NoiseKey}; |
50 | 7 | |
51 | 7 | assert_ne!(alice_to_bob_buffer_size, 0); |
52 | 7 | assert_ne!(bob_to_alice_buffer_size, 0); |
53 | | |
54 | 7 | let alice_key = NoiseKey::new(&rand::random(), &rand::random()); |
55 | 7 | let bob_key = NoiseKey::new(&rand::random(), &rand::random()); |
56 | 7 | |
57 | 7 | let mut alice = |
58 | 7 | single_stream_handshake::Handshake::noise_yamux(&alice_key, &rand::random(), true); |
59 | 7 | let mut bob = single_stream_handshake::Handshake::noise_yamux(&bob_key, &rand::random(), false); |
60 | 7 | |
61 | 7 | let mut alice_to_bob_buffer = Vec::new(); |
62 | 7 | let mut bob_to_alice_buffer = Vec::new(); |
63 | | |
64 | 35 | while !matches!( |
65 | 42 | (&alice, &bob), |
66 | | ( |
67 | | single_stream_handshake::Handshake::Success { .. }, |
68 | | single_stream_handshake::Handshake::Success { .. } |
69 | | ) |
70 | | ) { |
71 | 35 | match alice { |
72 | 0 | single_stream_handshake::Handshake::Success { .. } => {} |
73 | 35 | single_stream_handshake::Handshake::Healthy(nego) => { |
74 | 35 | let mut read_write = ReadWrite { |
75 | 35 | now: Duration::new(0, 0), |
76 | 35 | incoming_buffer: bob_to_alice_buffer, |
77 | 35 | expected_incoming_bytes: Some(0), |
78 | 35 | read_bytes: 0, |
79 | 35 | write_bytes_queued: alice_to_bob_buffer.len(), |
80 | 35 | write_bytes_queueable: Some( |
81 | 35 | alice_to_bob_buffer_size - alice_to_bob_buffer.len(), |
82 | 35 | ), |
83 | 35 | write_buffers: vec![mem::take(&mut alice_to_bob_buffer)], |
84 | 35 | wake_up_after: None, |
85 | 35 | }; |
86 | 35 | |
87 | 35 | alice = nego.read_write(&mut read_write).unwrap(); |
88 | 35 | bob_to_alice_buffer = read_write.incoming_buffer; |
89 | 35 | alice_to_bob_buffer.extend( |
90 | 35 | read_write |
91 | 35 | .write_buffers |
92 | 35 | .drain(..) |
93 | 105 | .flat_map(|b| b.into_iter()), |
94 | 35 | ); |
95 | 35 | bob_to_alice_buffer_size = cmp::max( |
96 | 35 | bob_to_alice_buffer_size, |
97 | 35 | read_write.expected_incoming_bytes.unwrap_or(0), |
98 | 35 | ); |
99 | 35 | } |
100 | | } |
101 | | |
102 | 35 | match bob { |
103 | 14 | single_stream_handshake::Handshake::Success { .. } => {} |
104 | 21 | single_stream_handshake::Handshake::Healthy(nego) => { |
105 | 21 | let mut read_write = ReadWrite { |
106 | 21 | now: Duration::new(0, 0), |
107 | 21 | incoming_buffer: alice_to_bob_buffer, |
108 | 21 | expected_incoming_bytes: Some(0), |
109 | 21 | read_bytes: 0, |
110 | 21 | write_bytes_queued: bob_to_alice_buffer.len(), |
111 | 21 | write_bytes_queueable: Some( |
112 | 21 | bob_to_alice_buffer_size - bob_to_alice_buffer.len(), |
113 | 21 | ), |
114 | 21 | write_buffers: vec![mem::take(&mut bob_to_alice_buffer)], |
115 | 21 | wake_up_after: None, |
116 | 21 | }; |
117 | 21 | |
118 | 21 | bob = nego.read_write(&mut read_write).unwrap(); |
119 | 21 | alice_to_bob_buffer = read_write.incoming_buffer; |
120 | 21 | bob_to_alice_buffer.extend( |
121 | 21 | read_write |
122 | 21 | .write_buffers |
123 | 21 | .drain(..) |
124 | 119 | .flat_map(|b| b.into_iter()), |
125 | 21 | ); |
126 | 21 | alice_to_bob_buffer_size = cmp::max( |
127 | 21 | alice_to_bob_buffer_size, |
128 | 21 | read_write.expected_incoming_bytes.unwrap_or(0), |
129 | 21 | ); |
130 | 21 | } |
131 | | } |
132 | | } |
133 | | |
134 | 7 | let mut connections = TwoEstablished { |
135 | 7 | alice: match alice { |
136 | 7 | single_stream_handshake::Handshake::Success { connection, .. } => { |
137 | 7 | connection.into_connection(alice_config) |
138 | | } |
139 | 0 | _ => unreachable!(), |
140 | | }, |
141 | 7 | bob: match bob { |
142 | 7 | single_stream_handshake::Handshake::Success { connection, .. } => { |
143 | 7 | connection.into_connection(bob_config) |
144 | | } |
145 | 0 | _ => unreachable!(), |
146 | | }, |
147 | 7 | alice_to_bob_buffer, |
148 | 7 | bob_to_alice_buffer, |
149 | 7 | alice_to_bob_buffer_size, |
150 | 7 | bob_to_alice_buffer_size, |
151 | 7 | now: Duration::new(0, 0), |
152 | 7 | wake_up_after: None, |
153 | | }; |
154 | | |
155 | 21 | for _ in 0..2 { |
156 | 14 | let (connections_update, event) = connections.run_until_event(); |
157 | 14 | connections = connections_update; |
158 | 14 | match event { |
159 | 7 | either::Left(Event::InboundNegotiated { id, .. }) => { |
160 | 7 | connections.alice.accept_inbound(id, InboundTy::Ping, ()); |
161 | 7 | } |
162 | 7 | either::Right(Event::InboundNegotiated { id, .. }) => { |
163 | 7 | connections.bob.accept_inbound(id, InboundTy::Ping, ()); |
164 | 7 | } |
165 | 0 | _ev => unreachable!("{:?}", _ev), |
166 | | } |
167 | | } |
168 | | |
169 | 7 | connections |
170 | 7 | } |
171 | | |
172 | | impl TwoEstablished { |
173 | 2 | fn pass_time(&mut self, amount: Duration) { |
174 | 2 | self.now += amount; |
175 | 2 | } |
176 | | |
177 | 34 | fn run_until_event(mut self) -> (Self, either::Either<Event<()>, Event<()>>) { |
178 | 201 | loop { |
179 | 201 | let mut alice_read_write = ReadWrite { |
180 | 201 | now: self.now, |
181 | 201 | incoming_buffer: self.bob_to_alice_buffer, |
182 | 201 | expected_incoming_bytes: Some(0), |
183 | 201 | read_bytes: 0, |
184 | 201 | write_bytes_queued: self.alice_to_bob_buffer.len(), |
185 | 201 | write_bytes_queueable: Some( |
186 | 201 | self.alice_to_bob_buffer_size - self.alice_to_bob_buffer.len(), |
187 | 201 | ), |
188 | 201 | write_buffers: vec![mem::take(&mut self.alice_to_bob_buffer)], |
189 | 201 | wake_up_after: self.wake_up_after, |
190 | 201 | }; |
191 | 201 | |
192 | 201 | let (new_alice, alice_event) = self.alice.read_write(&mut alice_read_write).unwrap(); |
193 | 201 | self.bob_to_alice_buffer = alice_read_write.incoming_buffer; |
194 | 201 | self.alice = new_alice; |
195 | 201 | let alice_read_bytes = alice_read_write.read_bytes; |
196 | 201 | let alice_written_bytes = alice_read_write.write_bytes_queued; |
197 | 201 | self.alice_to_bob_buffer.extend( |
198 | 201 | alice_read_write |
199 | 201 | .write_buffers |
200 | 201 | .drain(..) |
201 | 333 | .flat_map(|b| b.into_iter()), |
202 | 201 | ); |
203 | 201 | self.bob_to_alice_buffer_size = cmp::max( |
204 | 201 | self.bob_to_alice_buffer_size, |
205 | 201 | alice_read_write.expected_incoming_bytes.unwrap_or(0), |
206 | 201 | ); |
207 | 201 | self.wake_up_after = alice_read_write.wake_up_after; |
208 | | |
209 | 201 | if let Some(event) = alice_event { |
210 | 13 | return (self, either::Left(event)); |
211 | 188 | } |
212 | 188 | |
213 | 188 | let mut bob_read_write = ReadWrite { |
214 | 188 | now: self.now, |
215 | 188 | incoming_buffer: self.alice_to_bob_buffer, |
216 | 188 | expected_incoming_bytes: Some(0), |
217 | 188 | read_bytes: 0, |
218 | 188 | write_bytes_queued: self.bob_to_alice_buffer.len(), |
219 | 188 | write_bytes_queueable: Some( |
220 | 188 | self.bob_to_alice_buffer_size - self.bob_to_alice_buffer.len(), |
221 | 188 | ), |
222 | 188 | write_buffers: vec![mem::take(&mut self.bob_to_alice_buffer)], |
223 | 188 | wake_up_after: self.wake_up_after, |
224 | 188 | }; |
225 | 188 | |
226 | 188 | let (new_bob, bob_event) = self.bob.read_write(&mut bob_read_write).unwrap(); |
227 | 188 | self.alice_to_bob_buffer = bob_read_write.incoming_buffer; |
228 | 188 | self.bob = new_bob; |
229 | 188 | let bob_read_bytes = bob_read_write.read_bytes; |
230 | 188 | let bob_written_bytes = bob_read_write.write_bytes_queued; |
231 | 188 | self.bob_to_alice_buffer.extend( |
232 | 188 | bob_read_write |
233 | 188 | .write_buffers |
234 | 188 | .drain(..) |
235 | 332 | .flat_map(|b| b.into_iter()), |
236 | 188 | ); |
237 | 188 | self.alice_to_bob_buffer_size = cmp::max( |
238 | 188 | self.alice_to_bob_buffer_size, |
239 | 188 | bob_read_write.expected_incoming_bytes.unwrap_or(0), |
240 | 188 | ); |
241 | 188 | self.wake_up_after = bob_read_write.wake_up_after; |
242 | | |
243 | 188 | if let Some(event) = bob_event { |
244 | 21 | return (self, either::Right(event)); |
245 | 167 | } |
246 | 167 | |
247 | 167 | if bob_read_bytes != 0 |
248 | 135 | || bob_written_bytes != 0 |
249 | 98 | || alice_read_bytes != 0 |
250 | 75 | || alice_written_bytes != 0 |
251 | | { |
252 | 98 | continue; |
253 | 69 | } |
254 | | |
255 | | // Nothing more will happen immediately. Advance time before looping again. |
256 | 69 | if let Some(wake_up_after) = self.wake_up_after.take() { |
257 | 69 | self.now = wake_up_after + Duration::new(0, 1); // TODO: adding 1 ns is a hack |
258 | 69 | } else { |
259 | | // TODO: what to do here?! nothing more will happen |
260 | 0 | panic!(); |
261 | | } |
262 | | } |
263 | 34 | } |
264 | | } |
265 | | |
266 | | #[test] |
267 | 1 | fn handshake_works() { |
268 | 1 | fn test_with_buffer_sizes(size1: usize, size2: usize) { |
269 | 1 | let config = Config { |
270 | 1 | first_out_ping: Duration::new(0, 0), |
271 | 1 | max_inbound_substreams: 64, |
272 | 1 | substreams_capacity: 16, |
273 | 1 | max_protocol_name_len: 128, |
274 | 1 | ping_interval: Duration::from_secs(20), |
275 | 1 | ping_protocol: "ping".to_owned(), |
276 | 1 | ping_timeout: Duration::from_secs(20), |
277 | 1 | randomness_seed: [0; 32], |
278 | 1 | }; |
279 | 1 | |
280 | 1 | perform_handshake(size1, size2, config.clone(), config); |
281 | 1 | } |
282 | 1 | |
283 | 1 | test_with_buffer_sizes(256, 256); |
284 | 1 | // TODO: doesn't work |
285 | 1 | /*test_with_buffer_sizes(1, 1); |
286 | 1 | test_with_buffer_sizes(1, 2048); |
287 | 1 | test_with_buffer_sizes(2048, 1);*/ |
288 | 1 | } |
289 | | |
290 | | #[test] |
291 | | #[ignore] // TODO: un-ignore |
292 | 0 | fn successful_request() { |
293 | 0 | let config = Config { |
294 | 0 | first_out_ping: Duration::new(60, 0), |
295 | 0 | max_inbound_substreams: 64, |
296 | 0 | substreams_capacity: 16, |
297 | 0 | max_protocol_name_len: 128, |
298 | 0 | ping_interval: Duration::from_secs(20), |
299 | 0 | ping_protocol: "ping".to_owned(), |
300 | 0 | ping_timeout: Duration::from_secs(20), |
301 | 0 | randomness_seed: [0; 32], |
302 | 0 | }; |
303 | 0 | |
304 | 0 | let mut connections = perform_handshake(256, 256, config.clone(), config); |
305 | 0 | |
306 | 0 | let substream_id = connections.alice.add_request( |
307 | 0 | "test-request-protocol".to_owned(), |
308 | 0 | Some(b"request payload".to_vec()), |
309 | 0 | Duration::from_secs(5), |
310 | 0 | 1024, |
311 | 0 | (), |
312 | 0 | ); |
313 | 0 | |
314 | 0 | let (connections_update, event) = connections.run_until_event(); |
315 | 0 | connections = connections_update; |
316 | 0 | match event { |
317 | 0 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
318 | 0 | assert_eq!(protocol_name, "test-request-protocol"); |
319 | 0 | connections.bob.accept_inbound( |
320 | 0 | id, |
321 | 0 | InboundTy::Request { |
322 | 0 | request_max_size: Some(1024 * 1024), |
323 | 0 | }, |
324 | 0 | (), |
325 | 0 | ); |
326 | | } |
327 | 0 | _ev => unreachable!("{:?}", _ev), |
328 | | } |
329 | | |
330 | 0 | let (connections_update, event) = connections.run_until_event(); |
331 | 0 | connections = connections_update; |
332 | 0 | match event { |
333 | 0 | either::Right(Event::RequestIn { id, request }) => { |
334 | 0 | assert_eq!(request, b"request payload"); |
335 | 0 | connections |
336 | 0 | .bob |
337 | 0 | .respond_in_request(id, Ok(b"response payload".to_vec())) |
338 | 0 | .unwrap(); |
339 | | } |
340 | 0 | _ev => unreachable!("{:?}", _ev), |
341 | | } |
342 | | |
343 | 0 | let (_, event) = connections.run_until_event(); |
344 | 0 | match event { |
345 | 0 | either::Left(Event::Response { id, response, .. }) => { |
346 | 0 | assert_eq!(id, substream_id); |
347 | 0 | assert_eq!(response.unwrap(), b"response payload".to_vec()); |
348 | | } |
349 | 0 | _ev => unreachable!("{:?}", _ev), |
350 | | } |
351 | 0 | } |
352 | | |
353 | | #[test] |
354 | 1 | fn refused_request() { |
355 | 1 | let config = Config { |
356 | 1 | first_out_ping: Duration::new(60, 0), |
357 | 1 | max_inbound_substreams: 64, |
358 | 1 | substreams_capacity: 16, |
359 | 1 | max_protocol_name_len: 128, |
360 | 1 | ping_interval: Duration::from_secs(20), |
361 | 1 | ping_protocol: "ping".to_owned(), |
362 | 1 | ping_timeout: Duration::from_secs(20), |
363 | 1 | randomness_seed: [0; 32], |
364 | 1 | }; |
365 | 1 | |
366 | 1 | let mut connections = perform_handshake(256, 256, config.clone(), config); |
367 | 1 | |
368 | 1 | let substream_id = connections.alice.add_request( |
369 | 1 | "test-request-protocol".to_owned(), |
370 | 1 | Some(b"request payload".to_vec()), |
371 | 1 | Duration::from_secs(5), |
372 | 1 | 1024, |
373 | 1 | (), |
374 | 1 | ); |
375 | 1 | |
376 | 1 | let (connections_update, event) = connections.run_until_event(); |
377 | 1 | connections = connections_update; |
378 | 1 | match event { |
379 | 1 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
380 | 1 | assert_eq!(protocol_name, "test-request-protocol"); |
381 | 1 | connections.bob.accept_inbound( |
382 | 1 | id, |
383 | 1 | InboundTy::Request { |
384 | 1 | request_max_size: Some(1024 * 1024), |
385 | 1 | }, |
386 | 1 | (), |
387 | 1 | ); |
388 | | } |
389 | 0 | _ev => unreachable!("{:?}", _ev), |
390 | | } |
391 | | |
392 | 1 | let (connections_update, event) = connections.run_until_event(); |
393 | 1 | connections = connections_update; |
394 | 1 | match event { |
395 | 1 | either::Right(Event::RequestIn { id, request }) => { |
396 | 1 | assert_eq!(request, b"request payload"); |
397 | 1 | connections.bob.respond_in_request(id, Err(())).unwrap(); |
398 | | } |
399 | 0 | _ev => unreachable!("{:?}", _ev), |
400 | | } |
401 | | |
402 | 1 | let (_, event) = connections.run_until_event(); |
403 | 1 | match event { |
404 | 1 | either::Left(Event::Response { id, response, .. }) => { |
405 | 1 | assert_eq!(id, substream_id); |
406 | 1 | assert!(matches!(response, Err(RequestError::SubstreamClosed))); |
407 | | } |
408 | 0 | _ev => unreachable!("{:?}", _ev), |
409 | | } |
410 | 1 | } |
411 | | |
412 | | #[test] |
413 | 1 | fn request_protocol_not_supported() { |
414 | 1 | let alice_config = Config { |
415 | 1 | first_out_ping: Duration::new(60, 0), |
416 | 1 | max_inbound_substreams: 64, |
417 | 1 | substreams_capacity: 16, |
418 | 1 | max_protocol_name_len: 128, |
419 | 1 | ping_interval: Duration::from_secs(20), |
420 | 1 | ping_protocol: "ping".to_owned(), |
421 | 1 | ping_timeout: Duration::from_secs(20), |
422 | 1 | randomness_seed: [0; 32], |
423 | 1 | }; |
424 | 1 | |
425 | 1 | let bob_config = Config { |
426 | 1 | ..alice_config.clone() |
427 | 1 | }; |
428 | 1 | |
429 | 1 | let mut connections = perform_handshake(256, 256, alice_config, bob_config); |
430 | 1 | |
431 | 1 | let substream_id = connections.alice.add_request( |
432 | 1 | "test-request-protocol".to_owned(), |
433 | 1 | Some(b"request payload".to_vec()), |
434 | 1 | Duration::from_secs(5), |
435 | 1 | 1024, |
436 | 1 | (), |
437 | 1 | ); |
438 | 1 | |
439 | 1 | let (connections_update, event) = connections.run_until_event(); |
440 | 1 | connections = connections_update; |
441 | 1 | match event { |
442 | 1 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
443 | 1 | assert_eq!(protocol_name, "test-request-protocol"); |
444 | 1 | connections.bob.reject_inbound(id); |
445 | | } |
446 | 0 | _ev => unreachable!("{:?}", _ev), |
447 | | } |
448 | | |
449 | 1 | let (_, event) = connections.run_until_event(); |
450 | 1 | match event { |
451 | 1 | either::Left(Event::Response { id, response, .. }) => { |
452 | 1 | assert_eq!(id, substream_id); |
453 | 1 | assert!(matches!(response, Err(RequestError::ProtocolNotAvailable))); |
454 | | } |
455 | 0 | either::Right(Event::InboundError(InboundError::NegotiationError(_))) => {} |
456 | 0 | _ev => unreachable!("{:?}", _ev), |
457 | | } |
458 | 1 | } |
459 | | |
460 | | #[test] |
461 | 1 | fn request_timeout() { |
462 | 1 | let config = Config { |
463 | 1 | first_out_ping: Duration::new(60, 0), |
464 | 1 | max_inbound_substreams: 64, |
465 | 1 | substreams_capacity: 16, |
466 | 1 | max_protocol_name_len: 128, |
467 | 1 | ping_interval: Duration::from_secs(20), |
468 | 1 | ping_protocol: "ping".to_owned(), |
469 | 1 | ping_timeout: Duration::from_secs(20), |
470 | 1 | randomness_seed: [0; 32], |
471 | 1 | }; |
472 | 1 | |
473 | 1 | let mut connections = perform_handshake(256, 256, config.clone(), config); |
474 | 1 | |
475 | 1 | let substream_id = connections.alice.add_request( |
476 | 1 | "test-request-protocol".to_owned(), |
477 | 1 | Some(b"request payload".to_vec()), |
478 | 1 | Duration::from_secs(5), |
479 | 1 | 1024, |
480 | 1 | (), |
481 | 1 | ); |
482 | 1 | |
483 | 1 | let (connections_update, event) = connections.run_until_event(); |
484 | 1 | connections = connections_update; |
485 | 1 | match event { |
486 | 1 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
487 | 1 | assert_eq!(protocol_name, "test-request-protocol"); |
488 | 1 | connections.bob.accept_inbound( |
489 | 1 | id, |
490 | 1 | InboundTy::Request { |
491 | 1 | request_max_size: Some(1024 * 1024), |
492 | 1 | }, |
493 | 1 | (), |
494 | 1 | ); |
495 | | } |
496 | 0 | _ev => unreachable!("{:?}", _ev), |
497 | | } |
498 | | |
499 | 1 | let (connections_update, event) = connections.run_until_event(); |
500 | 1 | connections = connections_update; |
501 | 1 | match event { |
502 | 1 | either::Right(Event::RequestIn { request, .. }) => { |
503 | 1 | assert_eq!(request, b"request payload"); |
504 | | // Don't answer. |
505 | | } |
506 | 0 | _ev => unreachable!("{:?}", _ev), |
507 | | } |
508 | | |
509 | 1 | connections.pass_time(Duration::from_secs(6)); |
510 | 1 | |
511 | 1 | let (_, event) = connections.run_until_event(); |
512 | 1 | match event { |
513 | 1 | either::Left(Event::Response { id, response, .. }) => { |
514 | 1 | assert_eq!(id, substream_id); |
515 | 1 | assert!(matches!(response, Err(RequestError::Timeout))); |
516 | | } |
517 | 0 | _ev => unreachable!("{:?}", _ev), |
518 | | } |
519 | 1 | } |
520 | | |
521 | | #[test] |
522 | 1 | fn outbound_substream_works() { |
523 | 1 | let config = Config { |
524 | 1 | first_out_ping: Duration::new(60, 0), |
525 | 1 | max_inbound_substreams: 64, |
526 | 1 | substreams_capacity: 16, |
527 | 1 | max_protocol_name_len: 128, |
528 | 1 | ping_interval: Duration::from_secs(20), |
529 | 1 | ping_protocol: "ping".to_owned(), |
530 | 1 | ping_timeout: Duration::from_secs(20), |
531 | 1 | randomness_seed: [0; 32], |
532 | 1 | }; |
533 | 1 | |
534 | 1 | let mut connections = perform_handshake(256, 256, config.clone(), config); |
535 | 1 | |
536 | 1 | let substream_id = connections.alice.open_notifications_substream( |
537 | 1 | "test-notif-protocol".to_owned(), |
538 | 1 | b"hello".to_vec(), |
539 | 1 | 1024, |
540 | 1 | connections.now + Duration::from_secs(5), |
541 | 1 | (), |
542 | 1 | ); |
543 | 1 | |
544 | 1 | let (connections_update, event) = connections.run_until_event(); |
545 | 1 | connections = connections_update; |
546 | 1 | match event { |
547 | 1 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
548 | 1 | assert_eq!(protocol_name, "test-notif-protocol"); |
549 | 1 | connections.bob.accept_inbound( |
550 | 1 | id, |
551 | 1 | InboundTy::Notifications { |
552 | 1 | max_handshake_size: 1024, |
553 | 1 | }, |
554 | 1 | (), |
555 | 1 | ); |
556 | | } |
557 | 0 | _ev => unreachable!("{:?}", _ev), |
558 | | } |
559 | | |
560 | 1 | let (connections_update, event) = connections.run_until_event(); |
561 | 1 | connections = connections_update; |
562 | 1 | match event { |
563 | 1 | either::Right(Event::NotificationsInOpen { id, handshake }) => { |
564 | 1 | assert_eq!(handshake, b"hello"); |
565 | 1 | connections |
566 | 1 | .bob |
567 | 1 | .accept_in_notifications_substream(id, b"hello back".to_vec(), 4 * 1024); |
568 | | } |
569 | 0 | _ev => unreachable!("{:?}", _ev), |
570 | | } |
571 | | |
572 | 1 | let notifications_to_send = vec![ |
573 | 1 | b"notif 1".to_vec(), |
574 | 1 | b"notif 2".to_vec(), |
575 | 1 | b"notif 3".to_vec(), |
576 | 1 | ]; |
577 | 1 | let mut notifications_to_receive = notifications_to_send.clone(); |
578 | 1 | |
579 | 1 | let (connections_update, event) = connections.run_until_event(); |
580 | 1 | connections = connections_update; |
581 | 1 | match event { |
582 | | either::Left(Event::NotificationsOutResult { |
583 | 1 | id, |
584 | 1 | result: Ok(handshake), |
585 | 1 | }) => { |
586 | 1 | assert_eq!(id, substream_id); |
587 | 1 | assert_eq!(handshake, b"hello back"); |
588 | 4 | for notif in notifications_to_send { |
589 | 3 | connections.alice.write_notification_unbounded(id, notif); |
590 | 3 | } |
591 | | } |
592 | 0 | _ev => unreachable!("{:?}", _ev), |
593 | | } |
594 | | |
595 | 4 | while !notifications_to_receive.is_empty() { |
596 | 3 | let (connections_update, event) = connections.run_until_event(); |
597 | 3 | connections = connections_update; |
598 | 3 | match event { |
599 | 3 | either::Right(Event::NotificationIn { notification, .. }) => { |
600 | 3 | let pos = notifications_to_receive |
601 | 3 | .iter() |
602 | 3 | .position(|n| *n == notification) |
603 | 3 | .unwrap(); |
604 | 3 | notifications_to_receive.remove(pos); |
605 | 3 | } |
606 | 0 | _ev => unreachable!("{:?}", _ev), |
607 | | } |
608 | | } |
609 | 1 | } |
610 | | |
611 | | #[test] |
612 | 1 | fn outbound_substream_open_timeout() { |
613 | 1 | let config = Config { |
614 | 1 | first_out_ping: Duration::new(60, 0), |
615 | 1 | max_inbound_substreams: 64, |
616 | 1 | substreams_capacity: 16, |
617 | 1 | max_protocol_name_len: 128, |
618 | 1 | ping_interval: Duration::from_secs(20), |
619 | 1 | ping_protocol: "ping".to_owned(), |
620 | 1 | ping_timeout: Duration::from_secs(20), |
621 | 1 | randomness_seed: [0; 32], |
622 | 1 | }; |
623 | 1 | |
624 | 1 | let mut connections = perform_handshake(256, 256, config.clone(), config); |
625 | 1 | |
626 | 1 | let substream_id = connections.alice.open_notifications_substream( |
627 | 1 | "test-notif-protocol".to_owned(), |
628 | 1 | b"hello".to_vec(), |
629 | 1 | 1024, |
630 | 1 | connections.now + Duration::from_secs(5), |
631 | 1 | (), |
632 | 1 | ); |
633 | 1 | |
634 | 1 | let (connections_update, event) = connections.run_until_event(); |
635 | 1 | connections = connections_update; |
636 | 1 | match event { |
637 | 1 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
638 | 1 | assert_eq!(protocol_name, "test-notif-protocol"); |
639 | 1 | connections.bob.accept_inbound( |
640 | 1 | id, |
641 | 1 | InboundTy::Notifications { |
642 | 1 | max_handshake_size: 1024, |
643 | 1 | }, |
644 | 1 | (), |
645 | 1 | ); |
646 | | } |
647 | 0 | _ev => unreachable!("{:?}", _ev), |
648 | | } |
649 | | |
650 | 1 | let (connections_update, event) = connections.run_until_event(); |
651 | 1 | connections = connections_update; |
652 | 1 | match event { |
653 | 1 | either::Right(Event::NotificationsInOpen { handshake, .. }) => { |
654 | 1 | assert_eq!(handshake, b"hello"); |
655 | | // Don't answer. |
656 | | } |
657 | 0 | _ev => unreachable!("{:?}", _ev), |
658 | | } |
659 | | |
660 | 1 | connections.pass_time(Duration::from_secs(10)); |
661 | 1 | |
662 | 1 | let (_, event) = connections.run_until_event(); |
663 | 1 | match event { |
664 | 1 | either::Left(Event::NotificationsOutResult { id, result, .. }) => { |
665 | 1 | assert_eq!(id, substream_id); |
666 | 1 | assert!(matches!(result, Err((NotificationsOutErr::Timeout, _)))); |
667 | | } |
668 | 0 | _ev => unreachable!("{:?}", _ev), |
669 | | } |
670 | 1 | } |
671 | | |
672 | | #[test] |
673 | 1 | fn outbound_substream_refuse() { |
674 | 1 | let config = Config { |
675 | 1 | first_out_ping: Duration::new(60, 0), |
676 | 1 | max_inbound_substreams: 64, |
677 | 1 | substreams_capacity: 16, |
678 | 1 | max_protocol_name_len: 128, |
679 | 1 | ping_interval: Duration::from_secs(20), |
680 | 1 | ping_protocol: "ping".to_owned(), |
681 | 1 | ping_timeout: Duration::from_secs(20), |
682 | 1 | randomness_seed: [0; 32], |
683 | 1 | }; |
684 | 1 | |
685 | 1 | let mut connections = perform_handshake(256, 256, config.clone(), config); |
686 | 1 | |
687 | 1 | let substream_id = connections.alice.open_notifications_substream( |
688 | 1 | "test-notif-protocol".to_owned(), |
689 | 1 | b"hello".to_vec(), |
690 | 1 | 1024, |
691 | 1 | connections.now + Duration::from_secs(5), |
692 | 1 | (), |
693 | 1 | ); |
694 | 1 | |
695 | 1 | let (connections_update, event) = connections.run_until_event(); |
696 | 1 | connections = connections_update; |
697 | 1 | match event { |
698 | 1 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
699 | 1 | assert_eq!(protocol_name, "test-notif-protocol"); |
700 | 1 | connections.bob.accept_inbound( |
701 | 1 | id, |
702 | 1 | InboundTy::Notifications { |
703 | 1 | max_handshake_size: 1024, |
704 | 1 | }, |
705 | 1 | (), |
706 | 1 | ); |
707 | | } |
708 | 0 | _ev => unreachable!("{:?}", _ev), |
709 | | } |
710 | | |
711 | 1 | let (connections_update, event) = connections.run_until_event(); |
712 | 1 | connections = connections_update; |
713 | 1 | match event { |
714 | 1 | either::Right(Event::NotificationsInOpen { id, handshake }) => { |
715 | 1 | assert_eq!(handshake, b"hello"); |
716 | 1 | connections.bob.reject_in_notifications_substream(id); |
717 | | } |
718 | 0 | _ev => unreachable!("{:?}", _ev), |
719 | | } |
720 | | |
721 | 1 | let (_, event) = connections.run_until_event(); |
722 | 1 | match event { |
723 | | either::Left(Event::NotificationsOutResult { |
724 | 1 | id, |
725 | 1 | result: Err((NotificationsOutErr::RefusedHandshake, _)), |
726 | 1 | .. |
727 | 1 | }) => { |
728 | 1 | assert_eq!(id, substream_id); |
729 | | } |
730 | 0 | _ev => unreachable!("{:?}", _ev), |
731 | | } |
732 | 1 | } |
733 | | |
734 | | #[test] |
735 | | #[ignore] // TODO: un-ignore |
736 | 0 | fn outbound_substream_close_demanded() { |
737 | 0 | let config = Config { |
738 | 0 | first_out_ping: Duration::new(60, 0), |
739 | 0 | max_inbound_substreams: 64, |
740 | 0 | substreams_capacity: 16, |
741 | 0 | max_protocol_name_len: 128, |
742 | 0 | ping_interval: Duration::from_secs(20), |
743 | 0 | ping_protocol: "ping".to_owned(), |
744 | 0 | ping_timeout: Duration::from_secs(20), |
745 | 0 | randomness_seed: [0; 32], |
746 | 0 | }; |
747 | 0 | |
748 | 0 | let mut connections = perform_handshake(256, 256, config.clone(), config); |
749 | 0 | |
750 | 0 | let substream_id = connections.alice.open_notifications_substream( |
751 | 0 | "test-notif-protocol".to_owned(), |
752 | 0 | b"hello".to_vec(), |
753 | 0 | 1024, |
754 | 0 | connections.now + Duration::from_secs(5), |
755 | 0 | (), |
756 | 0 | ); |
757 | 0 | |
758 | 0 | let (connections_update, event) = connections.run_until_event(); |
759 | 0 | connections = connections_update; |
760 | 0 | match event { |
761 | 0 | either::Right(Event::InboundNegotiated { id, protocol_name }) => { |
762 | 0 | assert_eq!(protocol_name, "test-notif-protocol"); |
763 | 0 | connections.bob.accept_inbound( |
764 | 0 | id, |
765 | 0 | InboundTy::Notifications { |
766 | 0 | max_handshake_size: 1024, |
767 | 0 | }, |
768 | 0 | (), |
769 | 0 | ); |
770 | | } |
771 | 0 | _ev => unreachable!("{:?}", _ev), |
772 | | } |
773 | | |
774 | 0 | let (connections_update, event) = connections.run_until_event(); |
775 | 0 | connections = connections_update; |
776 | 0 | match event { |
777 | 0 | either::Right(Event::NotificationsInOpen { id, handshake }) => { |
778 | 0 | assert_eq!(handshake, b"hello"); |
779 | 0 | connections |
780 | 0 | .bob |
781 | 0 | .accept_in_notifications_substream(id, b"hello back".to_vec(), 4 * 1024); |
782 | | } |
783 | 0 | _ev => unreachable!("{:?}", _ev), |
784 | | } |
785 | | |
786 | 0 | let (connections_update, event) = connections.run_until_event(); |
787 | 0 | connections = connections_update; |
788 | 0 | match event { |
789 | | either::Left(Event::NotificationsOutResult { |
790 | 0 | id, |
791 | 0 | result: Ok(handshake), |
792 | 0 | }) => { |
793 | 0 | assert_eq!(id, substream_id); |
794 | 0 | assert_eq!(handshake, b"hello back"); |
795 | 0 | connections |
796 | 0 | .alice |
797 | 0 | .write_notification_unbounded(id, b"notif".to_vec()); |
798 | | } |
799 | 0 | _ev => unreachable!("{:?}", _ev), |
800 | | } |
801 | | |
802 | 0 | let (connections_update, event) = connections.run_until_event(); |
803 | 0 | connections = connections_update; |
804 | 0 | match event { |
805 | 0 | either::Right(Event::NotificationIn { id, notification }) => { |
806 | 0 | assert_eq!(notification, b"notif"); |
807 | 0 | connections |
808 | 0 | .bob |
809 | 0 | .close_in_notifications_substream(id, Duration::from_secs(100)) |
810 | | } |
811 | 0 | _ev => unreachable!("{:?}", _ev), |
812 | | } |
813 | | |
814 | 0 | let (connections_update, event) = connections.run_until_event(); |
815 | 0 | connections = connections_update; |
816 | 0 | match event { |
817 | 0 | either::Left(Event::NotificationsOutCloseDemanded { id }) => { |
818 | 0 | connections.alice.close_out_notifications_substream(id); |
819 | 0 | } |
820 | 0 | _ev => unreachable!("{:?}", _ev), |
821 | | } |
822 | | |
823 | 0 | let (_, event) = connections.run_until_event(); |
824 | 0 | match event { |
825 | | either::Right(Event::NotificationsInClose { |
826 | | outcome: Ok(()), .. |
827 | 0 | }) => {} |
828 | 0 | _ev => unreachable!("{:?}", _ev), |
829 | | } |
830 | 0 | } |
831 | | |
832 | | // TODO: more tests |