Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/full-node/src/network_service.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! Background network service.
19
//!
20
//! The [`NetworkService`] manages background tasks dedicated to connecting to other nodes.
21
//! Importantly, its design is oriented towards the particular use case of the full node.
22
//!
23
//! The [`NetworkService`] spawns one background task (using the [`Config::tasks_executor`]) for
24
//! each active TCP socket, plus one for each TCP listening socket. Messages are exchanged between
25
//! the service and these background tasks.
26
27
// TODO: doc
28
// TODO: re-review this once finished
29
30
use crate::{database_thread, jaeger_service, LogCallback, LogLevel};
31
32
use core::{cmp, future::Future, mem, pin::Pin, task::Poll, time::Duration};
33
use futures_channel::oneshot;
34
use futures_lite::FutureExt as _;
35
use futures_util::stream::{self, SelectAll};
36
use hashbrown::HashMap;
37
use smol::{
38
    channel, future,
39
    lock::Mutex,
40
    net::TcpStream,
41
    stream::{Stream, StreamExt as _},
42
};
43
use smoldot::{
44
    database::full_sqlite,
45
    header,
46
    informant::{BytesDisplay, HashDisplay},
47
    libp2p::{
48
        connection,
49
        multiaddr::{self, Multiaddr, Protocol},
50
        peer_id::{self, PeerId},
51
    },
52
    network::{basic_peering_strategy, codec, service},
53
};
54
use std::{
55
    io,
56
    net::{IpAddr, SocketAddr},
57
    sync::Arc,
58
    time::Instant,
59
    vec,
60
};
61
62
pub use smoldot::network::service::ChainId;
63
64
mod tasks;
65
66
/// Configuration for a [`NetworkService`].
///
/// Taken by value by [`NetworkService::new`].
67
pub struct Config {
68
    /// Closure that spawns background tasks.
    ///
    /// Each call receives one boxed, pinned future to run.
69
    pub tasks_executor: Box<dyn FnMut(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
70
71
    /// Callback through which the service reports log messages. For instance,
    /// [`NetworkService::new`] uses it to log a warning whenever accepting an incoming TCP
    /// connection fails.
72
    pub log_callback: Arc<dyn LogCallback + Send + Sync>,
73
74
    /// Number of event receivers returned by [`NetworkService::new`].
    ///
    /// One bounded channel with a capacity of 16 events is created per receiver.
75
    pub num_events_receivers: usize,
76
77
    /// Addresses to listen for incoming connections.
    ///
    /// Each address must consist of exactly an `Ip4` or `Ip6` component followed by a `Tcp`
    /// component, otherwise [`NetworkService::new`] returns `InitError::BadListenMultiaddr`.
    /// A failure to bind the TCP listener is reported as `InitError::ListenerIo`.
78
    pub listen_addresses: Vec<Multiaddr>,
79
80
    /// List of block chains to be connected to.
    ///
    /// [`NetworkService::new`] registers one chain per entry.
81
    pub chains: Vec<ChainConfig>,
82
83
    /// Value sent back for the agent version when receiving an identification request.
84
    pub identify_agent_version: String,
85
86
    /// Key used for the encryption layer.
87
    /// This is a Noise static key, according to the Noise specification.
88
    /// Signed using the actual libp2p key.
    ///
    /// The identity of the local node is derived from the libp2p Ed25519 public key of this
    /// value.
89
    pub noise_key: connection::NoiseKey,
90
91
    /// Service to use to report traces.
92
    pub jaeger_service: Arc<jaeger_service::JaegerService>,
93
}
94
95
/// Configuration for one chain.
///
/// One value is provided per chain in [`Config::chains`].
96
pub struct ChainConfig {
97
    /// Name of the chain to use for logging purposes.
98
    pub log_name: String,
99
100
    /// List of node identities and addresses that are known to belong to the chain's peer-to-peer
101
    /// network.
    ///
    /// [`NetworkService::new`] inserts every entry into the peering strategy.
102
    pub bootstrap_nodes: Vec<(PeerId, Multiaddr)>,
103
104
    /// Database to use to read blocks from when answering requests.
105
    pub database: Arc<database_thread::DatabaseThread>,
106
107
    /// Hash of the genesis block of the chain. Sent to other nodes in order to determine whether
108
    /// the chains match.
109
    pub genesis_block_hash: [u8; 32],
110
111
    /// Number (`.0`) and hash (`.1`) of the current best block. Can later be updated.
    // TODO: document which function performs that update
112
    pub best_block: (u64, [u8; 32]),
113
114
    /// Optional identifier to insert into the networking protocol names. Used to differentiate
115
    /// between chains with the same genesis hash.
116
    pub fork_id: Option<String>,
117
118
    /// Number of bytes of the block number in the networking protocol.
119
    pub block_number_bytes: usize,
120
121
    /// Must be `Some` if and only if the chain uses the GrandPa networking protocol. Contains the
122
    /// number of the finalized block at the time of the initialization.
    ///
    /// [`NetworkService::new`] pairs this height with a round number of 1 and a set id of 0,
    /// which are marked there as dummy values.
123
    pub grandpa_protocol_finalized_block_height: Option<u64>,
124
125
    /// Maximum number of peers that have slots attributed to them.
126
    pub max_slots: usize,
127
128
    /// Maximum number of peers that have gossip links open but without having slots attributed
129
    /// to them.
130
    pub max_in_peers: usize,
131
}
132
133
/// Event generated by the events reporters returned by [`NetworkService::new`].
///
/// Every variant identifies the chain (`chain_id`) and the remote (`peer_id`) it relates to.
// NOTE(review): the code emitting these events is outside this view; the per-variant notes
// below are inferred from the variant and field names and must be confirmed.
134
#[derive(Debug, Clone)]
135
pub enum Event {
    /// Presumably: `peer_id` is now connected on `chain_id`; the two `best_block_*` fields
    /// look like the best block reported by that peer — TODO confirm.
136
    Connected {
137
        chain_id: ChainId,
138
        peer_id: PeerId,
139
        best_block_number: u64,
140
        best_block_hash: [u8; 32],
141
    },
    /// Presumably: `peer_id` is no longer connected on `chain_id` — TODO confirm.
142
    Disconnected {
143
        chain_id: ChainId,
144
        peer_id: PeerId,
145
    },
    /// Presumably: `peer_id` announced a block, whose SCALE-encoded header is provided, and
    /// `is_best` tells whether the peer considers it its best block — TODO confirm.
146
    BlockAnnounce {
147
        chain_id: ChainId,
148
        peer_id: PeerId,
149
        scale_encoded_header: Vec<u8>,
150
        is_best: bool,
151
    },
    /// Presumably: `peer_id` sent a GrandPa neighbor packet reporting the given finalized
    /// block height — TODO confirm.
152
    GrandpaNeighborPacket {
153
        chain_id: ChainId,
154
        peer_id: PeerId,
155
        finalized_block_height: u64,
156
    },
157
}
158
159
/// Handle to the network service.
///
/// Created by [`NetworkService::new`], which returns it wrapped in an `Arc`. It communicates
/// with the background task by sending messages on [`NetworkService::to_background_tx`].
pub struct NetworkService {
160
    /// Identity of the local node.
    ///
    /// Derived in [`NetworkService::new`] from the libp2p Ed25519 public key of
    /// [`Config::noise_key`].
161
    local_peer_id: PeerId,
162
163
    /// Service to use to report traces.
164
    // TODO: unused
165
    _jaeger_service: Arc<jaeger_service::JaegerService>,
166
167
    /// Channel to send messages to the background task.
    ///
    /// Sending half of a bounded channel with a capacity of 16 messages.
168
    to_background_tx: Mutex<channel::Sender<ToBackground>>,
169
170
    /// Name of all the chains that have been registered, for logging purposes.
    ///
    /// The keys of this map are also the chain identifiers returned by
    /// [`NetworkService::new`].
171
    chain_names: hashbrown::HashMap<ChainId, String, fnv::FnvBuildHasher>,
172
}
173
174
/// Message sent by the [`NetworkService`] to the background task, through the channel stored in
/// [`NetworkService::to_background_tx`].
// NOTE(review): the code producing and handling these messages is outside this view. The
// per-variant notes below are inferred from names and types only; in particular, each
// `result_tx` is presumably the channel on which the background task sends back the outcome of
// the operation — TODO confirm.
enum ToBackground {
    /// Presumably: disconnect `peer_id` on `chain_id` and ban it with the given severity;
    /// `reason` looks like a human-readable explanation — TODO confirm.
175
    ForegroundDisconnectAndBan {
176
        peer_id: PeerId,
177
        chain_id: ChainId,
178
        severity: BanSeverity,
179
        reason: &'static str,
180
    },
    /// Presumably: send a block announcement to `target` — TODO confirm.
181
    ForegroundAnnounceBlock {
182
        target: PeerId,
183
        chain_id: ChainId,
184
        scale_encoded_header: Vec<u8>,
185
        is_best: bool,
186
        result_tx: oneshot::Sender<Result<(), service::QueueNotificationError>>,
187
    },
    /// Presumably: update the best block that the local node reports for `chain_id`
    /// — TODO confirm.
188
    ForegroundSetLocalBestBlock {
189
        chain_id: ChainId,
190
        best_hash: [u8; 32],
191
        best_number: u64,
192
    },
    /// Presumably: start a blocks request towards `target` — TODO confirm.
193
    ForegroundBlocksRequest {
194
        target: PeerId,
195
        chain_id: ChainId,
196
        config: codec::BlocksRequestConfig,
197
        result_tx: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
198
    },
    /// Presumably: start a GrandPa warp sync request towards `target`, starting at
    /// `begin_hash` — TODO confirm.
199
    ForegroundWarpSyncRequest {
200
        target: PeerId,
201
        chain_id: ChainId,
202
        begin_hash: [u8; 32],
203
        result_tx:
204
            oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
205
    },
    /// Presumably: start a storage proof request towards `target` — TODO confirm.
206
    ForegroundStorageProofRequest {
207
        target: PeerId,
208
        chain_id: ChainId,
209
        config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
210
        result_tx: oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
211
    },
    /// Presumably: start a call proof request towards `target` — TODO confirm.
212
    ForegroundCallProofRequest {
213
        target: PeerId, // TODO: takes by value because of futures longevity issue
214
        chain_id: ChainId,
215
        config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
216
        result_tx: oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
217
    },
    /// Presumably: query the current number of connections — TODO confirm.
218
    ForegroundGetNumConnections {
219
        result_tx: oneshot::Sender<usize>,
220
    },
    /// Presumably: query the current number of peers of `chain_id` — TODO confirm.
221
    ForegroundGetNumPeers {
222
        chain_id: ChainId,
223
        result_tx: oneshot::Sender<usize>,
224
    },
    /// Presumably: query the current number of peers across all chains — TODO confirm.
225
    ForegroundGetNumTotalPeers {
226
        result_tx: oneshot::Sender<usize>,
227
    },
228
}
229
230
/// State of the background side of the service.
///
/// Built by [`NetworkService::new`] from the [`Config`] and passed to `run`.
struct Inner {
231
    /// Value provided through [`Config::identify_agent_version`].
232
    identify_agent_version: String,
233
234
    /// Sending events through the public API.
235
    ///
236
    /// Contains either senders, or a `Future` that is currently sending an event and will yield
237
    /// the senders back once it is finished.
    ///
    /// Initially contains the senders, one per [`Config::num_events_receivers`].
238
    event_senders: either::Either<
239
        Vec<channel::Sender<Event>>,
240
        Pin<Box<dyn Future<Output = Vec<channel::Sender<Event>>> + Send>>,
241
    >,
242
243
    /// Event about to be sent on the senders of [`Inner::event_senders`].
    ///
    /// Initially `None`.
244
    event_pending_send: Option<Event>,
245
246
    /// Noise key of the local node. Value provided through [`Config::noise_key`].
247
    noise_key: service::NoiseKey,
248
249
    /// Identity of the local node. Can be derived from [`Inner::noise_key`].
250
    local_peer_id: PeerId,
251
252
    /// Service to use to report traces.
253
    jaeger_service: Arc<jaeger_service::JaegerService>,
254
255
    /// Data structure holding the entire state of the networking.
256
    network:
257
        service::ChainNetwork<Chain, channel::Sender<service::CoordinatorToConnection>, Instant>,
258
259
    /// Data structure holding the addresses and assigned slots.
260
    peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, Instant>,
261
262
    /// Current number of outgoing connection attempts.
263
    ///
264
    /// This counter is used to limit the number of simultaneous connection attempts, as some
265
    /// ISPs/cloud providers don't like seeing too many dialing connections at the same time.
266
    num_pending_out_attempts: usize,
267
268
    /// Stream of incoming connections.
    ///
    /// Merges one stream per entry of [`Config::listen_addresses`]. Each of these streams yields
    /// the sockets accepted by its TCP listener; when accepting fails, a warning is logged and
    /// the stream waits two seconds before trying again.
269
    incoming_connections: SelectAll<Pin<Box<dyn Stream<Item = (TcpStream, SocketAddr)> + Send>>>,
270
271
    /// See [`Config::tasks_executor`].
272
    tasks_executor: Box<dyn FnMut(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,
273
274
    /// See [`Config::log_callback`].
275
    log_callback: Arc<dyn LogCallback + Send + Sync>,
276
277
    /// Channel for the frontend to send messages to the background task.
    ///
    /// Receiving half of the bounded channel (capacity of 16 messages) whose sending half is
    /// stored in [`NetworkService::to_background_tx`].
278
    to_background_rx: Pin<Box<channel::Receiver<ToBackground>>>,
279
280
    /// Channel where connections send messages destined to the coordinator.
    ///
    /// Receiving half of a bounded channel with a capacity of 64 messages.
281
    from_connections_rx: Pin<
282
        Box<
283
            channel::Receiver<(
284
                service::ConnectionId,
285
                Option<service::ConnectionToCoordinator>,
286
            )>,
287
        >,
288
    >,
289
290
    /// Sending side of [`Inner::from_connections_rx`].
291
    from_connections_tx: channel::Sender<(
292
        service::ConnectionId,
293
        Option<service::ConnectionToCoordinator>,
294
    )>,
295
296
    /// List of all block requests that have been started but not finished yet.
    ///
    /// Maps the substream of each request to the channel on which its outcome is expected.
297
    blocks_requests: HashMap<
298
        service::SubstreamId,
299
        oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
300
        fnv::FnvBuildHasher,
301
    >,
302
303
    /// List of all warp sync requests that have been started but not finished yet.
304
    warp_sync_requests: HashMap<
305
        service::SubstreamId,
306
        oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
307
        fnv::FnvBuildHasher,
308
    >,
309
310
    /// List of all storage requests that have been started but not finished yet.
311
    storage_requests: HashMap<
312
        service::SubstreamId,
313
        oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
314
        fnv::FnvBuildHasher,
315
    >,
316
317
    /// List of all call proof requests that have been started but not finished yet.
318
    call_proof_requests: HashMap<
319
        service::SubstreamId,
320
        oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
321
        fnv::FnvBuildHasher,
322
    >,
323
324
    /// When to start the next discovery process.
    ///
    /// Initially set to one second after the creation of the service.
325
    next_discovery: smol::Timer,
326
327
    /// Time between [`Inner::next_discovery`] and the follow-up discovery.
    ///
    /// Initially one second.
328
    next_discovery_period: Duration,
329
}
330
331
/// Extra information of a chain.
///
/// Stored as the user data of the chain when [`NetworkService::new`] adds it to the networking
/// state machine. All the fields are copied from the [`ChainConfig`] of the chain.
332
struct Chain {
333
    /// Name of the chain to use for logging purposes.
334
    log_name: String,
335
336
    /// How to access data to answer requests from the remotes.
337
    database: Arc<database_thread::DatabaseThread>,
338
339
    /// Maximum number of peers that have slots attributed to them.
    /// See [`ChainConfig::max_slots`].
340
    max_slots: usize,
341
342
    /// Maximum number of peers that have gossip links open but without having slots attributed
343
    /// to them.
    /// See [`ChainConfig::max_in_peers`].
344
    max_in_peers: usize,
345
}
346
347
/// Severity of a ban. See [`NetworkService::ban_and_disconnect`].
// NOTE(review): the concrete effect of each level (for example the duration of the ban) is not
// visible from this definition; document it on the variants once confirmed.
348
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
349
pub enum BanSeverity {
    /// Lower of the two severity levels.
350
    Low,
    /// Higher of the two severity levels.
351
    High,
352
}
353
354
impl NetworkService {
355
    /// Initializes the network service with the given configuration.
356
21
    pub async fn new(
357
21
        config: Config,
358
21
    ) -> Result<
359
21
        (
360
21
            Arc<Self>,
361
21
            Vec<ChainId>,
362
21
            Vec<Pin<Box<dyn Stream<Item = Event> + Send>>>,
363
21
        ),
364
21
        InitError,
365
21
    > {
_RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService3new
Line
Count
Source
356
21
    pub async fn new(
357
21
        config: Config,
358
21
    ) -> Result<
359
21
        (
360
21
            Arc<Self>,
361
21
            Vec<ChainId>,
362
21
            Vec<Pin<Box<dyn Stream<Item = Event> + Send>>>,
363
21
        ),
364
21
        InitError,
365
21
    > {
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService3new
366
21
        let (event_senders, event_receivers): (Vec<_>, Vec<_>) = (0..config.num_events_receivers)
367
42
            .map(|_| channel::bounded(16))
_RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new00CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
367
4
            .map(|_| channel::bounded(16))
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new00B8_
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new00CscDgN54JpMGG_6author
_RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new00CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
367
38
            .map(|_| channel::bounded(16))
Unexecuted instantiation: _RNCNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB6_14NetworkService3new00B8_
368
21
            .unzip();
369
21
370
21
        let mut network = service::ChainNetwork::new(service::Config {
371
21
            chains_capacity: config.chains.len(),
372
21
            connections_capacity: 100, // TODO: ?
373
21
            handshake_timeout: Duration::from_secs(8),
374
21
            randomness_seed: rand::random(),
375
21
        });
376
21
377
21
        let mut peering_strategy =
378
21
            basic_peering_strategy::BasicPeeringStrategy::new(basic_peering_strategy::Config {
379
21
                randomness_seed: rand::random(),
380
21
                peers_capacity: 200, // TODO: ?
381
21
                chains_capacity: config.chains.len(),
382
21
            });
383
21
384
21
        let mut chain_names =
385
21
            hashbrown::HashMap::with_capacity_and_hasher(config.chains.len(), Default::default());
386
387
42
        for 
chain21
in config.chains {
388
21
            let chain_id = network
389
21
                .add_chain(service::ChainConfig {
390
21
                    fork_id: chain.fork_id.clone(),
391
21
                    block_number_bytes: chain.block_number_bytes,
392
21
                    best_hash: chain.best_block.1,
393
21
                    best_number: chain.best_block.0,
394
21
                    genesis_hash: chain.genesis_block_hash,
395
21
                    role: codec::Role::Full,
396
21
                    grandpa_protocol_config: chain.grandpa_protocol_finalized_block_height.map(
397
21
                        // TODO: dummy values
398
21
                        |commit_finalized_height| service::GrandpaState {
399
21
                            commit_finalized_height,
400
21
                            round_number: 1,
401
21
                            set_id: 0,
402
21
                        },
_RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s_0CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
398
2
                        |commit_finalized_height| service::GrandpaState {
399
2
                            commit_finalized_height,
400
2
                            round_number: 1,
401
2
                            set_id: 0,
402
2
                        },
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s_0B8_
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s_0CscDgN54JpMGG_6author
_RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s_0CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
398
19
                        |commit_finalized_height| service::GrandpaState {
399
19
                            commit_finalized_height,
400
19
                            round_number: 1,
401
19
                            set_id: 0,
402
19
                        },
Unexecuted instantiation: _RNCNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s_0B8_
403
21
                    ),
404
21
                    allow_inbound_block_requests: true,
405
21
                    user_data: Chain {
406
21
                        log_name: chain.log_name.clone(),
407
21
                        database: chain.database,
408
21
                        max_in_peers: chain.max_in_peers,
409
21
                        max_slots: chain.max_slots,
410
21
                    },
411
21
                })
412
21
                .unwrap(); // TODO: don't unwrap?
413
414
21
            for (
peer_id, addr0
) in chain.bootstrap_nodes {
415
0
                // Note that we must call this function before `insert_address`, as documented
416
0
                // in `basic_peering_strategy`.
417
0
                peering_strategy.insert_chain_peer(chain_id, peer_id.clone(), usize::MAX);
418
0
                peering_strategy.insert_address(&peer_id, addr.into_bytes(), usize::MAX);
419
0
            }
420
421
21
            chain_names.insert(chain_id, chain.log_name);
422
        }
423
424
21
        let (to_background_tx, to_background_rx) = channel::bounded(16);
425
21
        let (from_connections_tx, from_connections_rx) = channel::bounded(64);
426
21
427
21
        let local_peer_id =
428
21
            peer_id::PublicKey::Ed25519(*config.noise_key.libp2p_public_ed25519_key())
429
21
                .into_peer_id();
430
21
431
21
        // For each listening address in the configuration, create a background task dedicated to
432
21
        // listening on that address.
433
21
        let mut incoming_connections = SelectAll::new();
434
21
        for 
listen_address0
in config.listen_addresses {
435
0
            // Try to parse the requested address and create the corresponding listening socket.
436
0
            let tcp_listener: smol::net::TcpListener = {
437
0
                let addr = {
438
0
                    let mut iter = listen_address.iter();
439
0
                    let proto1 = iter.next();
440
0
                    let proto2 = iter.next();
441
0
                    let proto3 = iter.next();
442
0
                    match (proto1, proto2, proto3) {
443
0
                        (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port)), None) => {
444
0
                            Some(SocketAddr::from((ip, port)))
445
                        }
446
0
                        (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port)), None) => {
447
0
                            Some(SocketAddr::from((ip, port)))
448
                        }
449
0
                        _ => None,
450
                    }
451
                };
452
453
0
                if let Some(addr) = addr {
454
0
                    match smol::net::TcpListener::bind(addr).await {
455
0
                        Ok(l) => l,
456
0
                        Err(err) => {
457
0
                            return Err(InitError::ListenerIo(listen_address, err));
458
                        }
459
                    }
460
                } else {
461
                    // TODO: support WebSocket server
462
0
                    return Err(InitError::BadListenMultiaddr(listen_address));
463
                }
464
            };
465
466
            // Add a task dedicated to this listener.
467
0
            let log_callback = config.log_callback.clone();
468
0
            incoming_connections.push(Box::pin(stream::unfold(tcp_listener, move |tcp_listener| {
469
0
                let log_callback = log_callback.clone();
470
0
                async move {
471
                    loop {
472
0
                        match tcp_listener.accept().await {
473
0
                            Ok((socket, socket_addr)) => {
474
0
                                break Some(((socket, socket_addr), tcp_listener))
475
                            }
476
0
                            Err(error) => {
477
0
                                // Errors here can happen if the accept failed, for example
478
0
                                // if no file descriptor is available.
479
0
                                // A wait is added in order to avoid having a busy-loop
480
0
                                // failing to accept connections.
481
0
                                log_callback.log(
482
0
                                    LogLevel::Warn,
483
0
                                    format!("tcp-accept-error; error={}", error),
484
0
                                );
485
0
                                smol::Timer::after(Duration::from_secs(2)).await;
486
                            }
487
                        }
488
                    }
489
0
                }
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s0_00CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s0_00Ba_
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s0_00CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s0_00CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s0_00Ba_
490
0
            })) as Pin<Box<_>>);
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s0_0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s0_0B8_
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s0_0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s0_0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s0_0B8_
491
        }
492
493
        // Initialize the inner network service.
494
21
        run(Inner {
495
21
            local_peer_id: local_peer_id.clone(),
496
21
            identify_agent_version: config.identify_agent_version,
497
21
            event_senders: either::Left(event_senders),
498
21
            event_pending_send: None,
499
21
            num_pending_out_attempts: 0,
500
21
            to_background_rx: Box::pin(to_background_rx),
501
21
            from_connections_rx: Box::pin(from_connections_rx),
502
21
            from_connections_tx,
503
21
            tasks_executor: config.tasks_executor,
504
21
            log_callback: config.log_callback,
505
21
            network,
506
21
            noise_key: config.noise_key,
507
21
            peering_strategy,
508
21
            blocks_requests: hashbrown::HashMap::with_capacity_and_hasher(
509
21
                50, // TODO: ?
510
21
                Default::default(),
511
21
            ),
512
21
            warp_sync_requests: hashbrown::HashMap::with_capacity_and_hasher(
513
21
                2, // TODO: ?
514
21
                Default::default(),
515
21
            ),
516
21
            storage_requests: hashbrown::HashMap::with_capacity_and_hasher(
517
21
                5, // TODO: ?
518
21
                Default::default(),
519
21
            ),
520
21
            call_proof_requests: hashbrown::HashMap::with_capacity_and_hasher(
521
21
                5, // TODO: ?
522
21
                Default::default(),
523
21
            ),
524
21
            jaeger_service: config.jaeger_service.clone(),
525
21
            next_discovery: smol::Timer::after(Duration::from_secs(1)),
526
21
            next_discovery_period: Duration::from_secs(1),
527
21
            incoming_connections,
528
21
        });
529
21
530
21
        // Build the final network service.
531
21
        let network_service = Arc::new(NetworkService {
532
21
            local_peer_id,
533
21
            chain_names,
534
21
            _jaeger_service: config.jaeger_service,
535
21
            to_background_tx: Mutex::new(to_background_tx),
536
21
        });
537
21
538
21
        // Adjust the receivers to keep the `network_service` alive.
539
21
        // TODO: no, hacky
540
21
        let receivers = event_receivers
541
21
            .into_iter()
542
42
            .map(|rx| {
543
42
                let mut network_service = Some(network_service.clone());
544
42
                rx.chain(smol::stream::poll_fn(move |_| {
545
0
                    drop(network_service.take());
546
0
                    Poll::Ready(None)
547
42
                }))
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s1_00CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s1_00Ba_
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s1_00CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s1_00CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB8_14NetworkService3new0s1_00Ba_
548
42
                .boxed()
549
42
            })
_RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s1_0CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
542
4
            .map(|rx| {
543
4
                let mut network_service = Some(network_service.clone());
544
4
                rx.chain(smol::stream::poll_fn(move |_| {
545
                    drop(network_service.take());
546
                    Poll::Ready(None)
547
4
                }))
548
4
                .boxed()
549
4
            })
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s1_0B8_
Unexecuted instantiation: _RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s1_0CscDgN54JpMGG_6author
_RNCNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s1_0CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
542
38
            .map(|rx| {
543
38
                let mut network_service = Some(network_service.clone());
544
38
                rx.chain(smol::stream::poll_fn(move |_| {
545
                    drop(network_service.take());
546
                    Poll::Ready(None)
547
38
                }))
548
38
                .boxed()
549
38
            })
Unexecuted instantiation: _RNCNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB6_14NetworkService3new0s1_0B8_
550
21
            .collect();
551
21
552
21
        let chain_ids = network_service.chain_names.keys().cloned().collect();
553
21
        Ok((network_service, chain_ids, receivers))
554
21
    }
_RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService3new0CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
365
2
    > {
366
2
        let (event_senders, event_receivers): (Vec<_>, Vec<_>) = (0..config.num_events_receivers)
367
2
            .map(|_| channel::bounded(16))
368
2
            .unzip();
369
2
370
2
        let mut network = service::ChainNetwork::new(service::Config {
371
2
            chains_capacity: config.chains.len(),
372
2
            connections_capacity: 100, // TODO: ?
373
2
            handshake_timeout: Duration::from_secs(8),
374
2
            randomness_seed: rand::random(),
375
2
        });
376
2
377
2
        let mut peering_strategy =
378
2
            basic_peering_strategy::BasicPeeringStrategy::new(basic_peering_strategy::Config {
379
2
                randomness_seed: rand::random(),
380
2
                peers_capacity: 200, // TODO: ?
381
2
                chains_capacity: config.chains.len(),
382
2
            });
383
2
384
2
        let mut chain_names =
385
2
            hashbrown::HashMap::with_capacity_and_hasher(config.chains.len(), Default::default());
386
387
4
        for 
chain2
in config.chains {
388
2
            let chain_id = network
389
2
                .add_chain(service::ChainConfig {
390
2
                    fork_id: chain.fork_id.clone(),
391
2
                    block_number_bytes: chain.block_number_bytes,
392
2
                    best_hash: chain.best_block.1,
393
2
                    best_number: chain.best_block.0,
394
2
                    genesis_hash: chain.genesis_block_hash,
395
2
                    role: codec::Role::Full,
396
2
                    grandpa_protocol_config: chain.grandpa_protocol_finalized_block_height.map(
397
2
                        // TODO: dummy values
398
2
                        |commit_finalized_height| service::GrandpaState {
399
                            commit_finalized_height,
400
                            round_number: 1,
401
                            set_id: 0,
402
2
                        },
403
2
                    ),
404
2
                    allow_inbound_block_requests: true,
405
2
                    user_data: Chain {
406
2
                        log_name: chain.log_name.clone(),
407
2
                        database: chain.database,
408
2
                        max_in_peers: chain.max_in_peers,
409
2
                        max_slots: chain.max_slots,
410
2
                    },
411
2
                })
412
2
                .unwrap(); // TODO: don't unwrap?
413
414
2
            for (
peer_id, addr0
) in chain.bootstrap_nodes {
415
0
                // Note that we must call this function before `insert_address`, as documented
416
0
                // in `basic_peering_strategy`.
417
0
                peering_strategy.insert_chain_peer(chain_id, peer_id.clone(), usize::MAX);
418
0
                peering_strategy.insert_address(&peer_id, addr.into_bytes(), usize::MAX);
419
0
            }
420
421
2
            chain_names.insert(chain_id, chain.log_name);
422
        }
423
424
2
        let (to_background_tx, to_background_rx) = channel::bounded(16);
425
2
        let (from_connections_tx, from_connections_rx) = channel::bounded(64);
426
2
427
2
        let local_peer_id =
428
2
            peer_id::PublicKey::Ed25519(*config.noise_key.libp2p_public_ed25519_key())
429
2
                .into_peer_id();
430
2
431
2
        // For each listening address in the configuration, create a background task dedicated to
432
2
        // listening on that address.
433
2
        let mut incoming_connections = SelectAll::new();
434
2
        for 
listen_address0
in config.listen_addresses {
435
0
            // Try to parse the requested address and create the corresponding listening socket.
436
0
            let tcp_listener: smol::net::TcpListener = {
437
0
                let addr = {
438
0
                    let mut iter = listen_address.iter();
439
0
                    let proto1 = iter.next();
440
0
                    let proto2 = iter.next();
441
0
                    let proto3 = iter.next();
442
0
                    match (proto1, proto2, proto3) {
443
0
                        (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port)), None) => {
444
0
                            Some(SocketAddr::from((ip, port)))
445
                        }
446
0
                        (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port)), None) => {
447
0
                            Some(SocketAddr::from((ip, port)))
448
                        }
449
0
                        _ => None,
450
                    }
451
                };
452
453
0
                if let Some(addr) = addr {
454
0
                    match smol::net::TcpListener::bind(addr).await {
455
0
                        Ok(l) => l,
456
0
                        Err(err) => {
457
0
                            return Err(InitError::ListenerIo(listen_address, err));
458
                        }
459
                    }
460
                } else {
461
                    // TODO: support WebSocket server
462
0
                    return Err(InitError::BadListenMultiaddr(listen_address));
463
                }
464
            };
465
466
            // Add a task dedicated to this listener.
467
0
            let log_callback = config.log_callback.clone();
468
0
            incoming_connections.push(Box::pin(stream::unfold(tcp_listener, move |tcp_listener| {
469
                let log_callback = log_callback.clone();
470
                async move {
471
                    loop {
472
                        match tcp_listener.accept().await {
473
                            Ok((socket, socket_addr)) => {
474
                                break Some(((socket, socket_addr), tcp_listener))
475
                            }
476
                            Err(error) => {
477
                                // Errors here can happen if the accept failed, for example
478
                                // if no file descriptor is available.
479
                                // A wait is added in order to avoid having a busy-loop
480
                                // failing to accept connections.
481
                                log_callback.log(
482
                                    LogLevel::Warn,
483
                                    format!("tcp-accept-error; error={}", error),
484
                                );
485
                                smol::Timer::after(Duration::from_secs(2)).await;
486
                            }
487
                        }
488
                    }
489
                }
490
0
            })) as Pin<Box<_>>);
491
        }
492
493
        // Initialize the inner network service.
494
2
        run(Inner {
495
2
            local_peer_id: local_peer_id.clone(),
496
2
            identify_agent_version: config.identify_agent_version,
497
2
            event_senders: either::Left(event_senders),
498
2
            event_pending_send: None,
499
2
            num_pending_out_attempts: 0,
500
2
            to_background_rx: Box::pin(to_background_rx),
501
2
            from_connections_rx: Box::pin(from_connections_rx),
502
2
            from_connections_tx,
503
2
            tasks_executor: config.tasks_executor,
504
2
            log_callback: config.log_callback,
505
2
            network,
506
2
            noise_key: config.noise_key,
507
2
            peering_strategy,
508
2
            blocks_requests: hashbrown::HashMap::with_capacity_and_hasher(
509
2
                50, // TODO: ?
510
2
                Default::default(),
511
2
            ),
512
2
            warp_sync_requests: hashbrown::HashMap::with_capacity_and_hasher(
513
2
                2, // TODO: ?
514
2
                Default::default(),
515
2
            ),
516
2
            storage_requests: hashbrown::HashMap::with_capacity_and_hasher(
517
2
                5, // TODO: ?
518
2
                Default::default(),
519
2
            ),
520
2
            call_proof_requests: hashbrown::HashMap::with_capacity_and_hasher(
521
2
                5, // TODO: ?
522
2
                Default::default(),
523
2
            ),
524
2
            jaeger_service: config.jaeger_service.clone(),
525
2
            next_discovery: smol::Timer::after(Duration::from_secs(1)),
526
2
            next_discovery_period: Duration::from_secs(1),
527
2
            incoming_connections,
528
2
        });
529
2
530
2
        // Build the final network service.
531
2
        let network_service = Arc::new(NetworkService {
532
2
            local_peer_id,
533
2
            chain_names,
534
2
            _jaeger_service: config.jaeger_service,
535
2
            to_background_tx: Mutex::new(to_background_tx),
536
2
        });
537
2
538
2
        // Adjust the receivers to keep the `network_service` alive.
539
2
        // TODO: no, hacky
540
2
        let receivers = event_receivers
541
2
            .into_iter()
542
2
            .map(|rx| {
543
                let mut network_service = Some(network_service.clone());
544
                rx.chain(smol::stream::poll_fn(move |_| {
545
                    drop(network_service.take());
546
                    Poll::Ready(None)
547
                }))
548
                .boxed()
549
2
            })
550
2
            .collect();
551
2
552
2
        let chain_ids = network_service.chain_names.keys().cloned().collect();
553
2
        Ok((network_service, chain_ids, receivers))
554
2
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService3new0B6_
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService3new0CscDgN54JpMGG_6author
_RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService3new0CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
365
19
    > {
366
19
        let (event_senders, event_receivers): (Vec<_>, Vec<_>) = (0..config.num_events_receivers)
367
19
            .map(|_| channel::bounded(16))
368
19
            .unzip();
369
19
370
19
        let mut network = service::ChainNetwork::new(service::Config {
371
19
            chains_capacity: config.chains.len(),
372
19
            connections_capacity: 100, // TODO: ?
373
19
            handshake_timeout: Duration::from_secs(8),
374
19
            randomness_seed: rand::random(),
375
19
        });
376
19
377
19
        let mut peering_strategy =
378
19
            basic_peering_strategy::BasicPeeringStrategy::new(basic_peering_strategy::Config {
379
19
                randomness_seed: rand::random(),
380
19
                peers_capacity: 200, // TODO: ?
381
19
                chains_capacity: config.chains.len(),
382
19
            });
383
19
384
19
        let mut chain_names =
385
19
            hashbrown::HashMap::with_capacity_and_hasher(config.chains.len(), Default::default());
386
387
38
        for 
chain19
in config.chains {
388
19
            let chain_id = network
389
19
                .add_chain(service::ChainConfig {
390
19
                    fork_id: chain.fork_id.clone(),
391
19
                    block_number_bytes: chain.block_number_bytes,
392
19
                    best_hash: chain.best_block.1,
393
19
                    best_number: chain.best_block.0,
394
19
                    genesis_hash: chain.genesis_block_hash,
395
19
                    role: codec::Role::Full,
396
19
                    grandpa_protocol_config: chain.grandpa_protocol_finalized_block_height.map(
397
19
                        // TODO: dummy values
398
19
                        |commit_finalized_height| service::GrandpaState {
399
                            commit_finalized_height,
400
                            round_number: 1,
401
                            set_id: 0,
402
19
                        },
403
19
                    ),
404
19
                    allow_inbound_block_requests: true,
405
19
                    user_data: Chain {
406
19
                        log_name: chain.log_name.clone(),
407
19
                        database: chain.database,
408
19
                        max_in_peers: chain.max_in_peers,
409
19
                        max_slots: chain.max_slots,
410
19
                    },
411
19
                })
412
19
                .unwrap(); // TODO: don't unwrap?
413
414
19
            for (
peer_id, addr0
) in chain.bootstrap_nodes {
415
0
                // Note that we must call this function before `insert_address`, as documented
416
0
                // in `basic_peering_strategy`.
417
0
                peering_strategy.insert_chain_peer(chain_id, peer_id.clone(), usize::MAX);
418
0
                peering_strategy.insert_address(&peer_id, addr.into_bytes(), usize::MAX);
419
0
            }
420
421
19
            chain_names.insert(chain_id, chain.log_name);
422
        }
423
424
19
        let (to_background_tx, to_background_rx) = channel::bounded(16);
425
19
        let (from_connections_tx, from_connections_rx) = channel::bounded(64);
426
19
427
19
        let local_peer_id =
428
19
            peer_id::PublicKey::Ed25519(*config.noise_key.libp2p_public_ed25519_key())
429
19
                .into_peer_id();
430
19
431
19
        // For each listening address in the configuration, create a background task dedicated to
432
19
        // listening on that address.
433
19
        let mut incoming_connections = SelectAll::new();
434
19
        for 
listen_address0
in config.listen_addresses {
435
0
            // Try to parse the requested address and create the corresponding listening socket.
436
0
            let tcp_listener: smol::net::TcpListener = {
437
0
                let addr = {
438
0
                    let mut iter = listen_address.iter();
439
0
                    let proto1 = iter.next();
440
0
                    let proto2 = iter.next();
441
0
                    let proto3 = iter.next();
442
0
                    match (proto1, proto2, proto3) {
443
0
                        (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port)), None) => {
444
0
                            Some(SocketAddr::from((ip, port)))
445
                        }
446
0
                        (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port)), None) => {
447
0
                            Some(SocketAddr::from((ip, port)))
448
                        }
449
0
                        _ => None,
450
                    }
451
                };
452
453
0
                if let Some(addr) = addr {
454
0
                    match smol::net::TcpListener::bind(addr).await {
455
0
                        Ok(l) => l,
456
0
                        Err(err) => {
457
0
                            return Err(InitError::ListenerIo(listen_address, err));
458
                        }
459
                    }
460
                } else {
461
                    // TODO: support WebSocket server
462
0
                    return Err(InitError::BadListenMultiaddr(listen_address));
463
                }
464
            };
465
466
            // Add a task dedicated to this listener.
467
0
            let log_callback = config.log_callback.clone();
468
0
            incoming_connections.push(Box::pin(stream::unfold(tcp_listener, move |tcp_listener| {
469
                let log_callback = log_callback.clone();
470
                async move {
471
                    loop {
472
                        match tcp_listener.accept().await {
473
                            Ok((socket, socket_addr)) => {
474
                                break Some(((socket, socket_addr), tcp_listener))
475
                            }
476
                            Err(error) => {
477
                                // Errors here can happen if the accept failed, for example
478
                                // if no file descriptor is available.
479
                                // A wait is added in order to avoid having a busy-loop
480
                                // failing to accept connections.
481
                                log_callback.log(
482
                                    LogLevel::Warn,
483
                                    format!("tcp-accept-error; error={}", error),
484
                                );
485
                                smol::Timer::after(Duration::from_secs(2)).await;
486
                            }
487
                        }
488
                    }
489
                }
490
0
            })) as Pin<Box<_>>);
491
        }
492
493
        // Initialize the inner network service.
494
19
        run(Inner {
495
19
            local_peer_id: local_peer_id.clone(),
496
19
            identify_agent_version: config.identify_agent_version,
497
19
            event_senders: either::Left(event_senders),
498
19
            event_pending_send: None,
499
19
            num_pending_out_attempts: 0,
500
19
            to_background_rx: Box::pin(to_background_rx),
501
19
            from_connections_rx: Box::pin(from_connections_rx),
502
19
            from_connections_tx,
503
19
            tasks_executor: config.tasks_executor,
504
19
            log_callback: config.log_callback,
505
19
            network,
506
19
            noise_key: config.noise_key,
507
19
            peering_strategy,
508
19
            blocks_requests: hashbrown::HashMap::with_capacity_and_hasher(
509
19
                50, // TODO: ?
510
19
                Default::default(),
511
19
            ),
512
19
            warp_sync_requests: hashbrown::HashMap::with_capacity_and_hasher(
513
19
                2, // TODO: ?
514
19
                Default::default(),
515
19
            ),
516
19
            storage_requests: hashbrown::HashMap::with_capacity_and_hasher(
517
19
                5, // TODO: ?
518
19
                Default::default(),
519
19
            ),
520
19
            call_proof_requests: hashbrown::HashMap::with_capacity_and_hasher(
521
19
                5, // TODO: ?
522
19
                Default::default(),
523
19
            ),
524
19
            jaeger_service: config.jaeger_service.clone(),
525
19
            next_discovery: smol::Timer::after(Duration::from_secs(1)),
526
19
            next_discovery_period: Duration::from_secs(1),
527
19
            incoming_connections,
528
19
        });
529
19
530
19
        // Build the final network service.
531
19
        let network_service = Arc::new(NetworkService {
532
19
            local_peer_id,
533
19
            chain_names,
534
19
            _jaeger_service: config.jaeger_service,
535
19
            to_background_tx: Mutex::new(to_background_tx),
536
19
        });
537
19
538
19
        // Adjust the receivers to keep the `network_service` alive.
539
19
        // TODO: no, hacky
540
19
        let receivers = event_receivers
541
19
            .into_iter()
542
19
            .map(|rx| {
543
                let mut network_service = Some(network_service.clone());
544
                rx.chain(smol::stream::poll_fn(move |_| {
545
                    drop(network_service.take());
546
                    Poll::Ready(None)
547
                }))
548
                .boxed()
549
19
            })
550
19
            .collect();
551
19
552
19
        let chain_ids = network_service.chain_names.keys().cloned().collect();
553
19
        Ok((network_service, chain_ids, receivers))
554
19
    }
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService3new0B6_
555
556
    /// Returns the peer ID of the local node.
    ///
    /// This is the value computed once in `new` from the Ed25519 public key of the configured
    /// noise key. The returned reference borrows from `self`; no message is exchanged with the
    /// background task.
557
1
    pub fn local_peer_id(&self) -> &PeerId {
558
1
        &self.local_peer_id
559
1
    }
_RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService13local_peer_id
Line
Count
Source
557
1
    pub fn local_peer_id(&self) -> &PeerId {
558
1
        &self.local_peer_id
559
1
    }
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService13local_peer_id
560
561
    /// Returns the number of connections, both handshaking or established, both incoming and
562
    /// outgoing.
    ///
    /// The query is sent as a [`ToBackground::ForegroundGetNumConnections`] message over
    /// `to_background_tx`, and the answer is awaited on a one-shot channel.
    ///
    /// # Panics
    ///
    /// Panics if the one-shot sender is dropped without an answer having been sent. This
    /// includes the case where sending the message itself failed, as the message (and the
    /// sender it carries) is then dropped.
563
0
    pub async fn num_connections(&self) -> usize {
Unexecuted instantiation: _RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService15num_connections
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService15num_connections
564
0
        // One-shot channel on which the answer comes back.
        let (result_tx, result_rx) = oneshot::channel();
565
0
566
0
        // The outcome of `send` is deliberately discarded. The mutex guard is a temporary of
        // this statement, so the lock is released before waiting for the answer below.
        let _ = self
567
0
            .to_background_tx
568
0
            .lock()
569
0
            .await
570
0
            .send(ToBackground::ForegroundGetNumConnections { result_tx })
571
0
            .await;
572
573
0
        // Fails (and thus panics) only if `result_tx` was dropped without a value being sent.
        result_rx.await.unwrap()
574
0
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService15num_connections0B6_
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService15num_connections0B6_
575
576
    /// Returns the number of peers we have a substream with, on the chain designated by
    /// `chain_id`.
    ///
    /// The query is sent as a [`ToBackground::ForegroundGetNumPeers`] message over
    /// `to_background_tx`, and the answer is awaited on a one-shot channel.
    ///
    /// # Panics
    ///
    /// Panics if the one-shot sender is dropped without an answer having been sent. This
    /// includes the case where sending the message itself failed, as the message (and the
    /// sender it carries) is then dropped.
577
1
    pub async fn num_peers(&self, chain_id: ChainId) -> usize {
_RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService9num_peers
Line
Count
Source
577
1
    pub async fn num_peers(&self, chain_id: ChainId) -> usize {
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService9num_peers
578
1
        // One-shot channel on which the answer comes back.
        let (result_tx, result_rx) = oneshot::channel();
579
1
580
1
        // The outcome of `send` is deliberately discarded. The mutex guard is a temporary of
        // this statement, so the lock is released before waiting for the answer below.
        let _ = self
581
1
            .to_background_tx
582
1
            .lock()
583
0
            .await
584
1
            .send(ToBackground::ForegroundGetNumPeers {
585
1
                chain_id,
586
1
                result_tx,
587
1
            })
588
0
            .await;
589
590
1
        // Fails (and thus panics) only if `result_tx` was dropped without a value being sent.
        result_rx.await.unwrap()
591
1
    }
_RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService9num_peers0B6_
Line
Count
Source
577
1
    pub async fn num_peers(&self, chain_id: ChainId) -> usize {
578
1
        let (result_tx, result_rx) = oneshot::channel();
579
1
580
1
        let _ = self
581
1
            .to_background_tx
582
1
            .lock()
583
0
            .await
584
1
            .send(ToBackground::ForegroundGetNumPeers {
585
1
                chain_id,
586
1
                result_tx,
587
1
            })
588
0
            .await;
589
590
1
        result_rx.await.unwrap()
591
1
    }
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService9num_peers0B6_
592
593
    /// Returns the number of peers we have a substream with, all chains added together.
    ///
    /// The query is sent as a [`ToBackground::ForegroundGetNumTotalPeers`] message over
    /// `to_background_tx`, and the answer is awaited on a one-shot channel.
    ///
    /// # Panics
    ///
    /// Panics if the one-shot sender is dropped without an answer having been sent. This
    /// includes the case where sending the message itself failed, as the message (and the
    /// sender it carries) is then dropped.
594
0
    pub async fn num_total_peers(&self) -> usize {
Unexecuted instantiation: _RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService15num_total_peers
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService15num_total_peers
595
0
        // One-shot channel on which the answer comes back.
        let (result_tx, result_rx) = oneshot::channel();
596
0
597
0
        // The outcome of `send` is deliberately discarded. The mutex guard is a temporary of
        // this statement, so the lock is released before waiting for the answer below.
        let _ = self
598
0
            .to_background_tx
599
0
            .lock()
600
0
            .await
601
0
            .send(ToBackground::ForegroundGetNumTotalPeers { result_tx })
602
0
            .await;
603
604
0
        // Fails (and thus panics) only if `result_tx` was dropped without a value being sent.
        result_rx.await.unwrap()
605
0
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService15num_total_peers0B6_
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService15num_total_peers0B6_
606
607
21
    /// Sends a [`ToBackground::ForegroundSetLocalBestBlock`] message carrying `chain_id`,
    /// `best_hash` and `best_number` over `to_background_tx`.
    ///
    /// This function returns as soon as the message has been pushed (or has failed to be
    /// pushed) to the channel; it does not wait for any acknowledgment, and a failure to send
    /// is silently ignored.
    ///
    /// NOTE(review): presumably the background task uses these values as the best block
    /// reported to the peers of that chain — confirm against the message handler.
    pub async fn set_local_best_block(
608
21
        &self,
609
21
        chain_id: ChainId,
610
21
        best_hash: [u8; 32],
611
21
        best_number: u64,
612
21
    ) {
_RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService20set_local_best_block
Line
Count
Source
607
21
    pub async fn set_local_best_block(
608
21
        &self,
609
21
        chain_id: ChainId,
610
21
        best_hash: [u8; 32],
611
21
        best_number: u64,
612
21
    ) {
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService20set_local_best_block
613
21
        // Fire-and-forget: the outcome of `send` is deliberately discarded.
        let _ = self
614
21
            .to_background_tx
615
21
            .lock()
616
0
            .await
617
21
            .send(ToBackground::ForegroundSetLocalBestBlock {
618
21
                chain_id,
619
21
                best_hash,
620
21
                best_number,
621
21
            })
622
0
            .await;
623
21
    }
_RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService20set_local_best_block0CsiLzmwikkc22_14json_rpc_basic
Line
Count
Source
612
2
    ) {
613
2
        let _ = self
614
2
            .to_background_tx
615
2
            .lock()
616
0
            .await
617
2
            .send(ToBackground::ForegroundSetLocalBestBlock {
618
2
                chain_id,
619
2
                best_hash,
620
2
                best_number,
621
2
            })
622
0
            .await;
623
2
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService20set_local_best_block0B6_
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService20set_local_best_block0CscDgN54JpMGG_6author
_RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService20set_local_best_block0CsibGXYHQB8Ea_25json_rpc_general_requests
Line
Count
Source
612
19
    ) {
613
19
        let _ = self
614
19
            .to_background_tx
615
19
            .lock()
616
0
            .await
617
19
            .send(ToBackground::ForegroundSetLocalBestBlock {
618
19
                chain_id,
619
19
                best_hash,
620
19
                best_number,
621
19
            })
622
0
            .await;
623
19
    }
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService20set_local_best_block0B6_
624
625
    /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
626
    /// generated. Prevents a new gossip link with the same peer from being reopened for a
627
    /// little while.
628
    ///
629
    /// `reason` is a human-readable string printed in the logs.
630
    ///
631
    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
632
    /// reconnection was already happening as the call to this function is still being processed.
633
    /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
634
    /// words, this function guarantees that we will be disconnected in the future rather than
635
    /// guarantees that we will disconnect.
636
0
    // Implementation note: this only pushes a `ToBackground::ForegroundDisconnectAndBan`
    // message carrying the four arguments unchanged to `to_background_tx`. It does not wait
    // for the message to be processed.
    pub async fn ban_and_disconnect(
637
0
        &self,
638
0
        peer_id: PeerId,
639
0
        chain_id: ChainId,
640
0
        severity: BanSeverity,
641
0
        reason: &'static str,
642
0
    ) {
Unexecuted instantiation: _RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService18ban_and_disconnect
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService18ban_and_disconnect
643
0
        // Fire-and-forget: the outcome of `send` is deliberately discarded.
        let _ = self
644
0
            .to_background_tx
645
0
            .lock()
646
0
            .await
647
0
            .send(ToBackground::ForegroundDisconnectAndBan {
648
0
                peer_id,
649
0
                chain_id,
650
0
                severity,
651
0
                reason,
652
0
            })
653
0
            .await;
654
0
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService18ban_and_disconnect0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService18ban_and_disconnect0B6_
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService18ban_and_disconnect0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService18ban_and_disconnect0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService18ban_and_disconnect0B6_
655
656
0
    /// Asks the background task to announce a block to `target` on the chain `chain_id`.
    ///
    /// `scale_encoded_header` and `is_best` are forwarded unchanged in a
    /// [`ToBackground::ForegroundAnnounceBlock`] message. The value returned is whatever the
    /// background side sends back on the one-shot channel: `Ok(())`, or a
    /// [`service::QueueNotificationError`].
    ///
    /// # Panics
    ///
    /// Panics if the one-shot sender is dropped without an answer having been sent. This
    /// includes the case where sending the message itself failed, as the message (and the
    /// sender it carries) is then dropped.
    pub async fn send_block_announce(
657
0
        self: Arc<Self>,
658
0
        target: PeerId,
659
0
        chain_id: ChainId,
660
0
        scale_encoded_header: Vec<u8>,
661
0
        is_best: bool,
662
0
    ) -> Result<(), service::QueueNotificationError> {
Unexecuted instantiation: _RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService19send_block_announce
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService19send_block_announce
663
0
        // One-shot channel on which the answer comes back.
        let (result_tx, result_rx) = oneshot::channel();
664
0
665
0
        // The outcome of `send` is deliberately discarded. The mutex guard is a temporary of
        // this statement, so the lock is released before waiting for the answer below.
        let _ = self
666
0
            .to_background_tx
667
0
            .lock()
668
0
            .await
669
0
            .send(ToBackground::ForegroundAnnounceBlock {
670
0
                target,
671
0
                chain_id,
672
0
                scale_encoded_header,
673
0
                is_best,
674
0
                result_tx,
675
0
            })
676
0
            .await;
677
678
0
        // Fails (and thus panics) only if `result_tx` was dropped without a value being sent.
        result_rx.await.unwrap()
679
0
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService19send_block_announce0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService19send_block_announce0B6_
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService19send_block_announce0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService19send_block_announce0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService19send_block_announce0B6_
680
681
    /// Sends a blocks request to the given peer.
    ///
    /// `target`, `chain_id` and `config` are forwarded in a
    /// [`ToBackground::ForegroundBlocksRequest`] message, and the value returned is whatever
    /// the background side sends back on the one-shot channel.
    ///
    /// # Panics
    ///
    /// Panics if the one-shot sender is dropped without an answer having been sent. This
    /// includes the case where sending the message itself failed, as the message (and the
    /// sender it carries) is then dropped.
682
    // TODO: more docs
683
    // TODO: proper error type
684
0
    pub async fn blocks_request(
685
0
        self: Arc<Self>,
686
0
        target: PeerId, // TODO: by value?
687
0
        chain_id: ChainId,
688
0
        config: codec::BlocksRequestConfig,
689
0
    ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
Unexecuted instantiation: _RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService14blocks_request
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService14blocks_request
690
0
        // One-shot channel on which the answer comes back.
        let (result_tx, result_rx) = oneshot::channel();
691
0
692
0
        // The outcome of `send` is deliberately discarded. The mutex guard is a temporary of
        // this statement, so the lock is released before waiting for the answer below.
        let _ = self
693
0
            .to_background_tx
694
0
            .lock()
695
0
            .await
696
0
            .send(ToBackground::ForegroundBlocksRequest {
697
0
                // NOTE(review): `target` is owned and not used again in this function, so
                // this clone looks unnecessary.
                target: target.clone(),
698
0
                chain_id,
699
0
                config,
700
0
                result_tx,
701
0
            })
702
0
            .await;
703
704
0
        // Fails (and thus panics) only if `result_tx` was dropped without a value being sent.
        result_rx.await.unwrap()
705
0
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService14blocks_request0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService14blocks_request0B6_
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService14blocks_request0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService14blocks_request0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService14blocks_request0B6_
706
707
    /// Sends a warp sync request to the given peer.
    ///
    /// `target`, `chain_id` and `begin_hash` are forwarded in a
    /// [`ToBackground::ForegroundWarpSyncRequest`] message, and the value returned is whatever
    /// the background side sends back on the one-shot channel.
    ///
    /// # Panics
    ///
    /// Panics if the one-shot sender is dropped without an answer having been sent. This
    /// includes the case where sending the message itself failed, as the message (and the
    /// sender it carries) is then dropped.
708
    // TODO: more docs
709
    // TODO: proper error type
710
0
    pub async fn warp_sync_request(
711
0
        self: Arc<Self>,
712
0
        target: PeerId, // TODO: by value?
713
0
        chain_id: ChainId,
714
0
        begin_hash: [u8; 32],
715
0
    ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
Unexecuted instantiation: _RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService17warp_sync_request
Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService17warp_sync_request
716
0
        // One-shot channel on which the answer comes back.
        let (result_tx, result_rx) = oneshot::channel();
717
0
718
0
        // The outcome of `send` is deliberately discarded. The mutex guard is a temporary of
        // this statement, so the lock is released before waiting for the answer below.
        let _ = self
719
0
            .to_background_tx
720
0
            .lock()
721
0
            .await
722
0
            .send(ToBackground::ForegroundWarpSyncRequest {
723
0
                // NOTE(review): `target` is owned and not used again in this function, so
                // this clone looks unnecessary.
                target: target.clone(),
724
0
                chain_id,
725
0
                begin_hash,
726
0
                result_tx,
727
0
            })
728
0
            .await;
729
730
0
        // Fails (and thus panics) only if `result_tx` was dropped without a value being sent.
        result_rx.await.unwrap()
731
0
    }
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService17warp_sync_request0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService17warp_sync_request0B6_
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService17warp_sync_request0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB4_14NetworkService17warp_sync_request0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB4_14NetworkService17warp_sync_request0B6_
732
733
    /// Sends a storage proof request to the given peer.
734
    // TODO: more docs
735
    // TODO: proper error type
736
0
    pub async fn storage_request(
737
0
        self: Arc<Self>,
738
0
        target: PeerId, // TODO: by value?
739
0
        chain_id: ChainId,
740
0
        config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
741
0
    ) -> Result<service::EncodedMerkleProof, ()> {
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1y_9into_iter8IntoIterB1v_EECsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService15storage_requestppEB5_
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1y_9into_iter8IntoIterB1v_EECscDgN54JpMGG_6author
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1y_9into_iter8IntoIterB1v_EECsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RINvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB3_14NetworkService15storage_requestppEB5_
742
0
        // TODO: logs and jaeger integration
743
0
        let (result_tx, result_rx) = oneshot::channel();
744
0
745
0
        let _ = self
746
0
            .to_background_tx
747
0
            .lock()
748
0
            .await
749
0
            .send(ToBackground::ForegroundStorageProofRequest {
750
0
                target: target.clone(),
751
0
                chain_id,
752
0
                config: codec::StorageProofRequestConfig {
753
0
                    block_hash: config.block_hash,
754
0
                    keys: config
755
0
                        .keys
756
0
                        .map(|key| key.as_ref().to_vec()) // TODO: to_vec() overhead
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1C_9into_iter8IntoIterB1z_EE00CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService15storage_requestppE00B9_
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1C_9into_iter8IntoIterB1z_EE00CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1C_9into_iter8IntoIterB1z_EE00CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCINvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB7_14NetworkService15storage_requestppE00B9_
757
0
                        .collect::<Vec<_>>()
758
0
                        .into_iter(),
759
0
                },
760
0
                result_tx,
761
0
            })
762
0
            .await;
763
764
0
        result_rx.await.unwrap()
765
0
    }
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1A_9into_iter8IntoIterB1x_EE0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService15storage_requestppE0B7_
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1A_9into_iter8IntoIterB1x_EE0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService15storage_requestINtNtCsdZExvAaxgia_5alloc3vec3VechEINtNtB1A_9into_iter8IntoIterB1x_EE0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCINvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB5_14NetworkService15storage_requestppE0B7_
766
767
    /// Sends a call proof request to the given peer.
768
    // TODO: more docs
769
    // TODO: proper error type
770
0
    pub async fn call_proof_request(
771
0
        self: Arc<Self>,
772
0
        target: PeerId, // TODO: by value?
773
0
        chain_id: ChainId,
774
0
        config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
775
0
    ) -> Result<service::EncodedMerkleProof, ()> {
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1y_EECsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService18call_proof_requestppEB5_
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1y_EECscDgN54JpMGG_6author
Unexecuted instantiation: _RINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB3_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1y_EECsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RINvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB3_14NetworkService18call_proof_requestppEB5_
776
0
        // TODO: logs and jaeger integration
777
0
        let (result_tx, result_rx) = oneshot::channel();
778
0
779
0
        let _ = self
780
0
            .to_background_tx
781
0
            .lock()
782
0
            .await
783
0
            .send(ToBackground::ForegroundCallProofRequest {
784
0
                target: target.clone(),
785
0
                chain_id,
786
0
                config: codec::CallProofRequestConfig {
787
0
                    block_hash: config.block_hash,
788
0
                    method: config.method.into_owned().into(),
789
0
                    parameter_vectored: config
790
0
                        .parameter_vectored
791
0
                        .map(|v| v.as_ref().to_vec()) // TODO: to_vec() overhead
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1C_EE00CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService18call_proof_requestppE00B9_
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1C_EE00CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB7_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1C_EE00CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCNCINvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB7_14NetworkService18call_proof_requestppE00B9_
792
0
                        .collect::<Vec<_>>()
793
0
                        .into_iter(),
794
0
                },
795
0
                result_tx,
796
0
            })
797
0
            .await;
798
799
0
        result_rx.await.unwrap()
800
0
    }
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1A_EE0CsiLzmwikkc22_14json_rpc_basic
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService18call_proof_requestppE0B7_
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1A_EE0CscDgN54JpMGG_6author
Unexecuted instantiation: _RNCINvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_14NetworkService18call_proof_requestINtNtCsdZExvAaxgia_5alloc6borrow3CowShEINtNtNtNtCsaYZPK01V26L_4core4iter7sources4once4OnceB1A_EE0CsibGXYHQB8Ea_25json_rpc_general_requests
Unexecuted instantiation: _RNCINvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB5_14NetworkService18call_proof_requestppE0B7_
801
}
802
803
/// Error when initializing the network service.
804
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXs8_NtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_9InitErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs8_NtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB5_9InitErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
805
pub enum InitError {
806
    /// I/O error when initializing a listener.
807
    #[display(fmt = "I/O error when creating listener for {_0}: {_1}")]
808
    ListenerIo(Multiaddr, io::Error),
809
    /// A listening address passed through the configuration isn't valid.
810
    #[display(fmt = "A listening address passed through the configuration isn't valid: {_0}")]
811
    BadListenMultiaddr(Multiaddr),
812
}
813
814
/// Error returned by [`NetworkService::blocks_request`].
815
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsa_NtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_18BlocksRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsa_NtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB5_18BlocksRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
816
pub enum BlocksRequestError {
817
    /// No established connection with the target.
818
    NoConnection,
819
    /// Error during the request.
820
    #[display(fmt = "{_0}")]
821
    Request(service::BlocksRequestError),
822
}
823
824
/// Error returned by [`NetworkService::warp_sync_request`].
825
0
#[derive(Debug, derive_more::Display)]
Unexecuted instantiation: _RNvXsc_NtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB5_20WarpSyncRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXsc_NtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB5_20WarpSyncRequestErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
826
pub enum WarpSyncRequestError {
827
    /// No established connection with the target.
828
    NoConnection,
829
    /// Error during the request.
830
    #[display(fmt = "{_0}")]
831
    Request(service::GrandpaWarpSyncRequestError),
832
}
833
834
21
fn run(mut inner: Inner) {
835
21
    // This function is a small hack because I didn't find a better way to store the executor
836
21
    // within `Inner` while at the same time spawning the `Inner` using said executor.
837
21
    let mut actual_executor = mem::replace(&mut inner.tasks_executor, Box::new(|_| 
unreachable!()0
));
Unexecuted instantiation: _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service3run0B5_
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service3run0B5_
838
21
    let (tx, rx) = oneshot::channel();
839
21
    actual_executor(Box::pin(async move {
840
21
        let actual_executor = rx.
await0
.unwrap();
841
21
        inner.tasks_executor = actual_executor;
842
122
        background_task(inner).await;
843
21
    
}0
));
_RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service3runs_0B5_
Line
Count
Source
839
21
    actual_executor(Box::pin(async move {
840
21
        let actual_executor = rx.
await0
.unwrap();
841
21
        inner.tasks_executor = actual_executor;
842
122
        background_task(inner).await;
843
0
    }));
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service3runs_0B5_
844
21
    tx.send(actual_executor).unwrap_or_else(|_| 
panic!()0
);
Unexecuted instantiation: _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service3runs0_0B5_
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service3runs0_0B5_
845
21
}
_RNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service3run
Line
Count
Source
834
21
fn run(mut inner: Inner) {
835
21
    // This function is a small hack because I didn't find a better way to store the executor
836
21
    // within `Inner` while at the same time spawning the `Inner` using said executor.
837
21
    let mut actual_executor = mem::replace(&mut inner.tasks_executor, Box::new(|_| unreachable!()));
838
21
    let (tx, rx) = oneshot::channel();
839
21
    actual_executor(Box::pin(async move {
840
        let actual_executor = rx.await.unwrap();
841
        inner.tasks_executor = actual_executor;
842
        background_task(inner).await;
843
21
    }));
844
21
    tx.send(actual_executor).unwrap_or_else(|_| panic!());
845
21
}
Unexecuted instantiation: _RNvNtCshBwayKnNXDT_17smoldot_full_node15network_service3run
846
847
21
async fn background_task(mut inner: Inner) {
_RNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task
Line
Count
Source
847
21
async fn background_task(mut inner: Inner) {
Unexecuted instantiation: _RNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task
848
    loop {
849
        enum WakeUpReason {
850
            IncomingConnection {
851
                socket: TcpStream,
852
                socket_addr: SocketAddr,
853
            },
854
            NetworkEvent(service::Event<channel::Sender<service::CoordinatorToConnection>>),
855
            Message(ToBackground),
856
            ForegroundClosed,
857
            FromConnectionTask {
858
                connection_id: service::ConnectionId,
859
                // TODO: this Option is weird
860
                message: Option<service::ConnectionToCoordinator>,
861
            },
862
            EventSendersReady,
863
            CanAssignSlot(PeerId, ChainId),
864
            CanStartConnect(PeerId),
865
            CanOpenGossip(PeerId, ChainId),
866
            StartKademliaDiscoveries,
867
            MessageToConnection {
868
                connection_id: service::ConnectionId,
869
                message: service::CoordinatorToConnection,
870
            },
871
        }
872
873
147
        let 
wake_up_reason = 126
async {
874
147
            inner
875
147
                .to_background_rx
876
147
                .next()
877
122
                .await
878
22
                .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
879
22
        }
_RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task00B7_
Line
Count
Source
873
147
        let wake_up_reason = async {
874
147
            inner
875
147
                .to_background_rx
876
147
                .next()
877
122
                .await
878
22
                .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
879
22
        }
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task00B7_
880
147
        .or({
881
147
            let event_senders_ready = 
matches!0
(inner.event_senders, either::Left(_));
882
147
            let event_pending_send = &inner.event_pending_send;
883
147
            let network = &mut inner.network;
884
147
            let peering_strategy = &mut inner.peering_strategy;
885
147
            let num_pending_out_attempts = &inner.num_pending_out_attempts;
886
147
            async move {
887
147
                if let Some(
event0
) = (event_senders_ready && event_pending_send.is_none())
888
147
                    .then(|| network.next_event())
_RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s_00B9_
Line
Count
Source
888
147
                    .then(|| network.next_event())
Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s_00B9_
889
147
                    .flatten()
890
                {
891
0
                    WakeUpReason::NetworkEvent(event)
892
147
                } else if let Some((
connection_id, message0
)) = network.pull_message_to_connection()
893
                {
894
0
                    WakeUpReason::MessageToConnection {
895
0
                        connection_id,
896
0
                        message,
897
0
                    }
898
147
                } else if let Some((
peer_id, chain_id0
)) = network
899
147
                    .connected_unopened_gossip_desired()
900
147
                    .next()
901
147
                    .map(|(peer_id, chain_id, _)| 
(peer_id.clone(), chain_id)0
)
Unexecuted instantiation: _RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s_0s_0B9_
Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s_0s_0B9_
902
                {
903
0
                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
904
147
                } else if let Some(
peer_id0
) = (*num_pending_out_attempts < 16)
905
147
                    .then(|| network.unconnected_desired().next().cloned())
_RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s_0s0_0B9_
Line
Count
Source
905
147
                    .then(|| network.unconnected_desired().next().cloned())
Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s_0s0_0B9_
906
147
                    .flatten()
907
                {
908
0
                    WakeUpReason::CanStartConnect(peer_id)
909
                } else {
910
147
                    'search: loop {
911
147
                        let mut earlier_unban = None;
912
913
147
                        for chain_id in network.chains().collect::<Vec<_>>() {
914
147
                            if network.gossip_desired_num(
915
147
                                chain_id,
916
147
                                service::GossipKind::ConsensusTransactions,
917
147
                            ) >= network[chain_id].max_slots
918
                            {
919
0
                                continue;
920
147
                            }
921
147
922
147
                            match peering_strategy.pick_assignable_peer(&chain_id, &Instant::now())
923
                            {
924
0
                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
925
0
                                    break 'search WakeUpReason::CanAssignSlot(
926
0
                                        peer_id.clone(),
927
0
                                        chain_id,
928
0
                                    )
929
                                }
930
                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
931
0
                                    next_unban,
932
0
                                } => {
933
0
                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
Unexecuted instantiation: _RNCNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s_0s1_0B9_
Unexecuted instantiation: _RNCNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s_0s1_0B9_
934
0
                                        earlier_unban = Some(next_unban.clone());
935
0
                                    }
936
                                }
937
147
                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
938
                            }
939
                        }
940
941
147
                        if let Some(
earlier_unban0
) = earlier_unban {
942
0
                            smol::Timer::at(earlier_unban).await;
943
                        } else {
944
147
                            future::pending::<()>().
await100
;
945
                        }
946
                    }
947
                }
948
0
            }
_RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s_0B7_
Line
Count
Source
886
147
            async move {
887
147
                if let Some(
event0
) = (event_senders_ready && event_pending_send.is_none())
888
147
                    .then(|| network.next_event())
889
147
                    .flatten()
890
                {
891
0
                    WakeUpReason::NetworkEvent(event)
892
147
                } else if let Some((
connection_id, message0
)) = network.pull_message_to_connection()
893
                {
894
0
                    WakeUpReason::MessageToConnection {
895
0
                        connection_id,
896
0
                        message,
897
0
                    }
898
147
                } else if let Some((
peer_id, chain_id0
)) = network
899
147
                    .connected_unopened_gossip_desired()
900
147
                    .next()
901
147
                    .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id))
902
                {
903
0
                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
904
147
                } else if let Some(
peer_id0
) = (*num_pending_out_attempts < 16)
905
147
                    .then(|| network.unconnected_desired().next().cloned())
906
147
                    .flatten()
907
                {
908
0
                    WakeUpReason::CanStartConnect(peer_id)
909
                } else {
910
147
                    'search: loop {
911
147
                        let mut earlier_unban = None;
912
913
147
                        for chain_id in network.chains().collect::<Vec<_>>() {
914
147
                            if network.gossip_desired_num(
915
147
                                chain_id,
916
147
                                service::GossipKind::ConsensusTransactions,
917
147
                            ) >= network[chain_id].max_slots
918
                            {
919
0
                                continue;
920
147
                            }
921
147
922
147
                            match peering_strategy.pick_assignable_peer(&chain_id, &Instant::now())
923
                            {
924
0
                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
925
0
                                    break 'search WakeUpReason::CanAssignSlot(
926
0
                                        peer_id.clone(),
927
0
                                        chain_id,
928
0
                                    )
929
                                }
930
                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
931
0
                                    next_unban,
932
0
                                } => {
933
0
                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
934
0
                                        earlier_unban = Some(next_unban.clone());
935
0
                                    }
936
                                }
937
147
                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
938
                            }
939
                        }
940
941
147
                        if let Some(
earlier_unban0
) = earlier_unban {
942
0
                            smol::Timer::at(earlier_unban).await;
943
                        } else {
944
147
                            future::pending::<()>().
await100
;
945
                        }
946
                    }
947
                }
948
0
            }
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s_0B7_
949
        })
950
147
        .or(async {
951
147
            if let either::Right(
sending0
) = &mut inner.event_senders {
952
0
                let event_senders = sending.await;
953
0
                inner.event_senders = either::Left(event_senders);
954
0
                WakeUpReason::EventSendersReady
955
147
            } else if inner.event_pending_send.is_some() {
956
0
                WakeUpReason::EventSendersReady
957
            } else {
958
147
                future::pending().
await100
959
            }
960
147
        
}0
)
_RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s0_0B7_
Line
Count
Source
950
147
        .or(async {
951
147
            if let either::Right(
sending0
) = &mut inner.event_senders {
952
0
                let event_senders = sending.await;
953
0
                inner.event_senders = either::Left(event_senders);
954
0
                WakeUpReason::EventSendersReady
955
147
            } else if inner.event_pending_send.is_some() {
956
0
                WakeUpReason::EventSendersReady
957
            } else {
958
147
                future::pending().
await100
959
            }
960
0
        })
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s0_0B7_
961
147
        .or(async {
962
147
            (&mut inner.next_discovery).
await100
;
963
104
            inner.next_discovery = smol::Timer::after(inner.next_discovery_period);
964
104
            inner.next_discovery_period =
965
104
                cmp::min(inner.next_discovery_period * 2, Duration::from_secs(120));
966
104
            WakeUpReason::StartKademliaDiscoveries
967
147
        })
_RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s1_0B7_
Line
Count
Source
961
147
        .or(async {
962
147
            (&mut inner.next_discovery).
await100
;
963
104
            inner.next_discovery = smol::Timer::after(inner.next_discovery_period);
964
104
            inner.next_discovery_period =
965
104
                cmp::min(inner.next_discovery_period * 2, Duration::from_secs(120));
966
104
            WakeUpReason::StartKademliaDiscoveries
967
104
        })
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s1_0B7_
968
147
        .or(async 
{143
969
143
            let (
connection_id, message0
) = inner.from_connections_rx.next().
await0
.unwrap();
970
0
            WakeUpReason::FromConnectionTask {
971
0
                connection_id,
972
0
                message,
973
0
            }
974
147
        })
_RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s2_0B7_
Line
Count
Source
968
143
        .or(async {
969
143
            let (
connection_id, message0
) = inner.from_connections_rx.next().
await0
.unwrap();
970
0
            WakeUpReason::FromConnectionTask {
971
0
                connection_id,
972
0
                message,
973
0
            }
974
0
        })
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s2_0B7_
975
147
        .or(async 
{143
976
143
            let Some((
socket, socket_addr0
)) = inner.incoming_connections.next().
await0
else {
977
143
                future::pending().
await0
978
            };
979
0
            WakeUpReason::IncomingConnection {
980
0
                socket,
981
0
                socket_addr,
982
0
            }
983
147
        })
_RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s3_0B7_
Line
Count
Source
975
143
        .or(async {
976
143
            let Some((
socket, socket_addr0
)) = inner.incoming_connections.next().
await0
else {
977
143
                future::pending().
await0
978
            };
979
0
            WakeUpReason::IncomingConnection {
980
0
                socket,
981
0
                socket_addr,
982
0
            }
983
0
        })
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s3_0B7_
984
122
        .await;
985
986
0
        match wake_up_reason {
987
            WakeUpReason::MessageToConnection {
988
0
                connection_id,
989
0
                message,
990
0
            } => {
991
0
                // Note that it is critical for the sending to not take too long here, in order to
992
0
                // not block the process of the network service.
993
0
                // In particular, if sending the message to the connection is blocked due to
994
0
                // sending a message on the connection-to-coordinator channel, this will result
995
0
                // in a deadlock.
996
0
                // For this reason, the connection task is always ready to immediately accept a
997
0
                // message on the coordinator-to-connection channel.
998
0
                inner.network[connection_id].send(message).await.unwrap();
999
            }
1000
1001
            WakeUpReason::FromConnectionTask {
1002
0
                connection_id,
1003
0
                message,
1004
            } => {
1005
0
                if let Some(message) = message {
1006
0
                    inner
1007
0
                        .network
1008
0
                        .inject_connection_message(connection_id, message);
1009
0
                }
1010
            }
1011
1012
            WakeUpReason::IncomingConnection {
1013
0
                socket,
1014
0
                socket_addr,
1015
0
            } => {
1016
0
                // The Nagle algorithm, implemented in the kernel, consists in buffering the
1017
0
                // data to be sent out and waiting a bit before actually sending it out, in
1018
0
                // order to potentially merge multiple writes in a row into one packet. In
1019
0
                // the implementation below, it is guaranteed that the buffer in `WithBuffers`
1020
0
                // is filled with as much data as possible before the operating system gets
1021
0
                // involved. As such, we disable the Nagle algorithm, in order to avoid adding
1022
0
                // an artificial delay to all sends.
1023
0
                let _ = socket.set_nodelay(true);
1024
1025
0
                let multiaddr = [
1026
0
                    match socket_addr.ip() {
1027
0
                        IpAddr::V4(ip) => Protocol::<&[u8]>::Ip4(ip.octets()),
1028
0
                        IpAddr::V6(ip) => Protocol::Ip6(ip.octets()),
1029
                    },
1030
0
                    Protocol::Tcp(socket_addr.port()),
1031
0
                ]
1032
0
                .into_iter()
1033
0
                .collect::<Multiaddr>();
1034
0
1035
0
                inner.log_callback.log(
1036
0
                    LogLevel::Debug,
1037
0
                    format!("incoming-connection; multiaddr={}", multiaddr),
1038
0
                );
1039
0
1040
0
                let (tx, rx) = channel::bounded(16); // TODO: ?!
1041
0
1042
0
                let (connection_id, connection_task) = inner.network.add_single_stream_connection(
1043
0
                    Instant::now(),
1044
0
                    service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
1045
0
                        is_initiator: false,
1046
0
                        noise_key: &inner.noise_key,
1047
0
                    },
1048
0
                    multiaddr.clone().into_bytes(),
1049
0
                    None,
1050
0
                    tx,
1051
0
                );
1052
0
1053
0
                (inner.tasks_executor)(Box::pin(tasks::connection_task(
1054
0
                    inner.log_callback.clone(),
1055
0
                    multiaddr.to_string(),
1056
0
                    async move { Ok(socket) },
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s4_0B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s4_0B7_
1057
0
                    connection_id,
1058
0
                    connection_task,
1059
0
                    rx,
1060
0
                    inner.from_connections_tx.clone(),
1061
0
                )));
1062
            }
1063
1064
            WakeUpReason::StartKademliaDiscoveries => {
1065
104
                for chain_id in inner.network.chains().collect::<Vec<_>>() {
1066
104
                    let random_peer_id =
1067
104
                        PeerId::from_public_key(&peer_id::PublicKey::Ed25519(rand::random()));
1068
104
1069
104
                    // TODO: select target closest to the random peer instead
1070
104
                    let target = inner
1071
104
                        .network
1072
104
                        .gossip_connected_peers(
1073
104
                            chain_id,
1074
104
                            service::GossipKind::ConsensusTransactions,
1075
104
                        )
1076
104
                        .next()
1077
104
                        .cloned();
1078
1079
104
                    if let Some(
target0
) = target {
1080
0
                        match inner.network.start_kademlia_find_node_request(
1081
0
                            &target,
1082
0
                            chain_id,
1083
0
                            &random_peer_id,
1084
0
                            Duration::from_secs(20),
1085
0
                        ) {
1086
0
                            Ok(_) => {}
1087
0
                            Err(service::StartRequestError::NoConnection) => unreachable!(),
1088
                        };
1089
104
                    } else {
1090
104
                        // TODO: log message
1091
104
                    }
1092
                }
1093
            }
1094
1095
            WakeUpReason::ForegroundClosed => {
1096
                // TODO: do a clean shutdown of all the connections
1097
0
                return;
1098
            }
1099
1100
            WakeUpReason::Message(ToBackground::ForegroundDisconnectAndBan {
1101
0
                peer_id,
1102
0
                chain_id,
1103
0
                severity,
1104
0
                reason,
1105
0
            }) => {
1106
0
                // Note that peer doesn't necessarily have an out slot.
1107
0
                inner.peering_strategy.unassign_slot_and_ban(
1108
0
                    &chain_id,
1109
0
                    &peer_id,
1110
0
                    Instant::now()
1111
0
                        + Duration::from_secs(match severity {
1112
0
                            BanSeverity::Low => 10,
1113
0
                            BanSeverity::High => 40,
1114
                        }),
1115
                );
1116
0
                if inner.network.gossip_remove_desired(
1117
0
                    chain_id,
1118
0
                    &peer_id,
1119
0
                    service::GossipKind::ConsensusTransactions,
1120
0
                ) {
1121
0
                    inner.log_callback.log(
1122
0
                        LogLevel::Debug,
1123
0
                        format!(
1124
0
                            "slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}",
1125
0
                            peer_id, inner.network[chain_id].log_name, reason
1126
0
                        ),
1127
0
                    );
1128
0
                }
1129
1130
0
                if inner.network.gossip_is_connected(
1131
0
                    chain_id,
1132
0
                    &peer_id,
1133
0
                    service::GossipKind::ConsensusTransactions,
1134
0
                ) {
1135
0
                    let _close_result = inner.network.gossip_close(
1136
0
                        chain_id,
1137
0
                        &peer_id,
1138
0
                        service::GossipKind::ConsensusTransactions,
1139
0
                    );
1140
0
                    debug_assert!(_close_result.is_ok());
1141
1142
0
                    inner.log_callback.log(
1143
0
                        LogLevel::Debug,
1144
0
                        format!(
1145
0
                            "chain-disconnected; peer_id={}; chain={}",
1146
0
                            peer_id, inner.network[chain_id].log_name
1147
0
                        ),
1148
0
                    );
1149
0
1150
0
                    debug_assert!(inner.event_pending_send.is_none());
1151
0
                    inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id });
1152
0
                }
1153
            }
1154
1155
            WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock {
1156
0
                target,
1157
0
                chain_id,
1158
0
                scale_encoded_header,
1159
0
                is_best,
1160
0
                result_tx,
1161
0
            }) => {
1162
0
                let _ = result_tx.send(inner.network.gossip_send_block_announce(
1163
0
                    &target,
1164
0
                    chain_id,
1165
0
                    &scale_encoded_header,
1166
0
                    is_best,
1167
0
                ));
1168
0
            }
1169
            WakeUpReason::Message(ToBackground::ForegroundSetLocalBestBlock {
1170
21
                chain_id,
1171
21
                best_hash,
1172
21
                best_number,
1173
21
            }) => {
1174
21
                inner
1175
21
                    .network
1176
21
                    .set_chain_local_best_block(chain_id, best_hash, best_number);
1177
21
            }
1178
            WakeUpReason::Message(ToBackground::ForegroundBlocksRequest {
1179
0
                target,
1180
0
                chain_id,
1181
0
                config,
1182
0
                result_tx,
1183
0
            }) => {
1184
0
                inner.log_callback.log(
1185
0
                    LogLevel::Debug,
1186
0
                    format!(
1187
0
                        "blocks-request-start; peer_id={}; chain={}; start={}; desired_count={}; direction={}",
1188
0
                        target,
1189
0
                        inner.network[chain_id].log_name,
1190
0
                        match &config.start {
1191
0
                            codec::BlocksRequestConfigStart::Hash(h) => either::Left(HashDisplay(h)),
1192
0
                            codec::BlocksRequestConfigStart::Number(n) => either::Right(n),
1193
                        },
1194
                        config.desired_count,
1195
0
                        match config.direction {
1196
0
                            codec::BlocksRequestDirection::Ascending => "ascending",
1197
0
                            codec::BlocksRequestDirection::Descending => "descending",
1198
                        },
1199
                    ),
1200
                );
1201
1202
0
                match inner.network.start_blocks_request(
1203
0
                    &target,
1204
0
                    chain_id,
1205
0
                    config,
1206
0
                    Duration::from_secs(12),
1207
0
                ) {
1208
0
                    Ok(request_id) => {
1209
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1210
0
                        inner.blocks_requests.insert(request_id, result_tx);
1211
0
                    }
1212
0
                    Err(service::StartRequestError::NoConnection) => {
1213
0
                        inner.log_callback.log(
1214
0
                            LogLevel::Debug,
1215
0
                            format!(
1216
0
                                "blocks-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1217
0
                                target,
1218
0
                                inner.network[chain_id].log_name,
1219
0
                            ),
1220
0
                        );
1221
0
                        let _ = result_tx.send(Err(BlocksRequestError::NoConnection));
1222
0
                    }
1223
                }
1224
            }
1225
            WakeUpReason::Message(ToBackground::ForegroundWarpSyncRequest {
1226
0
                target,
1227
0
                chain_id,
1228
0
                begin_hash,
1229
0
                result_tx,
1230
0
            }) => {
1231
0
                inner.log_callback.log(
1232
0
                    LogLevel::Debug,
1233
0
                    format!(
1234
0
                        "warp-sync-request-start; peer_id={}; chain={}; begin-hash={}",
1235
0
                        target,
1236
0
                        inner.network[chain_id].log_name,
1237
0
                        HashDisplay(&begin_hash)
1238
0
                    ),
1239
0
                );
1240
0
1241
0
                match inner.network.start_grandpa_warp_sync_request(
1242
0
                    &target,
1243
0
                    chain_id,
1244
0
                    begin_hash,
1245
0
                    Duration::from_secs(12),
1246
0
                ) {
1247
0
                    Ok(request_id) => {
1248
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1249
0
                        inner.warp_sync_requests.insert(request_id, result_tx);
1250
0
                    }
1251
0
                    Err(service::StartRequestError::NoConnection) => {
1252
0
                        inner.log_callback.log(
1253
0
                            LogLevel::Debug,
1254
0
                            format!(
1255
0
                                "warp-sync-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1256
0
                                target,
1257
0
                                inner.network[chain_id].log_name,
1258
0
                            ),
1259
0
                        );
1260
0
                        let _ = result_tx.send(Err(WarpSyncRequestError::NoConnection));
1261
0
                    }
1262
                }
1263
            }
1264
            WakeUpReason::Message(ToBackground::ForegroundStorageProofRequest {
1265
0
                target,
1266
0
                chain_id,
1267
0
                config,
1268
0
                result_tx,
1269
0
            }) => {
1270
0
                inner.log_callback.log(
1271
0
                    LogLevel::Debug,
1272
0
                    format!(
1273
0
                        "storage-request-start; peer_id={}; chain={}; block-hash={}; num-keys={}",
1274
0
                        target,
1275
0
                        inner.network[chain_id].log_name,
1276
0
                        HashDisplay(&config.block_hash),
1277
0
                        config.keys.len()
1278
0
                    ),
1279
0
                );
1280
0
1281
0
                match inner.network.start_storage_proof_request(
1282
0
                    &target,
1283
0
                    chain_id,
1284
0
                    config,
1285
0
                    Duration::from_secs(12),
1286
0
                ) {
1287
0
                    Ok(request_id) => {
1288
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1289
0
                        inner.storage_requests.insert(request_id, result_tx);
1290
0
                    }
1291
0
                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1292
0
                        inner.log_callback.log(
1293
0
                            LogLevel::Debug,
1294
0
                            format!(
1295
0
                                "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1296
0
                                target,
1297
0
                                inner.network[chain_id].log_name,
1298
0
                            ),
1299
0
                        );
1300
0
                        let _ = result_tx.send(Err(()));
1301
0
                    }
1302
0
                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1303
0
                        inner.log_callback.log(
1304
0
                            LogLevel::Debug,
1305
0
                            format!(
1306
0
                                "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large",
1307
0
                                target,
1308
0
                                inner.network[chain_id].log_name,
1309
0
                            ),
1310
0
                        );
1311
0
                        let _ = result_tx.send(Err(()));
1312
0
                    }
1313
                }
1314
            }
1315
            WakeUpReason::Message(ToBackground::ForegroundCallProofRequest {
1316
0
                target,
1317
0
                chain_id,
1318
0
                config,
1319
0
                result_tx,
1320
0
            }) => {
1321
0
                inner.log_callback.log(
1322
0
                    LogLevel::Debug,
1323
0
                    format!(
1324
0
                        "call-proof-request-start; peer_id={}; chain={}; block-hash={}; function={}",
1325
0
                        target,
1326
0
                        inner.network[chain_id].log_name,
1327
0
                        HashDisplay(&config.block_hash),
1328
0
                        config.method
1329
0
                    ),
1330
0
                );
1331
0
1332
0
                match inner.network.start_call_proof_request(
1333
0
                    &target,
1334
0
                    chain_id,
1335
0
                    config,
1336
0
                    Duration::from_secs(12),
1337
0
                ) {
1338
0
                    Ok(request_id) => {
1339
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1340
0
                        inner.call_proof_requests.insert(request_id, result_tx);
1341
0
                    }
1342
0
                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1343
0
                        inner.log_callback.log(
1344
0
                            LogLevel::Debug,
1345
0
                            format!(
1346
0
                                "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1347
0
                                target,
1348
0
                                inner.network[chain_id].log_name,
1349
0
                            ),
1350
0
                        );
1351
0
                        let _ = result_tx.send(Err(()));
1352
0
                    }
1353
0
                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1354
0
                        inner.log_callback.log(
1355
0
                            LogLevel::Debug,
1356
0
                            format!(
1357
0
                                "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large",
1358
0
                                target,
1359
0
                                inner.network[chain_id].log_name,
1360
0
                            ),
1361
0
                        );
1362
0
                        let _ = result_tx.send(Err(()));
1363
0
                    }
1364
                }
1365
            }
1366
0
            WakeUpReason::Message(ToBackground::ForegroundGetNumConnections { result_tx }) => {
1367
0
                let _ = result_tx.send(inner.network.num_connections());
1368
0
            }
1369
            WakeUpReason::Message(ToBackground::ForegroundGetNumPeers {
1370
1
                chain_id,
1371
1
                result_tx,
1372
1
            }) => {
1373
1
                // TODO: optimize?
1374
1
                let _ = result_tx.send(
1375
1
                    inner
1376
1
                        .network
1377
1
                        .gossip_connected_peers(
1378
1
                            chain_id,
1379
1
                            service::GossipKind::ConsensusTransactions,
1380
1
                        )
1381
1
                        .count(),
1382
1
                );
1383
1
            }
1384
0
            WakeUpReason::Message(ToBackground::ForegroundGetNumTotalPeers { result_tx }) => {
1385
0
                // TODO: optimize?
1386
0
                let total = inner
1387
0
                    .network
1388
0
                    .chains()
1389
0
                    .map(|chain_id| {
1390
0
                        inner
1391
0
                            .network
1392
0
                            .gossip_connected_peers(
1393
0
                                chain_id,
1394
0
                                service::GossipKind::ConsensusTransactions,
1395
0
                            )
1396
0
                            .count()
1397
0
                    })
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s5_0B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s5_0B7_
1398
0
                    .sum();
1399
0
                let _ = result_tx.send(total);
1400
0
            }
1401
1402
            WakeUpReason::EventSendersReady => {
1403
                // Dispatch the pending event, if any, to the various senders.
1404
1405
                // We made sure that the senders were ready before generating an event.
1406
0
                let either::Left(event_senders) = &mut inner.event_senders else {
1407
0
                    unreachable!()
1408
                };
1409
1410
0
                if let Some(event_to_dispatch) = inner.event_pending_send.take() {
1411
0
                    let mut event_senders = mem::take(event_senders);
1412
0
                    inner.event_senders = either::Right(Box::pin(async move {
1413
                        // Elements in `event_senders` are removed one by one and inserted
1414
                        // back if the channel is still open.
1415
0
                        for index in (0..event_senders.len()).rev() {
1416
0
                            let event_sender = event_senders.swap_remove(index);
1417
0
                            if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1418
0
                                continue;
1419
0
                            }
1420
0
1421
0
                            event_senders.push(event_sender);
1422
                        }
1423
0
                        event_senders
1424
0
                    }));
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s6_0B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s6_0B7_
1425
0
                }
1426
            }
1427
1428
            WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
1429
0
                id,
1430
0
                expected_peer_id,
1431
0
                peer_id,
1432
0
                ..
1433
0
            }) => {
1434
0
                inner.num_pending_out_attempts -= 1;
1435
0
1436
0
                let remote_addr =
1437
0
                    Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned())
1438
0
                        .unwrap(); // TODO: review this unwrap
1439
0
                if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s7_0B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s7_0B7_
1440
                {
1441
0
                    inner
1442
0
                    .log_callback
1443
0
                    .log(LogLevel::Debug, format!("connected-peer-id-mismatch; expected_peer_id={}; actual_peer_id={}; address={}", expected_peer_id, peer_id, remote_addr));
1444
0
1445
0
                    let _was_in = inner
1446
0
                        .peering_strategy
1447
0
                        .decrease_address_connections_and_remove_if_zero(
1448
0
                            expected_peer_id,
1449
0
                            remote_addr.as_ref(),
1450
0
                        );
1451
0
                    debug_assert!(_was_in.is_ok());
1452
                    if let basic_peering_strategy::InsertAddressConnectionsResult::Inserted {
1453
0
                        address_removed: Some(addr_rm),
1454
0
                    } = inner.peering_strategy.increase_address_connections(
1455
0
                        &peer_id,
1456
0
                        remote_addr.into_bytes().to_owned(),
1457
0
                        10, // TODO: constant
1458
0
                    ) {
1459
0
                        let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap();
1460
0
                        inner.log_callback.log(
1461
0
                            LogLevel::Debug,
1462
0
                            format!("address-purged; peer_id={}; address={}", peer_id, addr_rm),
1463
0
                        );
1464
0
                    }
1465
0
                } else {
1466
0
                    inner
1467
0
                        .log_callback
1468
0
                        .log(LogLevel::Debug, format!("connected; peer_id={}", peer_id));
1469
0
                }
1470
            }
1471
1472
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1473
                expected_peer_id: Some(_),
1474
                ..
1475
            })
1476
            | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
1477
0
                let (address, peer_id, handshake_finished) = match wake_up_reason {
1478
                    WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1479
0
                        address,
1480
0
                        expected_peer_id: Some(peer_id),
1481
0
                        ..
1482
0
                    }) => (address, peer_id, false),
1483
                    WakeUpReason::NetworkEvent(service::Event::Disconnected {
1484
0
                        address,
1485
0
                        peer_id,
1486
0
                        ..
1487
0
                    }) => (address, peer_id, true),
1488
0
                    _ => unreachable!(),
1489
                };
1490
1491
0
                if !handshake_finished {
1492
0
                    inner.num_pending_out_attempts -= 1;
1493
0
                }
1494
1495
0
                inner
1496
0
                    .peering_strategy
1497
0
                    .decrease_address_connections(&peer_id, &address)
1498
0
                    .unwrap();
1499
0
                let address = Multiaddr::from_bytes(&address).unwrap();
1500
0
                inner.log_callback.log(
1501
0
                    LogLevel::Debug,
1502
0
                    format!(
1503
0
                        "disconnected; handshake-finished={}; peer_id={}; address={}",
1504
0
                        handshake_finished, peer_id, address
1505
0
                    ),
1506
0
                );
1507
0
1508
0
                // Ban the peer in order to avoid trying over and over again the same address(es).
1509
0
                // Even if the handshake was finished, it is possible that the peer simply shuts
1510
0
                // down connections immediately after it has been opened, hence the ban.
1511
0
                // Due to race conditions and peerid mismatches, it is possible that there is
1512
0
                // another existing connection or connection attempt with that same peer. However,
1513
0
                // it is not possible to be sure that we will reach 0 connections or connection
1514
0
                // attempts, and thus we ban the peer every time.
1515
0
                let ban_duration = Duration::from_secs(5);
1516
0
                inner.network.gossip_remove_desired_all(
1517
0
                    &peer_id,
1518
0
                    service::GossipKind::ConsensusTransactions,
1519
0
                );
1520
0
                for (&chain_id, what_happened) in inner
1521
0
                    .peering_strategy
1522
0
                    .unassign_slots_and_ban(&peer_id, Instant::now() + ban_duration)
1523
                {
1524
0
                    if matches!(
1525
0
                        what_happened,
1526
                        basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
1527
0
                    ) {
1528
0
                        inner.log_callback.log(
1529
0
                            LogLevel::Debug,
1530
0
                            format!(
1531
0
                                "slot-unassigned; peer_id={}; chain={}; reason=disconnected",
1532
0
                                peer_id, inner.network[chain_id].log_name
1533
0
                            ),
1534
0
                        );
1535
0
                    }
1536
                }
1537
            }
1538
1539
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1540
                expected_peer_id: None,
1541
0
                address,
1542
0
                ..
1543
0
            }) => {
1544
0
                inner.log_callback.log(
1545
0
                    LogLevel::Debug,
1546
0
                    format!(
1547
0
                        "disconnected; handshake-finished=false; address={}",
1548
0
                        Multiaddr::from_bytes(&address).unwrap()
1549
0
                    ),
1550
0
                );
1551
0
            }
1552
1553
            WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
1554
0
                id,
1555
0
                peer_id,
1556
0
                ping_time,
1557
0
            }) => {
1558
0
                let remote_addr =
1559
0
                    Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned())
1560
0
                        .unwrap(); // TODO: review this unwrap
1561
0
                inner.log_callback.log(
1562
0
                    LogLevel::Debug,
1563
0
                    format!("ping; peer_id={peer_id}; remote_addr={remote_addr}); ping-time={ping_time:?}"),
1564
0
                );
1565
0
            }
1566
1567
            WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
1568
0
                chain_id,
1569
0
                peer_id,
1570
0
                announce,
1571
0
            }) => {
1572
0
                let decoded = announce.decode();
1573
0
                let header_hash =
1574
0
                    header::hash_from_scale_encoded_header(decoded.scale_encoded_header);
1575
0
                match header::decode(
1576
0
                    decoded.scale_encoded_header,
1577
0
                    inner.network.block_number_bytes(chain_id),
1578
0
                ) {
1579
0
                    Ok(decoded_header) => {
1580
0
                        let mut _jaeger_span = inner.jaeger_service.block_announce_receive_span(
1581
0
                            &inner.local_peer_id,
1582
0
                            &peer_id,
1583
0
                            decoded_header.number,
1584
0
                            &decoded_header.hash(inner.network.block_number_bytes(chain_id)),
1585
0
                        );
1586
0
1587
0
                        inner.log_callback.log(LogLevel::Debug, format!(
1588
0
                            "block-announce; peer_id={}; chain={}; hash={}; number={}; is_best={:?}",
1589
0
                            peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded_header.number, decoded.is_best
1590
0
                        ));
1591
0
1592
0
                        debug_assert!(inner.event_pending_send.is_none());
1593
0
                        inner.event_pending_send = Some(Event::BlockAnnounce {
1594
0
                            chain_id,
1595
0
                            peer_id,
1596
0
                            is_best: decoded.is_best,
1597
0
                            scale_encoded_header: decoded.scale_encoded_header.to_owned(), // TODO: somewhat wasteful to copy here, could pass the entire announce
1598
0
                        });
1599
                    }
1600
0
                    Err(error) => {
1601
0
                        inner.log_callback.log(LogLevel::Warn, format!(
1602
0
                            "block-announce-bad-header; peer_id={}; chain={}; hash={}; is_best={:?}; error={}",
1603
0
                            peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded.is_best, error
1604
0
                        ));
1605
0
1606
0
                        if inner.network.gossip_remove_desired(
1607
0
                            chain_id,
1608
0
                            &peer_id,
1609
0
                            service::GossipKind::ConsensusTransactions,
1610
0
                        ) {
1611
0
                            inner.peering_strategy.unassign_slot_and_ban(
1612
0
                                &chain_id,
1613
0
                                &peer_id,
1614
0
                                Instant::now() + Duration::from_secs(10),
1615
0
                            );
1616
0
                            inner.log_callback.log(
1617
0
                                LogLevel::Debug,
1618
0
                                format!(
1619
0
                                    "slot-unassigned; peer_id={}; chain={}; reason=bad-block-announce",
1620
0
                                    peer_id, inner.network[chain_id].log_name
1621
0
                                ),
1622
0
                            );
1623
0
                        }
1624
0
                        let _ = inner.network.gossip_close(
1625
0
                            chain_id,
1626
0
                            &peer_id,
1627
0
                            service::GossipKind::ConsensusTransactions,
1628
0
                        ); // TODO: what is the return value?
1629
                    }
1630
                }
1631
            }
1632
            WakeUpReason::NetworkEvent(service::Event::GossipConnected {
1633
0
                peer_id,
1634
0
                chain_id,
1635
0
                best_number,
1636
0
                best_hash,
1637
0
                ..
1638
0
            }) => {
1639
0
                inner.log_callback.log(
1640
0
                    LogLevel::Debug,
1641
0
                    format!(
1642
0
                        "chain-connected; peer_id={}; chain={}; best_number={}; best_hash={}",
1643
0
                        peer_id,
1644
0
                        inner.network[chain_id].log_name,
1645
0
                        best_number,
1646
0
                        HashDisplay(&best_hash),
1647
0
                    ),
1648
0
                );
1649
0
                debug_assert!(inner.event_pending_send.is_none());
1650
0
                inner.event_pending_send = Some(Event::Connected {
1651
0
                    peer_id,
1652
0
                    chain_id,
1653
0
                    best_block_number: best_number,
1654
0
                    best_block_hash: best_hash,
1655
0
                });
1656
            }
1657
            WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
1658
0
                peer_id,
1659
0
                chain_id,
1660
0
                ..
1661
0
            }) => {
1662
0
                inner.log_callback.log(
1663
0
                    LogLevel::Debug,
1664
0
                    format!(
1665
0
                        "chain-disconnected; peer_id={}; chain={}",
1666
0
                        peer_id, inner.network[chain_id].log_name
1667
0
                    ),
1668
0
                );
1669
0
1670
0
                // Note that peer doesn't necessarily have an out slot, as this event
1671
0
                // might happen as a result of an inbound gossip connection.
1672
0
                inner.peering_strategy.unassign_slot_and_ban(
1673
0
                    &chain_id,
1674
0
                    &peer_id,
1675
0
                    Instant::now() + Duration::from_secs(10),
1676
0
                );
1677
0
                if inner.network.gossip_remove_desired(
1678
0
                    chain_id,
1679
0
                    &peer_id,
1680
0
                    service::GossipKind::ConsensusTransactions,
1681
0
                ) {
1682
0
                    inner.log_callback.log(
1683
0
                        LogLevel::Debug,
1684
0
                        format!(
1685
0
                            "slot-unassigned; peer_id={}; chain={}; reason=gossip-disconnected",
1686
0
                            peer_id, inner.network[chain_id].log_name
1687
0
                        ),
1688
0
                    );
1689
0
                }
1690
1691
0
                debug_assert!(inner.event_pending_send.is_none());
1692
0
                inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id });
1693
            }
1694
            WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
1695
0
                chain_id,
1696
0
                peer_id,
1697
0
                error,
1698
0
                ..
1699
0
            }) => {
1700
0
                inner.log_callback.log(
1701
0
                    LogLevel::Debug,
1702
0
                    format!(
1703
0
                        "chain-connect-attempt-failed; peer_id={}; chain={}; error={}",
1704
0
                        peer_id, inner.network[chain_id].log_name, error
1705
0
                    ),
1706
0
                );
1707
0
1708
0
                // Note that peer doesn't necessarily have an out slot, as this event
1709
0
                // might happen as a result of an inbound gossip connection.
1710
0
                if inner.network.gossip_remove_desired(
1711
0
                    chain_id,
1712
0
                    &peer_id,
1713
0
                    service::GossipKind::ConsensusTransactions,
1714
0
                ) {
1715
0
                    inner.log_callback.log(
1716
0
                        LogLevel::Debug,
1717
0
                        format!(
1718
0
                            "slot-unassigned; peer_id={}; chain={}; reason=gossip-open-failed",
1719
0
                            peer_id, inner.network[chain_id].log_name
1720
0
                        ),
1721
0
                    );
1722
0
                }
1723
1724
0
                if let service::GossipConnectError::GenesisMismatch { .. } = error {
1725
0
                    inner
1726
0
                        .peering_strategy
1727
0
                        .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id);
1728
0
                } else {
1729
0
                    inner.peering_strategy.unassign_slot_and_ban(
1730
0
                        &chain_id,
1731
0
                        &peer_id,
1732
0
                        Instant::now() + Duration::from_secs(15),
1733
0
                    );
1734
0
                }
1735
            }
1736
            WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
1737
0
                chain_id,
1738
0
                peer_id,
1739
0
                kind: service::GossipKind::ConsensusTransactions,
1740
0
            }) => {
1741
0
                // TODO: log this
1742
0
                // The networking state machine guarantees that `GossipInDesired`
1743
0
                // can't happen if we are already opening an out slot, which we do
1744
0
                // immediately.
1745
0
                // TODO: add debug_assert! ^
1746
0
                if inner
1747
0
                    .network
1748
0
                    .opened_gossip_undesired_by_chain(chain_id)
1749
0
                    .count()
1750
0
                    < inner.network[chain_id].max_in_peers
1751
0
                {
1752
0
                    inner
1753
0
                        .network
1754
0
                        .gossip_open(
1755
0
                            chain_id,
1756
0
                            &peer_id,
1757
0
                            service::GossipKind::ConsensusTransactions,
1758
0
                        )
1759
0
                        .unwrap();
1760
0
                } else {
1761
0
                    inner
1762
0
                        .network
1763
0
                        .gossip_close(
1764
0
                            chain_id,
1765
0
                            &peer_id,
1766
0
                            service::GossipKind::ConsensusTransactions,
1767
0
                        )
1768
0
                        .unwrap();
1769
0
                }
1770
            }
1771
            WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
1772
                // All `GossipInDesired` are immediately accepted or rejected, meaning
1773
                // that this event can't happen.
1774
0
                unreachable!()
1775
            }
1776
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1777
0
                substream_id,
1778
0
                peer_id,
1779
0
                chain_id,
1780
0
                response: service::RequestResult::Blocks(response),
1781
0
            }) => {
1782
0
                match &response {
1783
0
                    Ok(success) => {
1784
0
                        inner.log_callback.log(
1785
0
                            LogLevel::Debug,
1786
0
                            format!(
1787
0
                                "blocks-request-ended; outcome=success; peer_id={peer_id}; chain={}; response-blocks={}",
1788
0
                                inner.network[chain_id].log_name,
1789
0
                                success.len()
1790
0
                            ),
1791
0
                        );
1792
0
                    }
1793
0
                    Err(err) => {
1794
0
                        inner.log_callback.log(
1795
0
                            LogLevel::Debug,
1796
0
                            format!("blocks-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1797
0
                            inner.network[chain_id].log_name, err),
1798
0
                        );
1799
0
                    }
1800
                }
1801
1802
0
                let _ = inner
1803
0
                    .blocks_requests
1804
0
                    .remove(&substream_id)
1805
0
                    .unwrap()
1806
0
                    .send(response.map_err(BlocksRequestError::Request));
1807
            }
1808
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1809
0
                substream_id,
1810
0
                peer_id,
1811
0
                chain_id,
1812
0
                response: service::RequestResult::GrandpaWarpSync(response),
1813
0
            }) => {
1814
0
                match &response {
1815
0
                    Ok(success) => {
1816
0
                        let decoded = success.decode();
1817
0
                        inner.log_callback.log(
1818
0
                            LogLevel::Debug,
1819
0
                            format!(
1820
0
                                "warp-sync-request-ended; outcome=success; peer_id={peer_id}; chain={}; num-fragments={}; is-finished={:?}",
1821
0
                                inner.network[chain_id].log_name,
1822
0
                                decoded.fragments.len(), decoded.is_finished,
1823
0
                            ),
1824
0
                        );
1825
0
                    }
1826
0
                    Err(err) => {
1827
0
                        inner.log_callback.log(
1828
0
                            LogLevel::Debug,
1829
0
                            format!("warp-sync-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1830
0
                            inner.network[chain_id].log_name, err),
1831
0
                        );
1832
0
                    }
1833
                }
1834
1835
0
                let _ = inner
1836
0
                    .warp_sync_requests
1837
0
                    .remove(&substream_id)
1838
0
                    .unwrap()
1839
0
                    .send(response.map_err(WarpSyncRequestError::Request));
1840
            }
1841
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1842
0
                substream_id,
1843
0
                peer_id,
1844
0
                chain_id,
1845
0
                response: service::RequestResult::StorageProof(response),
1846
0
            }) => {
1847
0
                match &response {
1848
0
                    Ok(success) => {
1849
0
                        inner.log_callback.log(
1850
0
                            LogLevel::Debug,
1851
0
                            format!(
1852
0
                                "storage-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}",
1853
0
                                inner.network[chain_id].log_name,
1854
0
                                BytesDisplay(u64::try_from(success.decode().len()).unwrap()),
1855
0
                            ),
1856
0
                        );
1857
0
                    }
1858
0
                    Err(err) => {
1859
0
                        inner.log_callback.log(
1860
0
                            LogLevel::Debug,
1861
0
                            format!(
1862
0
                                "storage-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1863
0
                                inner.network[chain_id].log_name, err
1864
0
                            ),
1865
0
                        );
1866
0
                    }
1867
                }
1868
1869
0
                let _ = inner
1870
0
                    .storage_requests
1871
0
                    .remove(&substream_id)
1872
0
                    .unwrap()
1873
0
                    .send(response.map_err(|_| ()));
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s8_0B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s8_0B7_
1874
            }
1875
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1876
0
                substream_id,
1877
0
                peer_id,
1878
0
                chain_id,
1879
0
                response: service::RequestResult::CallProof(response),
1880
0
            }) => {
1881
0
                match &response {
1882
0
                    Ok(success) => {
1883
0
                        inner.log_callback.log(
1884
0
                            LogLevel::Debug,
1885
0
                            format!(
1886
0
                                "call-proof-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}",
1887
0
                                inner.network[chain_id].log_name,
1888
0
                                BytesDisplay(u64::try_from(success.decode().len()).unwrap()),
1889
0
                            ),
1890
0
                        );
1891
0
                    }
1892
0
                    Err(err) => {
1893
0
                        inner.log_callback.log(
1894
0
                            LogLevel::Debug,
1895
0
                            format!(
1896
0
                                "call-proof-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1897
0
                                inner.network[chain_id].log_name,
1898
0
                                err
1899
0
                            ),
1900
0
                        );
1901
0
                    }
1902
                }
1903
1904
0
                let _ = inner
1905
0
                    .call_proof_requests
1906
0
                    .remove(&substream_id)
1907
0
                    .unwrap()
1908
0
                    .send(response.map_err(|_| ()));
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0s9_0B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0s9_0B7_
1909
            }
1910
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1911
0
                peer_id: kademlia_request_target,
1912
0
                chain_id,
1913
0
                response: service::RequestResult::KademliaFindNode(Ok(nodes)),
1914
                ..
1915
            }) => {
1916
0
                for (peer_id, addrs) in nodes {
1917
0
                    let mut valid_addrs = Vec::with_capacity(addrs.len());
1918
0
                    for addr in addrs {
1919
0
                        match Multiaddr::from_bytes(addr) {
1920
0
                            Ok(a) => valid_addrs.push(a),
1921
0
                            Err((error, addr)) => {
1922
0
                                inner.log_callback.log(
1923
0
                                    LogLevel::Debug,
1924
0
                                    format!(
1925
0
                                        "discovery-invalid-address; error={error}, addr={}, discovered_from={kademlia_request_target}",
1926
0
                                        hex::encode(&addr)
1927
0
                                    ),
1928
0
                                );
1929
0
                                continue;
1930
                            }
1931
                        }
1932
                    }
1933
1934
0
                    if !valid_addrs.is_empty() {
1935
                        // Note that we must call this function before `insert_address`,
1936
                        // as documented in `basic_peering_strategy`.
1937
                        if let basic_peering_strategy::InsertChainPeerResult::Inserted {
1938
0
                            peer_removed: Some(peer_removed),
1939
0
                        } = inner.peering_strategy.insert_chain_peer(
1940
0
                            chain_id,
1941
0
                            peer_id.clone(),
1942
0
                            100, // TODO: constant
1943
0
                        ) {
1944
0
                            inner.log_callback.log(
1945
0
                                LogLevel::Debug,
1946
0
                                format!(
1947
0
                                    "peer-forgotten; peer_id={}; chain={}",
1948
0
                                    peer_removed, inner.network[chain_id].log_name
1949
0
                                ),
1950
0
                            );
1951
0
                        }
1952
0
                    }
1953
1954
0
                    for addr in valid_addrs {
1955
0
                        inner.log_callback.log(
1956
0
                            LogLevel::Debug,
1957
0
                            format!(
1958
0
                                "discovered; chain={}; peer_id={peer_id}; address={addr}; discovered_from={kademlia_request_target}",
1959
0
                                inner.network[chain_id].log_name
1960
0
                            ),
1961
0
                        );
1962
0
1963
0
                        match inner
1964
0
                            .peering_strategy
1965
0
                             .insert_address(&peer_id, addr.into_bytes(), 10) // TODO: constant
1966
                            {
1967
0
                                basic_peering_strategy::InsertAddressResult::Inserted { address_removed: Some(addr_rm) } => {
1968
0
                                    let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap();
1969
0
                                    inner
1970
0
                                        .log_callback
1971
0
                                        .log(LogLevel::Debug, format!("address-purged; peer_id={}; address={}", peer_id, addr_rm));
1972
0
                                }
1973
0
                                basic_peering_strategy::InsertAddressResult::UnknownPeer => unreachable!(),
1974
0
                                _ => {}
1975
                            }
1976
                    }
1977
                }
1978
            }
1979
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1980
0
                peer_id,
1981
0
                chain_id,
1982
0
                response: service::RequestResult::KademliaFindNode(Err(error)),
1983
0
                ..
1984
0
            }) => {
1985
0
                inner.log_callback.log(
1986
0
                    LogLevel::Debug,
1987
0
                    format!(
1988
0
                        "discovery-error; chain={}; peer_id={peer_id}; error={error}",
1989
0
                        inner.network[chain_id].log_name
1990
0
                    ),
1991
0
                );
1992
0
            }
1993
            WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
1994
                // We never start a request of any other kind.
1995
0
                unreachable!()
1996
            }
1997
            WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
1998
                // Requests are answered immediately, and thus cancelling events can't happen.
1999
0
                unreachable!()
2000
            }
2001
            WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
2002
0
                peer_id,
2003
0
                substream_id,
2004
0
            }) => {
2005
0
                inner.log_callback.log(
2006
0
                    LogLevel::Debug,
2007
0
                    format!("identify-request; peer_id={}", peer_id),
2008
0
                );
2009
0
                inner
2010
0
                    .network
2011
0
                    .respond_identify(substream_id, &inner.identify_agent_version);
2012
0
            }
2013
            WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn {
2014
0
                peer_id,
2015
0
                chain_id,
2016
0
                config,
2017
0
                substream_id,
2018
0
            }) => {
2019
0
                inner.log_callback.log(
2020
0
                    LogLevel::Debug,
2021
0
                    format!(
2022
0
                        "incoming-blocks-request; peer_id={}; chain={}",
2023
0
                        peer_id, inner.network[chain_id].log_name
2024
0
                    ),
2025
0
                );
2026
0
                let mut _jaeger_span = inner.jaeger_service.incoming_block_request_span(
2027
0
                    &inner.local_peer_id,
2028
0
                    &peer_id,
2029
0
                    config.desired_count.get(),
2030
0
                    if let (1, codec::BlocksRequestConfigStart::Hash(block_hash)) =
2031
0
                        (config.desired_count.get(), &config.start)
2032
                    {
2033
0
                        Some(block_hash)
2034
                    } else {
2035
0
                        None
2036
                    },
2037
                );
2038
2039
                // TODO: is it a good idea to await here while the lock is held and freezing the entire networking background task?
2040
0
                let response = blocks_request_response(
2041
0
                    &inner.network[chain_id].database,
2042
0
                    inner.network.block_number_bytes(chain_id),
2043
0
                    config,
2044
0
                )
2045
0
                .await;
2046
0
                inner.network.respond_blocks(
2047
0
                    substream_id,
2048
0
                    match response {
2049
0
                        Ok(b) => Some(b),
2050
0
                        Err(error) => {
2051
0
                            inner.log_callback.log(
2052
0
                                LogLevel::Warn,
2053
0
                                format!("incoming-blocks-request-error; error={}", error),
2054
0
                            );
2055
0
                            None
2056
                        }
2057
                    },
2058
                );
2059
            }
2060
            WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
2061
0
                chain_id,
2062
0
                peer_id,
2063
0
                state,
2064
0
            }) => {
2065
0
                inner.log_callback.log(LogLevel::Debug, format!(
2066
0
                    "grandpa-neighbor-packet; peer_id={}; chain={}; round_number={}; set_id={}; commit_finalized_height={}",
2067
0
                    peer_id,
2068
0
                    inner.network[chain_id].log_name,
2069
0
                    state.round_number,
2070
0
                    state.set_id,
2071
0
                    state.commit_finalized_height,
2072
0
                ));
2073
0
2074
0
                debug_assert!(inner.event_pending_send.is_none());
2075
0
                inner.event_pending_send = Some(Event::GrandpaNeighborPacket {
2076
0
                    chain_id,
2077
0
                    peer_id,
2078
0
                    finalized_block_height: state.commit_finalized_height,
2079
0
                });
2080
            }
2081
            WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
2082
0
                chain_id,
2083
0
                peer_id,
2084
0
                message,
2085
0
            }) => {
2086
0
                inner.log_callback.log(
2087
0
                    LogLevel::Debug,
2088
0
                    format!(
2089
0
                        "grandpa-commit-message; peer_id={}; chain={}; target_hash={}",
2090
0
                        peer_id,
2091
0
                        inner.network[chain_id].log_name,
2092
0
                        HashDisplay(message.decode().target_hash),
2093
0
                    ),
2094
0
                );
2095
0
            }
2096
0
            // The networking state machine reported a `ProtocolError` for `peer_id`: emit a
            // warning, then unassign every slot of that peer and ban it until 5 seconds from now.
            WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
2097
0
                inner.log_callback.log(
2098
0
                    LogLevel::Warn,
2099
0
                    format!("protocol-error; peer_id={}; error={}", peer_id, error),
2100
0
                );
2101
0
                inner
2102
0
                    .peering_strategy
2103
0
                    .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(5));
2104
0
                // TODO: log chain names?
                // NOTE(review): unlike the other slot-unassigning paths of this function, the
                // peer isn't removed from the desired gossip peers here; confirm this is intended.
                // The reason reported below must match the cause of the ban (a protocol error),
                // not the "no-address" reason used when no address is known for the peer.
2105
0
                inner.log_callback.log(
2106
0
                    LogLevel::Debug,
2107
0
                    format!(
2108
0
                        "all-slots-unassigned; reason=protocol-error; peer_id={}",
2109
0
                        peer_id
2110
0
                    ),
2111
0
                );
2112
0
            }
2113
2114
0
            WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
2115
0
                inner.peering_strategy.assign_slot(&chain_id, &peer_id);
2116
0
2117
0
                inner.log_callback.log(
2118
0
                    LogLevel::Debug,
2119
0
                    format!(
2120
0
                        "slot-assigned; peer_id={}; chain={}",
2121
0
                        peer_id, inner.network[chain_id].log_name
2122
0
                    ),
2123
0
                );
2124
0
2125
0
                inner.network.gossip_insert_desired(
2126
0
                    chain_id,
2127
0
                    peer_id,
2128
0
                    service::GossipKind::ConsensusTransactions,
2129
0
                );
2130
0
            }
2131
2132
0
            WakeUpReason::CanStartConnect(peer_id) => {
2133
0
                inner.num_pending_out_attempts += 1;
2134
2135
0
                let Some(multiaddr) = inner
2136
0
                    .peering_strategy
2137
0
                    .pick_address_and_add_connection(&peer_id)
2138
                else {
2139
                    // There is no address for that peer in the address book.
2140
0
                    inner.network.gossip_remove_desired_all(
2141
0
                        &peer_id,
2142
0
                        service::GossipKind::ConsensusTransactions,
2143
0
                    );
2144
0
                    for (chain_id, what_happened) in inner
2145
0
                        .peering_strategy
2146
0
                        .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(10))
2147
                    {
2148
0
                        if matches!(
2149
0
                            what_happened,
2150
                            basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2151
0
                        ) {
2152
0
                            inner.log_callback.log(
2153
0
                                LogLevel::Debug,
2154
0
                                format!(
2155
0
                                    "slot-unassigned; peer_id={}; chain={}; reason=no-address",
2156
0
                                    peer_id, inner.network[*chain_id].log_name
2157
0
                                ),
2158
0
                            );
2159
0
                        }
2160
                    }
2161
0
                    continue;
2162
                };
2163
2164
0
                let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
2165
0
                    Ok(a) => a,
2166
0
                    Err((multiaddr::FromBytesError, multiaddr)) => {
2167
0
                        // Address is in an invalid format.
2168
0
                        inner.log_callback.log(
2169
0
                            LogLevel::Debug,
2170
0
                            format!(
2171
0
                                "invalid-address; peer_id={}; address={:?}",
2172
0
                                peer_id, multiaddr
2173
0
                            ),
2174
0
                        );
2175
0
                        let _was_in = inner
2176
0
                            .peering_strategy
2177
0
                            .decrease_address_connections_and_remove_if_zero(&peer_id, &multiaddr);
2178
0
                        debug_assert!(_was_in.is_ok());
2179
0
                        continue;
2180
                    }
2181
                };
2182
2183
                // Convert the `multiaddr` (typically of the form `/ip4/a.b.c.d/tcp/d`) into
2184
                // a `Future<Output = Result<TcpStream, ...>>`.
2185
0
                let socket = match tasks::multiaddr_to_socket(&multiaddr) {
2186
0
                    Ok(socket) => socket,
2187
                    Err(_) => {
2188
                        // Address is in an invalid format or isn't supported.
2189
0
                        inner.log_callback.log(
2190
0
                            LogLevel::Debug,
2191
0
                            format!(
2192
0
                                "invalid-address; peer_id={}; address={}",
2193
0
                                peer_id, multiaddr
2194
0
                            ),
2195
0
                        );
2196
0
                        let _was_in = inner
2197
0
                            .peering_strategy
2198
0
                            .decrease_address_connections_and_remove_if_zero(
2199
0
                                &peer_id,
2200
0
                                multiaddr.as_ref(),
2201
0
                            );
2202
0
                        debug_assert!(_was_in.is_ok());
2203
0
                        continue;
2204
                    }
2205
                };
2206
2207
0
                inner.log_callback.log(
2208
0
                    LogLevel::Debug,
2209
0
                    format!("start-connecting; peer_id={peer_id}; address={multiaddr}"),
2210
0
                );
2211
0
2212
0
                let (tx, rx) = channel::bounded(16); // TODO: ?!
2213
0
2214
0
                let (connection_id, connection_task) = inner.network.add_single_stream_connection(
2215
0
                    Instant::now(),
2216
0
                    service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
2217
0
                        is_initiator: true,
2218
0
                        noise_key: &inner.noise_key,
2219
0
                    },
2220
0
                    multiaddr.clone().into_bytes(),
2221
0
                    Some(peer_id.clone()),
2222
0
                    tx,
2223
0
                );
2224
0
2225
0
                // Handle the connection in a separate task.
2226
0
                (inner.tasks_executor)(Box::pin(tasks::connection_task(
2227
0
                    inner.log_callback.clone(),
2228
0
                    multiaddr.to_string(),
2229
0
                    socket,
2230
0
                    connection_id,
2231
0
                    connection_task,
2232
0
                    rx,
2233
0
                    inner.from_connections_tx.clone(),
2234
0
                )));
2235
            }
2236
2237
0
            WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
2238
0
                inner
2239
0
                    .network
2240
0
                    .gossip_open(
2241
0
                        chain_id,
2242
0
                        &peer_id,
2243
0
                        service::GossipKind::ConsensusTransactions,
2244
0
                    )
2245
0
                    .unwrap();
2246
0
2247
0
                inner.log_callback.log(
2248
0
                    LogLevel::Debug,
2249
0
                    format!(
2250
0
                        "gossip-open; peer_id={}; chain={}",
2251
0
                        peer_id, &inner.network[chain_id].log_name
2252
0
                    ),
2253
0
                );
2254
0
            }
2255
        }
2256
    }
2257
0
}
_RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0B5_
Line
Count
Source
847
21
async fn background_task(mut inner: Inner) {
848
    loop {
849
        enum WakeUpReason {
850
            IncomingConnection {
851
                socket: TcpStream,
852
                socket_addr: SocketAddr,
853
            },
854
            NetworkEvent(service::Event<channel::Sender<service::CoordinatorToConnection>>),
855
            Message(ToBackground),
856
            ForegroundClosed,
857
            FromConnectionTask {
858
                connection_id: service::ConnectionId,
859
                // TODO: this Option is weird
860
                message: Option<service::ConnectionToCoordinator>,
861
            },
862
            EventSendersReady,
863
            CanAssignSlot(PeerId, ChainId),
864
            CanStartConnect(PeerId),
865
            CanOpenGossip(PeerId, ChainId),
866
            StartKademliaDiscoveries,
867
            MessageToConnection {
868
                connection_id: service::ConnectionId,
869
                message: service::CoordinatorToConnection,
870
            },
871
        }
872
873
126
        let wake_up_reason = async {
874
            inner
875
                .to_background_rx
876
                .next()
877
                .await
878
                .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
879
        }
880
147
        .or({
881
147
            let event_senders_ready = 
matches!0
(inner.event_senders, either::Left(_));
882
147
            let event_pending_send = &inner.event_pending_send;
883
147
            let network = &mut inner.network;
884
147
            let peering_strategy = &mut inner.peering_strategy;
885
147
            let num_pending_out_attempts = &inner.num_pending_out_attempts;
886
            async move {
887
                if let Some(event) = (event_senders_ready && event_pending_send.is_none())
888
                    .then(|| network.next_event())
889
                    .flatten()
890
                {
891
                    WakeUpReason::NetworkEvent(event)
892
                } else if let Some((connection_id, message)) = network.pull_message_to_connection()
893
                {
894
                    WakeUpReason::MessageToConnection {
895
                        connection_id,
896
                        message,
897
                    }
898
                } else if let Some((peer_id, chain_id)) = network
899
                    .connected_unopened_gossip_desired()
900
                    .next()
901
                    .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id))
902
                {
903
                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
904
                } else if let Some(peer_id) = (*num_pending_out_attempts < 16)
905
                    .then(|| network.unconnected_desired().next().cloned())
906
                    .flatten()
907
                {
908
                    WakeUpReason::CanStartConnect(peer_id)
909
                } else {
910
                    'search: loop {
911
                        let mut earlier_unban = None;
912
913
                        for chain_id in network.chains().collect::<Vec<_>>() {
914
                            if network.gossip_desired_num(
915
                                chain_id,
916
                                service::GossipKind::ConsensusTransactions,
917
                            ) >= network[chain_id].max_slots
918
                            {
919
                                continue;
920
                            }
921
922
                            match peering_strategy.pick_assignable_peer(&chain_id, &Instant::now())
923
                            {
924
                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
925
                                    break 'search WakeUpReason::CanAssignSlot(
926
                                        peer_id.clone(),
927
                                        chain_id,
928
                                    )
929
                                }
930
                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
931
                                    next_unban,
932
                                } => {
933
                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
934
                                        earlier_unban = Some(next_unban.clone());
935
                                    }
936
                                }
937
                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
938
                            }
939
                        }
940
941
                        if let Some(earlier_unban) = earlier_unban {
942
                            smol::Timer::at(earlier_unban).await;
943
                        } else {
944
                            future::pending::<()>().await;
945
                        }
946
                    }
947
                }
948
            }
949
        })
950
147
        .or(async {
951
            if let either::Right(sending) = &mut inner.event_senders {
952
                let event_senders = sending.await;
953
                inner.event_senders = either::Left(event_senders);
954
                WakeUpReason::EventSendersReady
955
            } else if inner.event_pending_send.is_some() {
956
                WakeUpReason::EventSendersReady
957
            } else {
958
                future::pending().await
959
            }
960
147
        })
961
147
        .or(async {
962
            (&mut inner.next_discovery).await;
963
            inner.next_discovery = smol::Timer::after(inner.next_discovery_period);
964
            inner.next_discovery_period =
965
                cmp::min(inner.next_discovery_period * 2, Duration::from_secs(120));
966
            WakeUpReason::StartKademliaDiscoveries
967
147
        })
968
147
        .or(async {
969
            let (connection_id, message) = inner.from_connections_rx.next().await.unwrap();
970
            WakeUpReason::FromConnectionTask {
971
                connection_id,
972
                message,
973
            }
974
147
        })
975
147
        .or(async {
976
            let Some((socket, socket_addr)) = inner.incoming_connections.next().await else {
977
                future::pending().await
978
            };
979
            WakeUpReason::IncomingConnection {
980
                socket,
981
                socket_addr,
982
            }
983
147
        })
984
122
        .await;
985
986
0
        match wake_up_reason {
987
            WakeUpReason::MessageToConnection {
988
0
                connection_id,
989
0
                message,
990
0
            } => {
991
0
                // Note that it is critical for the sending to not take too long here, in order to
992
0
                // not block the process of the network service.
993
0
                // In particular, if sending the message to the connection is blocked due to
994
0
                // sending a message on the connection-to-coordinator channel, this will result
995
0
                // in a deadlock.
996
0
                // For this reason, the connection task is always ready to immediately accept a
997
0
                // message on the coordinator-to-connection channel.
998
0
                inner.network[connection_id].send(message).await.unwrap();
999
            }
1000
1001
            WakeUpReason::FromConnectionTask {
1002
0
                connection_id,
1003
0
                message,
1004
            } => {
1005
0
                if let Some(message) = message {
1006
0
                    inner
1007
0
                        .network
1008
0
                        .inject_connection_message(connection_id, message);
1009
0
                }
1010
            }
1011
1012
            WakeUpReason::IncomingConnection {
1013
0
                socket,
1014
0
                socket_addr,
1015
0
            } => {
1016
0
                // The Nagle algorithm, implemented in the kernel, consists in buffering the
1017
0
                // data to be sent out and waiting a bit before actually sending it out, in
1018
0
                // order to potentially merge multiple writes in a row into one packet. In
1019
0
                // the implementation below, it is guaranteed that the buffer in `WithBuffers`
1020
0
                // is filled with as much data as possible before the operating system gets
1021
0
                // involved. As such, we disable the Nagle algorithm, in order to avoid adding
1022
0
                // an artificial delay to all sends.
1023
0
                let _ = socket.set_nodelay(true);
1024
1025
0
                let multiaddr = [
1026
0
                    match socket_addr.ip() {
1027
0
                        IpAddr::V4(ip) => Protocol::<&[u8]>::Ip4(ip.octets()),
1028
0
                        IpAddr::V6(ip) => Protocol::Ip6(ip.octets()),
1029
                    },
1030
0
                    Protocol::Tcp(socket_addr.port()),
1031
0
                ]
1032
0
                .into_iter()
1033
0
                .collect::<Multiaddr>();
1034
0
1035
0
                inner.log_callback.log(
1036
0
                    LogLevel::Debug,
1037
0
                    format!("incoming-connection; multiaddr={}", multiaddr),
1038
0
                );
1039
0
1040
0
                let (tx, rx) = channel::bounded(16); // TODO: ?!
1041
0
1042
0
                let (connection_id, connection_task) = inner.network.add_single_stream_connection(
1043
0
                    Instant::now(),
1044
0
                    service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
1045
0
                        is_initiator: false,
1046
0
                        noise_key: &inner.noise_key,
1047
0
                    },
1048
0
                    multiaddr.clone().into_bytes(),
1049
0
                    None,
1050
0
                    tx,
1051
0
                );
1052
0
1053
0
                (inner.tasks_executor)(Box::pin(tasks::connection_task(
1054
0
                    inner.log_callback.clone(),
1055
0
                    multiaddr.to_string(),
1056
0
                    async move { Ok(socket) },
1057
0
                    connection_id,
1058
0
                    connection_task,
1059
0
                    rx,
1060
0
                    inner.from_connections_tx.clone(),
1061
0
                )));
1062
            }
1063
1064
            WakeUpReason::StartKademliaDiscoveries => {
1065
104
                for chain_id in inner.network.chains().collect::<Vec<_>>() {
1066
104
                    let random_peer_id =
1067
104
                        PeerId::from_public_key(&peer_id::PublicKey::Ed25519(rand::random()));
1068
104
1069
104
                    // TODO: select target closest to the random peer instead
1070
104
                    let target = inner
1071
104
                        .network
1072
104
                        .gossip_connected_peers(
1073
104
                            chain_id,
1074
104
                            service::GossipKind::ConsensusTransactions,
1075
104
                        )
1076
104
                        .next()
1077
104
                        .cloned();
1078
1079
104
                    if let Some(
target0
) = target {
1080
0
                        match inner.network.start_kademlia_find_node_request(
1081
0
                            &target,
1082
0
                            chain_id,
1083
0
                            &random_peer_id,
1084
0
                            Duration::from_secs(20),
1085
0
                        ) {
1086
0
                            Ok(_) => {}
1087
0
                            Err(service::StartRequestError::NoConnection) => unreachable!(),
1088
                        };
1089
104
                    } else {
1090
104
                        // TODO: log message
1091
104
                    }
1092
                }
1093
            }
1094
1095
            WakeUpReason::ForegroundClosed => {
1096
                // TODO: do a clean shutdown of all the connections
1097
0
                return;
1098
            }
1099
1100
            WakeUpReason::Message(ToBackground::ForegroundDisconnectAndBan {
1101
0
                peer_id,
1102
0
                chain_id,
1103
0
                severity,
1104
0
                reason,
1105
0
            }) => {
1106
0
                // Note that peer doesn't necessarily have an out slot.
1107
0
                inner.peering_strategy.unassign_slot_and_ban(
1108
0
                    &chain_id,
1109
0
                    &peer_id,
1110
0
                    Instant::now()
1111
0
                        + Duration::from_secs(match severity {
1112
0
                            BanSeverity::Low => 10,
1113
0
                            BanSeverity::High => 40,
1114
                        }),
1115
                );
1116
0
                if inner.network.gossip_remove_desired(
1117
0
                    chain_id,
1118
0
                    &peer_id,
1119
0
                    service::GossipKind::ConsensusTransactions,
1120
0
                ) {
1121
0
                    inner.log_callback.log(
1122
0
                        LogLevel::Debug,
1123
0
                        format!(
1124
0
                            "slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}",
1125
0
                            peer_id, inner.network[chain_id].log_name, reason
1126
0
                        ),
1127
0
                    );
1128
0
                }
1129
1130
0
                if inner.network.gossip_is_connected(
1131
0
                    chain_id,
1132
0
                    &peer_id,
1133
0
                    service::GossipKind::ConsensusTransactions,
1134
0
                ) {
1135
0
                    let _close_result = inner.network.gossip_close(
1136
0
                        chain_id,
1137
0
                        &peer_id,
1138
0
                        service::GossipKind::ConsensusTransactions,
1139
0
                    );
1140
0
                    debug_assert!(_close_result.is_ok());
1141
1142
0
                    inner.log_callback.log(
1143
0
                        LogLevel::Debug,
1144
0
                        format!(
1145
0
                            "chain-disconnected; peer_id={}; chain={}",
1146
0
                            peer_id, inner.network[chain_id].log_name
1147
0
                        ),
1148
0
                    );
1149
0
1150
0
                    debug_assert!(inner.event_pending_send.is_none());
1151
0
                    inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id });
1152
0
                }
1153
            }
1154
1155
            WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock {
1156
0
                target,
1157
0
                chain_id,
1158
0
                scale_encoded_header,
1159
0
                is_best,
1160
0
                result_tx,
1161
0
            }) => {
1162
0
                let _ = result_tx.send(inner.network.gossip_send_block_announce(
1163
0
                    &target,
1164
0
                    chain_id,
1165
0
                    &scale_encoded_header,
1166
0
                    is_best,
1167
0
                ));
1168
0
            }
1169
            WakeUpReason::Message(ToBackground::ForegroundSetLocalBestBlock {
1170
21
                chain_id,
1171
21
                best_hash,
1172
21
                best_number,
1173
21
            }) => {
1174
21
                inner
1175
21
                    .network
1176
21
                    .set_chain_local_best_block(chain_id, best_hash, best_number);
1177
21
            }
1178
            WakeUpReason::Message(ToBackground::ForegroundBlocksRequest {
1179
0
                target,
1180
0
                chain_id,
1181
0
                config,
1182
0
                result_tx,
1183
0
            }) => {
1184
0
                inner.log_callback.log(
1185
0
                    LogLevel::Debug,
1186
0
                    format!(
1187
0
                        "blocks-request-start; peer_id={}; chain={}; start={}; desired_count={}; direction={}",
1188
0
                        target,
1189
0
                        inner.network[chain_id].log_name,
1190
0
                        match &config.start {
1191
0
                            codec::BlocksRequestConfigStart::Hash(h) => either::Left(HashDisplay(h)),
1192
0
                            codec::BlocksRequestConfigStart::Number(n) => either::Right(n),
1193
                        },
1194
                        config.desired_count,
1195
0
                        match config.direction {
1196
0
                            codec::BlocksRequestDirection::Ascending => "ascending",
1197
0
                            codec::BlocksRequestDirection::Descending => "descending",
1198
                        },
1199
                    ),
1200
                );
1201
1202
0
                match inner.network.start_blocks_request(
1203
0
                    &target,
1204
0
                    chain_id,
1205
0
                    config,
1206
0
                    Duration::from_secs(12),
1207
0
                ) {
1208
0
                    Ok(request_id) => {
1209
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1210
0
                        inner.blocks_requests.insert(request_id, result_tx);
1211
0
                    }
1212
0
                    Err(service::StartRequestError::NoConnection) => {
1213
0
                        inner.log_callback.log(
1214
0
                            LogLevel::Debug,
1215
0
                            format!(
1216
0
                                "blocks-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1217
0
                                target,
1218
0
                                inner.network[chain_id].log_name,
1219
0
                            ),
1220
0
                        );
1221
0
                        let _ = result_tx.send(Err(BlocksRequestError::NoConnection));
1222
0
                    }
1223
                }
1224
            }
1225
            WakeUpReason::Message(ToBackground::ForegroundWarpSyncRequest {
1226
0
                target,
1227
0
                chain_id,
1228
0
                begin_hash,
1229
0
                result_tx,
1230
0
            }) => {
1231
0
                inner.log_callback.log(
1232
0
                    LogLevel::Debug,
1233
0
                    format!(
1234
0
                        "warp-sync-request-start; peer_id={}; chain={}; begin-hash={}",
1235
0
                        target,
1236
0
                        inner.network[chain_id].log_name,
1237
0
                        HashDisplay(&begin_hash)
1238
0
                    ),
1239
0
                );
1240
0
1241
0
                match inner.network.start_grandpa_warp_sync_request(
1242
0
                    &target,
1243
0
                    chain_id,
1244
0
                    begin_hash,
1245
0
                    Duration::from_secs(12),
1246
0
                ) {
1247
0
                    Ok(request_id) => {
1248
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1249
0
                        inner.warp_sync_requests.insert(request_id, result_tx);
1250
0
                    }
1251
0
                    Err(service::StartRequestError::NoConnection) => {
1252
0
                        inner.log_callback.log(
1253
0
                            LogLevel::Debug,
1254
0
                            format!(
1255
0
                                "warp-sync-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1256
0
                                target,
1257
0
                                inner.network[chain_id].log_name,
1258
0
                            ),
1259
0
                        );
1260
0
                        let _ = result_tx.send(Err(WarpSyncRequestError::NoConnection));
1261
0
                    }
1262
                }
1263
            }
1264
            WakeUpReason::Message(ToBackground::ForegroundStorageProofRequest {
1265
0
                target,
1266
0
                chain_id,
1267
0
                config,
1268
0
                result_tx,
1269
0
            }) => {
1270
0
                inner.log_callback.log(
1271
0
                    LogLevel::Debug,
1272
0
                    format!(
1273
0
                        "storage-request-start; peer_id={}; chain={}; block-hash={}; num-keys={}",
1274
0
                        target,
1275
0
                        inner.network[chain_id].log_name,
1276
0
                        HashDisplay(&config.block_hash),
1277
0
                        config.keys.len()
1278
0
                    ),
1279
0
                );
1280
0
1281
0
                match inner.network.start_storage_proof_request(
1282
0
                    &target,
1283
0
                    chain_id,
1284
0
                    config,
1285
0
                    Duration::from_secs(12),
1286
0
                ) {
1287
0
                    Ok(request_id) => {
1288
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1289
0
                        inner.storage_requests.insert(request_id, result_tx);
1290
0
                    }
1291
0
                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1292
0
                        inner.log_callback.log(
1293
0
                            LogLevel::Debug,
1294
0
                            format!(
1295
0
                                "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1296
0
                                target,
1297
0
                                inner.network[chain_id].log_name,
1298
0
                            ),
1299
0
                        );
1300
0
                        let _ = result_tx.send(Err(()));
1301
0
                    }
1302
0
                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1303
0
                        inner.log_callback.log(
1304
0
                            LogLevel::Debug,
1305
0
                            format!(
1306
0
                                "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large",
1307
0
                                target,
1308
0
                                inner.network[chain_id].log_name,
1309
0
                            ),
1310
0
                        );
1311
0
                        let _ = result_tx.send(Err(()));
1312
0
                    }
1313
                }
1314
            }
1315
            WakeUpReason::Message(ToBackground::ForegroundCallProofRequest {
1316
0
                target,
1317
0
                chain_id,
1318
0
                config,
1319
0
                result_tx,
1320
0
            }) => {
1321
0
                inner.log_callback.log(
1322
0
                    LogLevel::Debug,
1323
0
                    format!(
1324
0
                        "call-proof-request-start; peer_id={}; chain={}; block-hash={}; function={}",
1325
0
                        target,
1326
0
                        inner.network[chain_id].log_name,
1327
0
                        HashDisplay(&config.block_hash),
1328
0
                        config.method
1329
0
                    ),
1330
0
                );
1331
0
1332
0
                match inner.network.start_call_proof_request(
1333
0
                    &target,
1334
0
                    chain_id,
1335
0
                    config,
1336
0
                    Duration::from_secs(12),
1337
0
                ) {
1338
0
                    Ok(request_id) => {
1339
0
                        // TODO: somehow cancel the request if the `rx` is dropped?
1340
0
                        inner.call_proof_requests.insert(request_id, result_tx);
1341
0
                    }
1342
0
                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1343
0
                        inner.log_callback.log(
1344
0
                            LogLevel::Debug,
1345
0
                            format!(
1346
0
                                "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection",
1347
0
                                target,
1348
0
                                inner.network[chain_id].log_name,
1349
0
                            ),
1350
0
                        );
1351
0
                        let _ = result_tx.send(Err(()));
1352
0
                    }
1353
0
                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1354
0
                        inner.log_callback.log(
1355
0
                            LogLevel::Debug,
1356
0
                            format!(
1357
0
                                "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large",
1358
0
                                target,
1359
0
                                inner.network[chain_id].log_name,
1360
0
                            ),
1361
0
                        );
1362
0
                        let _ = result_tx.send(Err(()));
1363
0
                    }
1364
                }
1365
            }
1366
0
            WakeUpReason::Message(ToBackground::ForegroundGetNumConnections { result_tx }) => {
1367
0
                let _ = result_tx.send(inner.network.num_connections());
1368
0
            }
1369
            WakeUpReason::Message(ToBackground::ForegroundGetNumPeers {
1370
1
                chain_id,
1371
1
                result_tx,
1372
1
            }) => {
1373
1
                // TODO: optimize?
1374
1
                let _ = result_tx.send(
1375
1
                    inner
1376
1
                        .network
1377
1
                        .gossip_connected_peers(
1378
1
                            chain_id,
1379
1
                            service::GossipKind::ConsensusTransactions,
1380
1
                        )
1381
1
                        .count(),
1382
1
                );
1383
1
            }
1384
0
            WakeUpReason::Message(ToBackground::ForegroundGetNumTotalPeers { result_tx }) => {
1385
0
                // TODO: optimize?
1386
0
                let total = inner
1387
0
                    .network
1388
0
                    .chains()
1389
0
                    .map(|chain_id| {
1390
                        inner
1391
                            .network
1392
                            .gossip_connected_peers(
1393
                                chain_id,
1394
                                service::GossipKind::ConsensusTransactions,
1395
                            )
1396
                            .count()
1397
0
                    })
1398
0
                    .sum();
1399
0
                let _ = result_tx.send(total);
1400
0
            }
1401
1402
            WakeUpReason::EventSendersReady => {
1403
                // Dispatch the pending event, if any, to the various senders.
1404
1405
                // We made sure that the senders were ready before generating an event.
1406
0
                let either::Left(event_senders) = &mut inner.event_senders else {
1407
0
                    unreachable!()
1408
                };
1409
1410
0
                if let Some(event_to_dispatch) = inner.event_pending_send.take() {
1411
0
                    let mut event_senders = mem::take(event_senders);
1412
0
                    inner.event_senders = either::Right(Box::pin(async move {
1413
                        // Elements in `event_senders` are removed one by one and inserted
1414
                        // back if the channel is still open.
1415
                        for index in (0..event_senders.len()).rev() {
1416
                            let event_sender = event_senders.swap_remove(index);
1417
                            if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1418
                                continue;
1419
                            }
1420
1421
                            event_senders.push(event_sender);
1422
                        }
1423
                        event_senders
1424
0
                    }));
1425
0
                }
1426
            }
1427
1428
            WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
1429
0
                id,
1430
0
                expected_peer_id,
1431
0
                peer_id,
1432
0
                ..
1433
0
            }) => {
1434
0
                inner.num_pending_out_attempts -= 1;
1435
0
1436
0
                let remote_addr =
1437
0
                    Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned())
1438
0
                        .unwrap(); // TODO: review this unwrap
1439
0
                if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
1440
                {
1441
0
                    inner
1442
0
                    .log_callback
1443
0
                    .log(LogLevel::Debug, format!("connected-peer-id-mismatch; expected_peer_id={}; actual_peer_id={}; address={}", expected_peer_id, peer_id, remote_addr));
1444
0
1445
0
                    let _was_in = inner
1446
0
                        .peering_strategy
1447
0
                        .decrease_address_connections_and_remove_if_zero(
1448
0
                            expected_peer_id,
1449
0
                            remote_addr.as_ref(),
1450
0
                        );
1451
0
                    debug_assert!(_was_in.is_ok());
1452
                    if let basic_peering_strategy::InsertAddressConnectionsResult::Inserted {
1453
0
                        address_removed: Some(addr_rm),
1454
0
                    } = inner.peering_strategy.increase_address_connections(
1455
0
                        &peer_id,
1456
0
                        remote_addr.into_bytes().to_owned(),
1457
0
                        10, // TODO: constant
1458
0
                    ) {
1459
0
                        let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap();
1460
0
                        inner.log_callback.log(
1461
0
                            LogLevel::Debug,
1462
0
                            format!("address-purged; peer_id={}; address={}", peer_id, addr_rm),
1463
0
                        );
1464
0
                    }
1465
0
                } else {
1466
0
                    inner
1467
0
                        .log_callback
1468
0
                        .log(LogLevel::Debug, format!("connected; peer_id={}", peer_id));
1469
0
                }
1470
            }
1471
1472
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1473
                expected_peer_id: Some(_),
1474
                ..
1475
            })
1476
            | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
1477
0
                let (address, peer_id, handshake_finished) = match wake_up_reason {
1478
                    WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1479
0
                        address,
1480
0
                        expected_peer_id: Some(peer_id),
1481
0
                        ..
1482
0
                    }) => (address, peer_id, false),
1483
                    WakeUpReason::NetworkEvent(service::Event::Disconnected {
1484
0
                        address,
1485
0
                        peer_id,
1486
0
                        ..
1487
0
                    }) => (address, peer_id, true),
1488
0
                    _ => unreachable!(),
1489
                };
1490
1491
0
                if !handshake_finished {
1492
0
                    inner.num_pending_out_attempts -= 1;
1493
0
                }
1494
1495
0
                inner
1496
0
                    .peering_strategy
1497
0
                    .decrease_address_connections(&peer_id, &address)
1498
0
                    .unwrap();
1499
0
                let address = Multiaddr::from_bytes(&address).unwrap();
1500
0
                inner.log_callback.log(
1501
0
                    LogLevel::Debug,
1502
0
                    format!(
1503
0
                        "disconnected; handshake-finished={}; peer_id={}; address={}",
1504
0
                        handshake_finished, peer_id, address
1505
0
                    ),
1506
0
                );
1507
0
1508
0
                // Ban the peer in order to avoid trying over and over again the same address(es).
1509
0
                // Even if the handshake was finished, it is possible that the peer simply shuts
1510
0
                // down connections immediately after it has been opened, hence the ban.
1511
0
                // Due to race conditions and peerid mismatches, it is possible that there is
1512
0
                // another existing connection or connection attempt with that same peer. However,
1513
0
                // it is not possible to be sure that we will reach 0 connections or connection
1514
0
                // attempts, and thus we ban the peer every time.
1515
0
                let ban_duration = Duration::from_secs(5);
1516
0
                inner.network.gossip_remove_desired_all(
1517
0
                    &peer_id,
1518
0
                    service::GossipKind::ConsensusTransactions,
1519
0
                );
1520
0
                for (&chain_id, what_happened) in inner
1521
0
                    .peering_strategy
1522
0
                    .unassign_slots_and_ban(&peer_id, Instant::now() + ban_duration)
1523
                {
1524
0
                    if matches!(
1525
0
                        what_happened,
1526
                        basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
1527
0
                    ) {
1528
0
                        inner.log_callback.log(
1529
0
                            LogLevel::Debug,
1530
0
                            format!(
1531
0
                                "slot-unassigned; peer_id={}; chain={}; reason=disconnected",
1532
0
                                peer_id, inner.network[chain_id].log_name
1533
0
                            ),
1534
0
                        );
1535
0
                    }
1536
                }
1537
            }
1538
1539
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1540
                expected_peer_id: None,
1541
0
                address,
1542
0
                ..
1543
0
            }) => {
1544
0
                inner.log_callback.log(
1545
0
                    LogLevel::Debug,
1546
0
                    format!(
1547
0
                        "disconnected; handshake-finished=false; address={}",
1548
0
                        Multiaddr::from_bytes(&address).unwrap()
1549
0
                    ),
1550
0
                );
1551
0
            }
1552
1553
            WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
1554
0
                id,
1555
0
                peer_id,
1556
0
                ping_time,
1557
0
            }) => {
1558
0
                let remote_addr =
1559
0
                    Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned())
1560
0
                        .unwrap(); // TODO: review this unwrap
1561
0
                inner.log_callback.log(
1562
0
                    LogLevel::Debug,
1563
0
                    format!("ping; peer_id={peer_id}; remote_addr={remote_addr}; ping-time={ping_time:?}"),
1564
0
                );
1565
0
            }
1566
1567
            WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
1568
0
                chain_id,
1569
0
                peer_id,
1570
0
                announce,
1571
0
            }) => {
1572
0
                let decoded = announce.decode();
1573
0
                let header_hash =
1574
0
                    header::hash_from_scale_encoded_header(decoded.scale_encoded_header);
1575
0
                match header::decode(
1576
0
                    decoded.scale_encoded_header,
1577
0
                    inner.network.block_number_bytes(chain_id),
1578
0
                ) {
1579
0
                    Ok(decoded_header) => {
1580
0
                        let mut _jaeger_span = inner.jaeger_service.block_announce_receive_span(
1581
0
                            &inner.local_peer_id,
1582
0
                            &peer_id,
1583
0
                            decoded_header.number,
1584
0
                            &decoded_header.hash(inner.network.block_number_bytes(chain_id)),
1585
0
                        );
1586
0
1587
0
                        inner.log_callback.log(LogLevel::Debug, format!(
1588
0
                            "block-announce; peer_id={}; chain={}; hash={}; number={}; is_best={:?}",
1589
0
                            peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded_header.number, decoded.is_best
1590
0
                        ));
1591
0
1592
0
                        debug_assert!(inner.event_pending_send.is_none());
1593
0
                        inner.event_pending_send = Some(Event::BlockAnnounce {
1594
0
                            chain_id,
1595
0
                            peer_id,
1596
0
                            is_best: decoded.is_best,
1597
0
                            scale_encoded_header: decoded.scale_encoded_header.to_owned(), // TODO: somewhat wasteful to copy here, could pass the entire announce
1598
0
                        });
1599
                    }
1600
0
                    Err(error) => {
1601
0
                        inner.log_callback.log(LogLevel::Warn, format!(
1602
0
                            "block-announce-bad-header; peer_id={}; chain={}; hash={}; is_best={:?}; error={}",
1603
0
                            peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded.is_best, error
1604
0
                        ));
1605
0
1606
0
                        if inner.network.gossip_remove_desired(
1607
0
                            chain_id,
1608
0
                            &peer_id,
1609
0
                            service::GossipKind::ConsensusTransactions,
1610
0
                        ) {
1611
0
                            inner.peering_strategy.unassign_slot_and_ban(
1612
0
                                &chain_id,
1613
0
                                &peer_id,
1614
0
                                Instant::now() + Duration::from_secs(10),
1615
0
                            );
1616
0
                            inner.log_callback.log(
1617
0
                                LogLevel::Debug,
1618
0
                                format!(
1619
0
                                    "slot-unassigned; peer_id={}; chain={}; reason=bad-block-announce",
1620
0
                                    peer_id, inner.network[chain_id].log_name
1621
0
                                ),
1622
0
                            );
1623
0
                        }
1624
0
                        let _ = inner.network.gossip_close(
1625
0
                            chain_id,
1626
0
                            &peer_id,
1627
0
                            service::GossipKind::ConsensusTransactions,
1628
0
                        ); // TODO: what is the return value?
1629
                    }
1630
                }
1631
            }
1632
            WakeUpReason::NetworkEvent(service::Event::GossipConnected {
1633
0
                peer_id,
1634
0
                chain_id,
1635
0
                best_number,
1636
0
                best_hash,
1637
0
                ..
1638
0
            }) => {
1639
0
                inner.log_callback.log(
1640
0
                    LogLevel::Debug,
1641
0
                    format!(
1642
0
                        "chain-connected; peer_id={}; chain={}; best_number={}; best_hash={}",
1643
0
                        peer_id,
1644
0
                        inner.network[chain_id].log_name,
1645
0
                        best_number,
1646
0
                        HashDisplay(&best_hash),
1647
0
                    ),
1648
0
                );
1649
0
                debug_assert!(inner.event_pending_send.is_none());
1650
0
                inner.event_pending_send = Some(Event::Connected {
1651
0
                    peer_id,
1652
0
                    chain_id,
1653
0
                    best_block_number: best_number,
1654
0
                    best_block_hash: best_hash,
1655
0
                });
1656
            }
1657
            WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
1658
0
                peer_id,
1659
0
                chain_id,
1660
0
                ..
1661
0
            }) => {
1662
0
                inner.log_callback.log(
1663
0
                    LogLevel::Debug,
1664
0
                    format!(
1665
0
                        "chain-disconnected; peer_id={}; chain={}",
1666
0
                        peer_id, inner.network[chain_id].log_name
1667
0
                    ),
1668
0
                );
1669
0
1670
0
                // Note that peer doesn't necessarily have an out slot, as this event
1671
0
                // might happen as a result of an inbound gossip connection.
1672
0
                inner.peering_strategy.unassign_slot_and_ban(
1673
0
                    &chain_id,
1674
0
                    &peer_id,
1675
0
                    Instant::now() + Duration::from_secs(10),
1676
0
                );
1677
0
                if inner.network.gossip_remove_desired(
1678
0
                    chain_id,
1679
0
                    &peer_id,
1680
0
                    service::GossipKind::ConsensusTransactions,
1681
0
                ) {
1682
0
                    inner.log_callback.log(
1683
0
                        LogLevel::Debug,
1684
0
                        format!(
1685
0
                            "slot-unassigned; peer_id={}; chain={}; reason=gossip-disconnected",
1686
0
                            peer_id, inner.network[chain_id].log_name
1687
0
                        ),
1688
0
                    );
1689
0
                }
1690
1691
0
                debug_assert!(inner.event_pending_send.is_none());
1692
0
                inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id });
1693
            }
1694
            WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
1695
0
                chain_id,
1696
0
                peer_id,
1697
0
                error,
1698
0
                ..
1699
0
            }) => {
1700
0
                inner.log_callback.log(
1701
0
                    LogLevel::Debug,
1702
0
                    format!(
1703
0
                        "chain-connect-attempt-failed; peer_id={}; chain={}; error={}",
1704
0
                        peer_id, inner.network[chain_id].log_name, error
1705
0
                    ),
1706
0
                );
1707
0
1708
0
                // Note that peer doesn't necessarily have an out slot, as this event
1709
0
                // might happen as a result of an inbound gossip connection.
1710
0
                if inner.network.gossip_remove_desired(
1711
0
                    chain_id,
1712
0
                    &peer_id,
1713
0
                    service::GossipKind::ConsensusTransactions,
1714
0
                ) {
1715
0
                    inner.log_callback.log(
1716
0
                        LogLevel::Debug,
1717
0
                        format!(
1718
0
                            "slot-unassigned; peer_id={}; chain={}; reason=gossip-open-failed",
1719
0
                            peer_id, inner.network[chain_id].log_name
1720
0
                        ),
1721
0
                    );
1722
0
                }
1723
1724
0
                if let service::GossipConnectError::GenesisMismatch { .. } = error {
1725
0
                    inner
1726
0
                        .peering_strategy
1727
0
                        .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id);
1728
0
                } else {
1729
0
                    inner.peering_strategy.unassign_slot_and_ban(
1730
0
                        &chain_id,
1731
0
                        &peer_id,
1732
0
                        Instant::now() + Duration::from_secs(15),
1733
0
                    );
1734
0
                }
1735
            }
1736
            WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
1737
0
                chain_id,
1738
0
                peer_id,
1739
0
                kind: service::GossipKind::ConsensusTransactions,
1740
0
            }) => {
1741
0
                // TODO: log this
1742
0
                // The networking state machine guarantees that `GossipInDesired`
1743
0
                // can't happen if we are already opening an out slot, which we do
1744
0
                // immediately.
1745
0
                // TODO: add debug_assert! ^
1746
0
                if inner
1747
0
                    .network
1748
0
                    .opened_gossip_undesired_by_chain(chain_id)
1749
0
                    .count()
1750
0
                    < inner.network[chain_id].max_in_peers
1751
0
                {
1752
0
                    inner
1753
0
                        .network
1754
0
                        .gossip_open(
1755
0
                            chain_id,
1756
0
                            &peer_id,
1757
0
                            service::GossipKind::ConsensusTransactions,
1758
0
                        )
1759
0
                        .unwrap();
1760
0
                } else {
1761
0
                    inner
1762
0
                        .network
1763
0
                        .gossip_close(
1764
0
                            chain_id,
1765
0
                            &peer_id,
1766
0
                            service::GossipKind::ConsensusTransactions,
1767
0
                        )
1768
0
                        .unwrap();
1769
0
                }
1770
            }
1771
            WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
1772
                // All `GossipInDesired` are immediately accepted or rejected, meaning
1773
                // that this event can't happen.
1774
0
                unreachable!()
1775
            }
1776
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1777
0
                substream_id,
1778
0
                peer_id,
1779
0
                chain_id,
1780
0
                response: service::RequestResult::Blocks(response),
1781
0
            }) => {
1782
0
                match &response {
1783
0
                    Ok(success) => {
1784
0
                        inner.log_callback.log(
1785
0
                            LogLevel::Debug,
1786
0
                            format!(
1787
0
                                "blocks-request-ended; outcome=success; peer_id={peer_id}; chain={}; response-blocks={}",
1788
0
                                inner.network[chain_id].log_name,
1789
0
                                success.len()
1790
0
                            ),
1791
0
                        );
1792
0
                    }
1793
0
                    Err(err) => {
1794
0
                        inner.log_callback.log(
1795
0
                            LogLevel::Debug,
1796
0
                            format!("blocks-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1797
0
                            inner.network[chain_id].log_name, err),
1798
0
                        );
1799
0
                    }
1800
                }
1801
1802
0
                let _ = inner
1803
0
                    .blocks_requests
1804
0
                    .remove(&substream_id)
1805
0
                    .unwrap()
1806
0
                    .send(response.map_err(BlocksRequestError::Request));
1807
            }
1808
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1809
0
                substream_id,
1810
0
                peer_id,
1811
0
                chain_id,
1812
0
                response: service::RequestResult::GrandpaWarpSync(response),
1813
0
            }) => {
1814
0
                match &response {
1815
0
                    Ok(success) => {
1816
0
                        let decoded = success.decode();
1817
0
                        inner.log_callback.log(
1818
0
                            LogLevel::Debug,
1819
0
                            format!(
1820
0
                                "warp-sync-request-ended; outcome=success; peer_id={peer_id}; chain={}; num-fragments={}; is-finished={:?}",
1821
0
                                inner.network[chain_id].log_name,
1822
0
                                decoded.fragments.len(), decoded.is_finished,
1823
0
                            ),
1824
0
                        );
1825
0
                    }
1826
0
                    Err(err) => {
1827
0
                        inner.log_callback.log(
1828
0
                            LogLevel::Debug,
1829
0
                            format!("warp-sync-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1830
0
                            inner.network[chain_id].log_name, err),
1831
0
                        );
1832
0
                    }
1833
                }
1834
1835
0
                let _ = inner
1836
0
                    .warp_sync_requests
1837
0
                    .remove(&substream_id)
1838
0
                    .unwrap()
1839
0
                    .send(response.map_err(WarpSyncRequestError::Request));
1840
            }
1841
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1842
0
                substream_id,
1843
0
                peer_id,
1844
0
                chain_id,
1845
0
                response: service::RequestResult::StorageProof(response),
1846
0
            }) => {
1847
0
                match &response {
1848
0
                    Ok(success) => {
1849
0
                        inner.log_callback.log(
1850
0
                            LogLevel::Debug,
1851
0
                            format!(
1852
0
                                "storage-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}",
1853
0
                                inner.network[chain_id].log_name,
1854
0
                                BytesDisplay(u64::try_from(success.decode().len()).unwrap()),
1855
0
                            ),
1856
0
                        );
1857
0
                    }
1858
0
                    Err(err) => {
1859
0
                        inner.log_callback.log(
1860
0
                            LogLevel::Debug,
1861
0
                            format!(
1862
0
                                "storage-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1863
0
                                inner.network[chain_id].log_name, err
1864
0
                            ),
1865
0
                        );
1866
0
                    }
1867
                }
1868
1869
0
                let _ = inner
1870
0
                    .storage_requests
1871
0
                    .remove(&substream_id)
1872
0
                    .unwrap()
1873
0
                    .send(response.map_err(|_| ()));
1874
            }
1875
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1876
0
                substream_id,
1877
0
                peer_id,
1878
0
                chain_id,
1879
0
                response: service::RequestResult::CallProof(response),
1880
0
            }) => {
1881
0
                match &response {
1882
0
                    Ok(success) => {
1883
0
                        inner.log_callback.log(
1884
0
                            LogLevel::Debug,
1885
0
                            format!(
1886
0
                                "call-proof-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}",
1887
0
                                inner.network[chain_id].log_name,
1888
0
                                BytesDisplay(u64::try_from(success.decode().len()).unwrap()),
1889
0
                            ),
1890
0
                        );
1891
0
                    }
1892
0
                    Err(err) => {
1893
0
                        inner.log_callback.log(
1894
0
                            LogLevel::Debug,
1895
0
                            format!(
1896
0
                                "call-proof-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}",
1897
0
                                inner.network[chain_id].log_name,
1898
0
                                err
1899
0
                            ),
1900
0
                        );
1901
0
                    }
1902
                }
1903
1904
0
                let _ = inner
1905
0
                    .call_proof_requests
1906
0
                    .remove(&substream_id)
1907
0
                    .unwrap()
1908
0
                    .send(response.map_err(|_| ()));
1909
            }
1910
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1911
0
                peer_id: kademlia_request_target,
1912
0
                chain_id,
1913
0
                response: service::RequestResult::KademliaFindNode(Ok(nodes)),
1914
                ..
1915
            }) => {
1916
0
                for (peer_id, addrs) in nodes {
1917
0
                    let mut valid_addrs = Vec::with_capacity(addrs.len());
1918
0
                    for addr in addrs {
1919
0
                        match Multiaddr::from_bytes(addr) {
1920
0
                            Ok(a) => valid_addrs.push(a),
1921
0
                            Err((error, addr)) => {
1922
0
                                inner.log_callback.log(
1923
0
                                    LogLevel::Debug,
1924
0
                                    format!(
1925
0
                                        "discovery-invalid-address; error={error}, addr={}, discovered_from={kademlia_request_target}",
1926
0
                                        hex::encode(&addr)
1927
0
                                    ),
1928
0
                                );
1929
0
                                continue;
1930
                            }
1931
                        }
1932
                    }
1933
1934
0
                    if !valid_addrs.is_empty() {
1935
                        // Note that we must call this function before `insert_address`,
1936
                        // as documented in `basic_peering_strategy`.
1937
                        if let basic_peering_strategy::InsertChainPeerResult::Inserted {
1938
0
                            peer_removed: Some(peer_removed),
1939
0
                        } = inner.peering_strategy.insert_chain_peer(
1940
0
                            chain_id,
1941
0
                            peer_id.clone(),
1942
0
                            100, // TODO: constant
1943
0
                        ) {
1944
0
                            inner.log_callback.log(
1945
0
                                LogLevel::Debug,
1946
0
                                format!(
1947
0
                                    "peer-forgotten; peer_id={}; chain={}",
1948
0
                                    peer_removed, inner.network[chain_id].log_name
1949
0
                                ),
1950
0
                            );
1951
0
                        }
1952
0
                    }
1953
1954
0
                    for addr in valid_addrs {
1955
0
                        inner.log_callback.log(
1956
0
                            LogLevel::Debug,
1957
0
                            format!(
1958
0
                                "discovered; chain={}; peer_id={peer_id}; address={addr}; discovered_from={kademlia_request_target}",
1959
0
                                inner.network[chain_id].log_name
1960
0
                            ),
1961
0
                        );
1962
0
1963
0
                        match inner
1964
0
                            .peering_strategy
1965
0
                             .insert_address(&peer_id, addr.into_bytes(), 10) // TODO: constant
1966
                            {
1967
0
                                basic_peering_strategy::InsertAddressResult::Inserted { address_removed: Some(addr_rm) } => {
1968
0
                                    let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap();
1969
0
                                    inner
1970
0
                                        .log_callback
1971
0
                                        .log(LogLevel::Debug, format!("address-purged; peer_id={}; address={}", peer_id, addr_rm));
1972
0
                                }
1973
0
                                basic_peering_strategy::InsertAddressResult::UnknownPeer => unreachable!(),
1974
0
                                _ => {}
1975
                            }
1976
                    }
1977
                }
1978
            }
1979
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
1980
0
                peer_id,
1981
0
                chain_id,
1982
0
                response: service::RequestResult::KademliaFindNode(Err(error)),
1983
0
                ..
1984
0
            }) => {
1985
0
                inner.log_callback.log(
1986
0
                    LogLevel::Debug,
1987
0
                    format!(
1988
0
                        "discovery-error; chain={}; peer_id={peer_id}; error={error}",
1989
0
                        inner.network[chain_id].log_name
1990
0
                    ),
1991
0
                );
1992
0
            }
1993
            WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
1994
                // We never start a request of any other kind.
1995
0
                unreachable!()
1996
            }
1997
            WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
1998
                // Requests are answered immediately, and thus cancelling events can't happen.
1999
0
                unreachable!()
2000
            }
2001
            WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
2002
0
                peer_id,
2003
0
                substream_id,
2004
0
            }) => {
2005
0
                inner.log_callback.log(
2006
0
                    LogLevel::Debug,
2007
0
                    format!("identify-request; peer_id={}", peer_id),
2008
0
                );
2009
0
                inner
2010
0
                    .network
2011
0
                    .respond_identify(substream_id, &inner.identify_agent_version);
2012
0
            }
2013
            WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn {
2014
0
                peer_id,
2015
0
                chain_id,
2016
0
                config,
2017
0
                substream_id,
2018
0
            }) => {
2019
0
                inner.log_callback.log(
2020
0
                    LogLevel::Debug,
2021
0
                    format!(
2022
0
                        "incoming-blocks-request; peer_id={}; chain={}",
2023
0
                        peer_id, inner.network[chain_id].log_name
2024
0
                    ),
2025
0
                );
2026
0
                let mut _jaeger_span = inner.jaeger_service.incoming_block_request_span(
2027
0
                    &inner.local_peer_id,
2028
0
                    &peer_id,
2029
0
                    config.desired_count.get(),
2030
0
                    if let (1, codec::BlocksRequestConfigStart::Hash(block_hash)) =
2031
0
                        (config.desired_count.get(), &config.start)
2032
                    {
2033
0
                        Some(block_hash)
2034
                    } else {
2035
0
                        None
2036
                    },
2037
                );
2038
2039
                // TODO: is it a good idea to await here while the lock is held and freezing the entire networking background task?
2040
0
                let response = blocks_request_response(
2041
0
                    &inner.network[chain_id].database,
2042
0
                    inner.network.block_number_bytes(chain_id),
2043
0
                    config,
2044
0
                )
2045
0
                .await;
2046
0
                inner.network.respond_blocks(
2047
0
                    substream_id,
2048
0
                    match response {
2049
0
                        Ok(b) => Some(b),
2050
0
                        Err(error) => {
2051
0
                            inner.log_callback.log(
2052
0
                                LogLevel::Warn,
2053
0
                                format!("incoming-blocks-request-error; error={}", error),
2054
0
                            );
2055
0
                            None
2056
                        }
2057
                    },
2058
                );
2059
            }
2060
            WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
2061
0
                chain_id,
2062
0
                peer_id,
2063
0
                state,
2064
0
            }) => {
2065
0
                inner.log_callback.log(LogLevel::Debug, format!(
2066
0
                    "grandpa-neighbor-packet; peer_id={}; chain={}; round_number={}; set_id={}; commit_finalized_height={}",
2067
0
                    peer_id,
2068
0
                    inner.network[chain_id].log_name,
2069
0
                    state.round_number,
2070
0
                    state.set_id,
2071
0
                    state.commit_finalized_height,
2072
0
                ));
2073
0
2074
0
                debug_assert!(inner.event_pending_send.is_none());
2075
0
                inner.event_pending_send = Some(Event::GrandpaNeighborPacket {
2076
0
                    chain_id,
2077
0
                    peer_id,
2078
0
                    finalized_block_height: state.commit_finalized_height,
2079
0
                });
2080
            }
2081
            WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
2082
0
                chain_id,
2083
0
                peer_id,
2084
0
                message,
2085
0
            }) => {
2086
0
                inner.log_callback.log(
2087
0
                    LogLevel::Debug,
2088
0
                    format!(
2089
0
                        "grandpa-commit-message; peer_id={}; chain={}; target_hash={}",
2090
0
                        peer_id,
2091
0
                        inner.network[chain_id].log_name,
2092
0
                        HashDisplay(message.decode().target_hash),
2093
0
                    ),
2094
0
                );
2095
0
            }
2096
0
            WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
2097
0
                inner.log_callback.log(
2098
0
                    LogLevel::Warn,
2099
0
                    format!("protocol-error; peer_id={}; error={}", peer_id, error),
2100
0
                );
2101
0
                inner
2102
0
                    .peering_strategy
2103
0
                    .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(5));
2104
0
                // TODO: log chain names?
2105
0
                inner.log_callback.log(
2106
0
                    LogLevel::Debug,
2107
0
                    format!(
2108
0
                        "all-slots-unassigned; reason=no-address; peer_id={}",
2109
0
                        peer_id
2110
0
                    ),
2111
0
                );
2112
0
            }
2113
2114
0
            WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
2115
0
                inner.peering_strategy.assign_slot(&chain_id, &peer_id);
2116
0
2117
0
                inner.log_callback.log(
2118
0
                    LogLevel::Debug,
2119
0
                    format!(
2120
0
                        "slot-assigned; peer_id={}; chain={}",
2121
0
                        peer_id, inner.network[chain_id].log_name
2122
0
                    ),
2123
0
                );
2124
0
2125
0
                inner.network.gossip_insert_desired(
2126
0
                    chain_id,
2127
0
                    peer_id,
2128
0
                    service::GossipKind::ConsensusTransactions,
2129
0
                );
2130
0
            }
2131
2132
0
            WakeUpReason::CanStartConnect(peer_id) => {
2133
0
                inner.num_pending_out_attempts += 1;
2134
2135
0
                let Some(multiaddr) = inner
2136
0
                    .peering_strategy
2137
0
                    .pick_address_and_add_connection(&peer_id)
2138
                else {
2139
                    // There is no address for that peer in the address book.
2140
0
                    inner.network.gossip_remove_desired_all(
2141
0
                        &peer_id,
2142
0
                        service::GossipKind::ConsensusTransactions,
2143
0
                    );
2144
0
                    for (chain_id, what_happened) in inner
2145
0
                        .peering_strategy
2146
0
                        .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(10))
2147
                    {
2148
0
                        if matches!(
2149
0
                            what_happened,
2150
                            basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2151
0
                        ) {
2152
0
                            inner.log_callback.log(
2153
0
                                LogLevel::Debug,
2154
0
                                format!(
2155
0
                                    "slot-unassigned; peer_id={}; chain={}; reason=no-address",
2156
0
                                    peer_id, inner.network[*chain_id].log_name
2157
0
                                ),
2158
0
                            );
2159
0
                        }
2160
                    }
2161
0
                    continue;
2162
                };
2163
2164
0
                let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
2165
0
                    Ok(a) => a,
2166
0
                    Err((multiaddr::FromBytesError, multiaddr)) => {
2167
0
                        // Address is in an invalid format.
2168
0
                        inner.log_callback.log(
2169
0
                            LogLevel::Debug,
2170
0
                            format!(
2171
0
                                "invalid-address; peer_id={}; address={:?}",
2172
0
                                peer_id, multiaddr
2173
0
                            ),
2174
0
                        );
2175
0
                        let _was_in = inner
2176
0
                            .peering_strategy
2177
0
                            .decrease_address_connections_and_remove_if_zero(&peer_id, &multiaddr);
2178
0
                        debug_assert!(_was_in.is_ok());
2179
0
                        continue;
2180
                    }
2181
                };
2182
2183
                // Convert the `multiaddr` (typically of the form `/ip4/a.b.c.d/tcp/d`) into
2184
                // a `Future<dyn Output = Result<TcpStream, ...>>`.
2185
0
                let socket = match tasks::multiaddr_to_socket(&multiaddr) {
2186
0
                    Ok(socket) => socket,
2187
                    Err(_) => {
2188
                        // Address is in an invalid format or isn't supported.
2189
0
                        inner.log_callback.log(
2190
0
                            LogLevel::Debug,
2191
0
                            format!(
2192
0
                                "invalid-address; peer_id={}; address={}",
2193
0
                                peer_id, multiaddr
2194
0
                            ),
2195
0
                        );
2196
0
                        let _was_in = inner
2197
0
                            .peering_strategy
2198
0
                            .decrease_address_connections_and_remove_if_zero(
2199
0
                                &peer_id,
2200
0
                                multiaddr.as_ref(),
2201
0
                            );
2202
0
                        debug_assert!(_was_in.is_ok());
2203
0
                        continue;
2204
                    }
2205
                };
2206
2207
0
                inner.log_callback.log(
2208
0
                    LogLevel::Debug,
2209
0
                    format!("start-connecting; peer_id={peer_id}; address={multiaddr}"),
2210
0
                );
2211
0
2212
0
                let (tx, rx) = channel::bounded(16); // TODO: ?!
2213
0
2214
0
                let (connection_id, connection_task) = inner.network.add_single_stream_connection(
2215
0
                    Instant::now(),
2216
0
                    service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
2217
0
                        is_initiator: true,
2218
0
                        noise_key: &inner.noise_key,
2219
0
                    },
2220
0
                    multiaddr.clone().into_bytes(),
2221
0
                    Some(peer_id.clone()),
2222
0
                    tx,
2223
0
                );
2224
0
2225
0
                // Handle the connection in a separate task.
2226
0
                (inner.tasks_executor)(Box::pin(tasks::connection_task(
2227
0
                    inner.log_callback.clone(),
2228
0
                    multiaddr.to_string(),
2229
0
                    socket,
2230
0
                    connection_id,
2231
0
                    connection_task,
2232
0
                    rx,
2233
0
                    inner.from_connections_tx.clone(),
2234
0
                )));
2235
            }
2236
2237
0
            WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
2238
0
                inner
2239
0
                    .network
2240
0
                    .gossip_open(
2241
0
                        chain_id,
2242
0
                        &peer_id,
2243
0
                        service::GossipKind::ConsensusTransactions,
2244
0
                    )
2245
0
                    .unwrap();
2246
0
2247
0
                inner.log_callback.log(
2248
0
                    LogLevel::Debug,
2249
0
                    format!(
2250
0
                        "gossip-open; peer_id={}; chain={}",
2251
0
                        peer_id, &inner.network[chain_id].log_name
2252
0
                    ),
2253
0
                );
2254
0
            }
2255
        }
2256
    }
2257
0
}
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service15background_task0B5_
2258
2259
/// Builds the response to a block request by reading from the given database.
///
/// Starting from `config.start`, walks the chain in the direction given by `config.direction`
/// and collects at most `min(config.desired_count, 128)` blocks. The walk stops early, and the
/// blocks gathered so far are returned, as soon as a block hash, a header, or (if requested) a
/// body can't be found in the database.
///
/// `block_number_bytes` is passed to `header::decode` when decoding the headers read from the
/// database.
///
/// Any error returned by one of the database accessors is propagated to the caller.
///
/// # Panics
///
/// Panics if a header read from the database fails to decode.
2260
0
async fn blocks_request_response(
2261
0
    database: &database_thread::DatabaseThread,
2262
0
    block_number_bytes: usize,
2263
0
    config: codec::BlocksRequestConfig,
2264
0
) -> Result<Vec<codec::BlockData>, full_sqlite::CorruptedError> {
Unexecuted instantiation: _RNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service23blocks_request_response
Unexecuted instantiation: _RNvNtCshBwayKnNXDT_17smoldot_full_node15network_service23blocks_request_response
2265
0
    database
2266
0
        .with_database(move |database| {
            // Never return more than 128 blocks, no matter how many were requested.
2267
0
            let num_blocks = cmp::min(
2268
0
                usize::try_from(config.desired_count.get()).unwrap_or(usize::MAX),
2269
0
                128,
2270
0
            );
2271
0
2272
0
            let mut output = Vec::with_capacity(num_blocks);
            // Position (hash or number) of the block to visit at the next iteration.
2273
0
            let mut next_block = config.start;
2274
2275
            loop {
2276
0
                if output.len() >= num_blocks {
2277
0
                    break;
2278
0
                }
2279
                // Resolve the current position into a block hash. A position expressed as a
                // number uses the first hash yielded by the database for that number.
2280
0
                let hash = match next_block {
2281
0
                    codec::BlocksRequestConfigStart::Hash(hash) => hash,
2282
0
                    codec::BlocksRequestConfigStart::Number(number) => {
2283
0
                        // TODO: naive block selection ; should choose the best chain instead
2284
0
                        match database.block_hash_by_number(number)?.next() {
2285
0
                            Some(h) => h,
2286
0
                            None => break,
2287
                        }
2288
                    }
2289
                };
2290
                // A block whose header isn't in the database ends the response.
2291
0
                let header = match database.block_scale_encoded_header(&hash)? {
2292
0
                    Some(h) => h,
2293
0
                    None => break,
2294
                };
2295
                // Compute the next position now, while `header` is still available: it may be
                // moved into the output below.
2296
                next_block = {
2297
0
                    let decoded = header::decode(&header, block_number_bytes).unwrap();
2298
0
                    match config.direction {
2299
                        codec::BlocksRequestDirection::Ascending => {
2300
                            // TODO: right now, since we don't necessarily pick the best chain in `block_hash_by_number`, it is possible that the next block doesn't have the current block as parent
2301
0
                            codec::BlocksRequestConfigStart::Number(decoded.number + 1)
2302
                        }
2303
                        codec::BlocksRequestDirection::Descending => {
2304
0
                            codec::BlocksRequestConfigStart::Hash(*decoded.parent_hash)
2305
                        }
2306
                    }
2307
                };
2308
                // Only the fields selected in `config.fields` are filled in; the others are `None`.
2309
0
                output.push(codec::BlockData {
2310
0
                    hash,
2311
0
                    header: if config.fields.header {
2312
0
                        Some(header)
2313
                    } else {
2314
0
                        None
2315
                    },
2316
0
                    body: if config.fields.body {
2317
0
                        Some(match database.block_extrinsics(&hash)? {
2318
0
                            Some(body) => body.collect(),
                            // Body missing from the database: leave the loop without pushing
                            // this block.
2319
0
                            None => break,
2320
                        })
2321
                    } else {
2322
0
                        None
2323
                    },
2324
0
                    justifications: if config.fields.justifications {
2325
                        // TODO: justifications aren't saved in database at the moment
2326
0
                        Some(Vec::new())
2327
                    } else {
2328
0
                        None
2329
                    },
2330
                });
2331
            }
2332
2333
0
            Ok(output)
2334
0
        })
Unexecuted instantiation: _RNCNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service23blocks_request_response00B7_
Unexecuted instantiation: _RNCNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service23blocks_request_response00B7_
2335
0
        .await
2336
0
}
Unexecuted instantiation: _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service23blocks_request_response0B5_
Unexecuted instantiation: _RNCNvNtCshBwayKnNXDT_17smoldot_full_node15network_service23blocks_request_response0B5_