smoldot_light/
network_service.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Background network service.
19//!
20//! The [`NetworkService`] manages background tasks dedicated to connecting to other nodes.
21//! Importantly, its design is oriented towards the particular use case of the light client.
22//!
23//! The [`NetworkService`] spawns one background task (using [`PlatformRef::spawn_task`]) for
24//! each active connection.
25//!
//! The objective of the [`NetworkService`] in general is to try to stay connected as much as
27//! possible to the nodes of the peer-to-peer network of the chain, and maintain open substreams
28//! with them in order to send out requests (e.g. block requests) and notifications (e.g. block
29//! announces).
30//!
31//! Connectivity to the network is performed in the background as an implementation detail of
32//! the service. The public API only allows emitting requests and notifications towards the
33//! already-connected nodes.
34//!
35//! After a [`NetworkService`] is created, one can add chains using [`NetworkService::add_chain`].
36//! If all references to a [`NetworkServiceChain`] are destroyed, the chain is automatically
37//! purged.
38//!
39//! An important part of the API is the list of channel receivers of [`Event`] returned by
40//! [`NetworkServiceChain::subscribe`]. These channels inform the foreground about updates to the
41//! network connectivity.
42
43use crate::{
44    log,
45    platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49    borrow::ToOwned as _,
50    boxed::Box,
51    collections::BTreeMap,
52    format,
53    string::{String, ToString as _},
54    sync::Arc,
55    vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65    header,
66    informant::{BytesDisplay, HashDisplay},
67    libp2p::{
68        connection,
69        multiaddr::{self, Multiaddr},
70        peer_id,
71    },
72    network::{basic_peering_strategy, codec, service},
73};
74
75pub use codec::{CallProofRequestConfig, Role};
76pub use service::{ChainId, EncodedMerkleProof, PeerId, QueueNotificationError};
77
78mod tasks;
79
/// Configuration for a [`NetworkService`].
///
/// See [`NetworkService::new`].
pub struct Config<TPlat> {
    /// Access to the platform's capabilities.
    ///
    /// See [`PlatformRef`].
    pub platform: TPlat,

    /// Value sent back for the agent version when receiving an identification request.
    pub identify_agent_version: String,

    /// Capacity to allocate for the list of chains.
    ///
    /// Used as a pre-allocation hint for the networking state and the peering strategy.
    pub chains_capacity: usize,

    /// Maximum number of connections that the service can open simultaneously. After this value
    /// has been reached, a new connection can be opened after each
    /// [`Config::connections_open_pool_restore_delay`].
    pub connections_open_pool_size: u32,

    /// Delay after which the service can open a new connection.
    /// The delay is cumulative. If no connection has been opened for example for twice this
    /// duration, then two connections can be opened at the same time, up to a maximum of
    /// [`Config::connections_open_pool_size`].
    pub connections_open_pool_restore_delay: Duration,
}
102
/// See [`NetworkService::add_chain`].
///
/// Note that this configuration is intentionally missing a field containing the bootstrap
/// nodes of the chain. Bootstrap nodes are supposed to be added afterwards by calling
/// [`NetworkServiceChain::discover`].
pub struct ConfigChain {
    /// Name of the chain, for logging purposes.
    pub log_name: String,

    /// Number of "out slots" of this chain. We establish simultaneously gossip substreams up to
    /// this number of peers.
    pub num_out_slots: usize,

    /// Hash of the genesis block of the chain. Sent to other nodes in order to determine whether
    /// the chains match.
    ///
    /// > **Note**: Be aware that this *must* be the *genesis* block, not any block known to be
    /// >           in the chain.
    pub genesis_block_hash: [u8; 32],

    /// Number and hash of the current best block, in the format `(number, hash)`. Can later be
    /// updated with [`NetworkServiceChain::set_local_best_block`].
    pub best_block: (u64, [u8; 32]),

    /// Optional identifier to insert into the networking protocol names. Used to differentiate
    /// between chains with the same genesis hash.
    pub fork_id: Option<String>,

    /// Number of bytes of the block number in the networking protocol.
    pub block_number_bytes: usize,

    /// Must be `Some` if and only if the chain uses the GrandPa networking protocol. Contains the
    /// number of the finalized block at the time of the initialization.
    pub grandpa_protocol_finalized_block_height: Option<u64>,
}
138
/// Handle to the background network service. See [the module-level documentation](self).
///
/// Created by calling [`NetworkService::new`].
pub struct NetworkService<TPlat: PlatformRef> {
    /// Channel connected to the background service.
    messages_tx: async_channel::Sender<ToBackground<TPlat>>,

    /// See [`Config::platform`].
    platform: TPlat,
}
146
impl<TPlat: PlatformRef> NetworkService<TPlat> {
    /// Initializes the network service with the given configuration.
    ///
    /// Spawns (through [`PlatformRef::spawn_task`]) the background task that owns the entire
    /// networking state, then returns a handle communicating with it over a channel.
    pub fn new(config: Config<TPlat>) -> Arc<Self> {
        // Channel through which the returned handle sends messages to the background task.
        let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);

        let network = service::ChainNetwork::new(service::Config {
            chains_capacity: config.chains_capacity,
            connections_capacity: 32,
            handshake_timeout: Duration::from_secs(8),
            // All randomness is obtained from the platform. Note that three separate seeds are
            // drawn in this function (network, RNG, peering strategy); keep the order of the
            // `fill_random_bytes` calls stable.
            randomness_seed: {
                let mut seed = [0; 32];
                config.platform.fill_random_bytes(&mut seed);
                seed
            },
        });

        // Spawn main task that processes the network service.
        // Channel pair through which per-connection tasks communicate back with the
        // background task.
        let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
        let task = Box::pin(background_task(BackgroundTask {
            randomness: rand_chacha::ChaCha20Rng::from_seed({
                let mut seed = [0; 32];
                config.platform.fill_random_bytes(&mut seed);
                seed
            }),
            identify_agent_version: config.identify_agent_version,
            tasks_messages_tx,
            tasks_messages_rx: Box::pin(tasks_messages_rx),
            peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
                basic_peering_strategy::Config {
                    randomness_seed: {
                        let mut seed = [0; 32];
                        config.platform.fill_random_bytes(&mut seed);
                        seed
                    },
                    peers_capacity: 50, // TODO: ?
                    chains_capacity: config.chains_capacity,
                },
            ),
            network,
            connections_open_pool_size: config.connections_open_pool_size,
            connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
            num_recent_connection_opening: 0,
            next_recent_connection_restore: None,
            platform: config.platform.clone(),
            open_gossip_links: BTreeMap::new(),
            event_pending_send: None,
            event_senders: either::Left(Vec::new()),
            pending_new_subscriptions: Vec::new(),
            important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
            main_messages_rx: Box::pin(main_messages_rx),
            messages_rx: stream::SelectAll::new(),
            blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            chains_by_next_discovery: BTreeMap::new(),
        }));

        config.platform.spawn_task("network-service".into(), {
            let platform = config.platform.clone();
            async move {
                task.await;
                // The background task finishing is a normal shutdown, not an error.
                log!(&platform, Debug, "network", "shutdown");
            }
        });

        Arc::new(NetworkService {
            messages_tx: main_messages_tx,
            platform: config.platform,
        })
    }

    /// Adds a chain to the list of chains that the network service connects to.
    ///
    /// Returns an object representing the chain and that allows interacting with it. If all
    /// references to [`NetworkServiceChain`] are destroyed, the network service automatically
    /// purges that chain.
    pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
        // Channel through which the returned handle sends chain-specific messages to the
        // background task.
        let (messages_tx, messages_rx) = async_channel::bounded(32);

        // TODO: this code is hacky because we don't want to make `add_chain` async at the moment, because it's not convenient for lib.rs
        self.platform.spawn_task("add-chain-message-send".into(), {
            // Translate the public `ConfigChain` into the internal `service::ChainConfig`.
            let config = service::ChainConfig {
                grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
                    |commit_finalized_height| service::GrandpaState {
                        commit_finalized_height,
                        round_number: 1,
                        set_id: 0,
                    },
                ),
                fork_id: config.fork_id.clone(),
                block_number_bytes: config.block_number_bytes,
                best_hash: config.best_block.1,
                best_number: config.best_block.0,
                genesis_hash: config.genesis_block_hash,
                role: Role::Light,
                allow_inbound_block_requests: false,
                user_data: Chain {
                    log_name: config.log_name,
                    block_number_bytes: config.block_number_bytes,
                    num_out_slots: config.num_out_slots,
                    num_references: NonZero::<usize>::new(1).unwrap(),
                    next_discovery_period: Duration::from_secs(2),
                    next_discovery_when: self.platform.now(),
                },
            };

            let messages_tx = self.messages_tx.clone();
            async move {
                // Errors are ignored: sending fails only if the background task has shut down.
                let _ = messages_tx
                    .send(ToBackground::AddChain {
                        messages_rx,
                        config,
                    })
                    .await;
            }
        });

        Arc::new(NetworkServiceChain {
            _keep_alive_messages_tx: self.messages_tx.clone(),
            messages_tx,
            marker: core::marker::PhantomData,
        })
    }
}
272
/// Handle to a chain of a [`NetworkService`], as returned by [`NetworkService::add_chain`].
///
/// If all references to the [`NetworkServiceChain`] are destroyed, the network service
/// automatically purges that chain (see the module-level documentation).
pub struct NetworkServiceChain<TPlat: PlatformRef> {
    /// Copy of [`NetworkService::messages_tx`]. Used in order to maintain the network service
    /// background task alive.
    _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,

    /// Channel to send messages to the background task.
    messages_tx: async_channel::Sender<ToBackgroundChain>,

    /// Dummy to hold the `TPlat` type.
    marker: core::marker::PhantomData<TPlat>,
}
284
/// Severity of a ban. See [`NetworkServiceChain::ban_and_disconnect`].
///
/// The severity is forwarded to the background task, which decides the exact consequences
/// of each level.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BanSeverity {
    /// Light ban, for minor offenses. Presumably leads to a shorter ban than
    /// [`BanSeverity::High`] — see the background task for the exact policy.
    Low,
    /// Severe ban, for serious misbehavior.
    High,
}
291
292impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
293    /// Subscribes to the networking events that happen on the given chain.
294    ///
295    /// Calling this function returns a `Receiver` that receives events about the chain.
296    /// The new channel will immediately receive events about all the existing connections, so
297    /// that it is able to maintain a coherent view of the network.
298    ///
299    /// Note that this function is `async`, but it should return very quickly.
300    ///
301    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
302    /// connections will be back-pressured until the channel isn't full anymore.
303    ///
304    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
305    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
306    /// [`NetworkServiceChain::subscribe`] again to obtain a new `Receiver`.
307    ///
308    /// # Panic
309    ///
310    /// Panics if the given [`ChainId`] is invalid.
311    ///
312    // TODO: consider not killing the background until the channel is destroyed, as that would be a more sensical behaviour
313    pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
314        let (tx, rx) = async_channel::bounded(128);
315
316        self.messages_tx
317            .send(ToBackgroundChain::Subscribe { sender: tx })
318            .await
319            .unwrap();
320
321        rx
322    }
323
324    /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
325    /// generated. Prevents a new gossip link with the same peer from being reopened for a
326    /// little while.
327    ///
328    /// `reason` is a human-readable string printed in the logs.
329    ///
330    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
331    /// reconnection was already happening as the call to this function is still being processed.
332    /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
333    /// words, this function guarantees that we will be disconnected in the future rather than
334    /// guarantees that we will disconnect.
335    pub async fn ban_and_disconnect(
336        &self,
337        peer_id: PeerId,
338        severity: BanSeverity,
339        reason: &'static str,
340    ) {
341        let _ = self
342            .messages_tx
343            .send(ToBackgroundChain::DisconnectAndBan {
344                peer_id,
345                severity,
346                reason,
347            })
348            .await;
349    }
350
351    /// Sends a blocks request to the given peer.
352    // TODO: more docs
353    pub async fn blocks_request(
354        self: Arc<Self>,
355        target: PeerId,
356        config: codec::BlocksRequestConfig,
357        timeout: Duration,
358    ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
359        let (tx, rx) = oneshot::channel();
360
361        self.messages_tx
362            .send(ToBackgroundChain::StartBlocksRequest {
363                target: target.clone(),
364                config,
365                timeout,
366                result: tx,
367            })
368            .await
369            .unwrap();
370
371        rx.await.unwrap()
372    }
373
374    /// Sends a grandpa warp sync request to the given peer.
375    // TODO: more docs
376    pub async fn grandpa_warp_sync_request(
377        self: Arc<Self>,
378        target: PeerId,
379        begin_hash: [u8; 32],
380        timeout: Duration,
381    ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
382        let (tx, rx) = oneshot::channel();
383
384        self.messages_tx
385            .send(ToBackgroundChain::StartWarpSyncRequest {
386                target: target.clone(),
387                begin_hash,
388                timeout,
389                result: tx,
390            })
391            .await
392            .unwrap();
393
394        rx.await.unwrap()
395    }
396
397    pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
398        self.messages_tx
399            .send(ToBackgroundChain::SetLocalBestBlock {
400                best_hash,
401                best_number,
402            })
403            .await
404            .unwrap();
405    }
406
407    pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
408        self.messages_tx
409            .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
410            .await
411            .unwrap();
412    }
413
414    /// Sends a storage proof request to the given peer.
415    // TODO: more docs
416    pub async fn storage_proof_request(
417        self: Arc<Self>,
418        target: PeerId, // TODO: takes by value because of futures longevity issue
419        config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
420        timeout: Duration,
421    ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
422        let (tx, rx) = oneshot::channel();
423
424        self.messages_tx
425            .send(ToBackgroundChain::StartStorageProofRequest {
426                target: target.clone(),
427                config: codec::StorageProofRequestConfig {
428                    block_hash: config.block_hash,
429                    keys: config
430                        .keys
431                        .map(|key| key.as_ref().to_vec()) // TODO: to_vec() overhead
432                        .collect::<Vec<_>>()
433                        .into_iter(),
434                },
435                timeout,
436                result: tx,
437            })
438            .await
439            .unwrap();
440
441        rx.await.unwrap()
442    }
443
444    /// Sends a call proof request to the given peer.
445    ///
446    /// See also [`NetworkServiceChain::call_proof_request`].
447    // TODO: more docs
448    pub async fn call_proof_request(
449        self: Arc<Self>,
450        target: PeerId, // TODO: takes by value because of futures longevity issue
451        config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
452        timeout: Duration,
453    ) -> Result<EncodedMerkleProof, CallProofRequestError> {
454        let (tx, rx) = oneshot::channel();
455
456        self.messages_tx
457            .send(ToBackgroundChain::StartCallProofRequest {
458                target: target.clone(),
459                config: codec::CallProofRequestConfig {
460                    block_hash: config.block_hash,
461                    method: config.method.into_owned().into(),
462                    parameter_vectored: config
463                        .parameter_vectored
464                        .map(|v| v.as_ref().to_vec()) // TODO: to_vec() overhead
465                        .collect::<Vec<_>>()
466                        .into_iter(),
467                },
468                timeout,
469                result: tx,
470            })
471            .await
472            .unwrap();
473
474        rx.await.unwrap()
475    }
476
477    /// Announces transaction to the peers we are connected to.
478    ///
479    /// Returns a list of peers that we have sent the transaction to. Can return an empty `Vec`
480    /// if we didn't send the transaction to any peer.
481    ///
482    /// Note that the remote doesn't confirm that it has received the transaction. Because
483    /// networking is inherently unreliable, successfully sending a transaction to a peer doesn't
484    /// necessarily mean that the remote has received it. In practice, however, the likelihood of
485    /// a transaction not being received are extremely low. This can be considered as known flaw.
486    pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
487        let (tx, rx) = oneshot::channel();
488
489        self.messages_tx
490            .send(ToBackgroundChain::AnnounceTransaction {
491                transaction: transaction.to_vec(), // TODO: ovheread
492                result: tx,
493            })
494            .await
495            .unwrap();
496
497        rx.await.unwrap()
498    }
499
500    /// See [`service::ChainNetwork::gossip_send_block_announce`].
501    pub async fn send_block_announce(
502        self: Arc<Self>,
503        target: &PeerId,
504        scale_encoded_header: &[u8],
505        is_best: bool,
506    ) -> Result<(), QueueNotificationError> {
507        let (tx, rx) = oneshot::channel();
508
509        self.messages_tx
510            .send(ToBackgroundChain::SendBlockAnnounce {
511                target: target.clone(),                              // TODO: overhead
512                scale_encoded_header: scale_encoded_header.to_vec(), // TODO: overhead
513                is_best,
514                result: tx,
515            })
516            .await
517            .unwrap();
518
519        rx.await.unwrap()
520    }
521
522    /// Marks the given peers as belonging to the given chain, and adds some addresses to these
523    /// peers to the address book.
524    ///
525    /// The `important_nodes` parameter indicates whether these nodes are considered note-worthy
526    /// and should have additional logging.
527    pub async fn discover(
528        &self,
529        list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
530        important_nodes: bool,
531    ) {
532        self.messages_tx
533            .send(ToBackgroundChain::Discover {
534                // TODO: overhead
535                list: list
536                    .into_iter()
537                    .map(|(peer_id, addrs)| {
538                        (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
539                    })
540                    .collect::<Vec<_>>()
541                    .into_iter(),
542                important_nodes,
543            })
544            .await
545            .unwrap();
546    }
547
548    /// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of
549    /// the network.
550    ///
551    /// Nodes that are discovered might disappear over time. In other words, there is no guarantee
552    /// that a node that has been added through [`NetworkServiceChain::discover`] will later be
553    /// returned by [`NetworkServiceChain::discovered_nodes`].
554    pub async fn discovered_nodes(
555        &self,
556    ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
557        let (tx, rx) = oneshot::channel();
558
559        self.messages_tx
560            .send(ToBackgroundChain::DiscoveredNodes { result: tx })
561            .await
562            .unwrap();
563
564        rx.await
565            .unwrap()
566            .into_iter()
567            .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
568    }
569
570    /// Returns an iterator to the list of [`PeerId`]s that we have an established connection
571    /// with.
572    pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
573        let (tx, rx) = oneshot::channel();
574        self.messages_tx
575            .send(ToBackgroundChain::PeersList { result: tx })
576            .await
577            .unwrap();
578        rx.await.unwrap().into_iter()
579    }
580}
581
/// Event that can happen on the network service.
///
/// Delivered through the receivers returned by [`NetworkServiceChain::subscribe`].
#[derive(Debug, Clone)]
pub enum Event {
    /// A gossip link with the given peer is now open.
    Connected {
        /// Identity of the peer.
        peer_id: PeerId,
        /// Role reported by the remote.
        role: Role,
        /// Best block number reported by the remote.
        best_block_number: u64,
        /// Best block hash reported by the remote.
        best_block_hash: [u8; 32],
    },
    /// The gossip link with the given peer has been closed.
    Disconnected {
        peer_id: PeerId,
    },
    /// Received a block announce from a peer.
    BlockAnnounce {
        /// Peer the announce was received from.
        peer_id: PeerId,
        /// The announce, still in its encoded form.
        announce: service::EncodedBlockAnnounce,
    },
    /// Received a GrandPa neighbor packet from the network.
    GrandpaNeighborPacket {
        peer_id: PeerId,
        /// Height of the finalized block reported by the remote.
        finalized_block_height: u64,
    },
    /// Received a GrandPa commit message from the network.
    GrandpaCommitMessage {
        peer_id: PeerId,
        message: service::EncodedGrandpaCommitMessage,
    },
}
608
/// Error returned by [`NetworkServiceChain::blocks_request`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum BlocksRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Error during the request.
    ///
    /// See [`service::BlocksRequestError`].
    #[display("{_0}")]
    Request(service::BlocksRequestError),
}
618
/// Error returned by [`NetworkServiceChain::grandpa_warp_sync_request`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum WarpSyncRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Error during the request.
    ///
    /// See [`service::GrandpaWarpSyncRequestError`].
    #[display("{_0}")]
    Request(service::GrandpaWarpSyncRequestError),
}
628
/// Error returned by [`NetworkServiceChain::storage_proof_request`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum StorageProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Storage proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// See [`service::StorageProofRequestError`].
    #[display("{_0}")]
    Request(service::StorageProofRequestError),
}
640
/// Error returned by [`NetworkServiceChain::call_proof_request`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum CallProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Call proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// See [`service::CallProofRequestError`].
    #[display("{_0}")]
    Request(service::CallProofRequestError),
}
652
653impl CallProofRequestError {
654    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
655    /// issue.
656    pub fn is_network_problem(&self) -> bool {
657        match self {
658            CallProofRequestError::Request(err) => err.is_network_problem(),
659            CallProofRequestError::RequestTooLarge => false,
660            CallProofRequestError::NoConnection => true,
661        }
662    }
663}
664
/// Message sent from a [`NetworkService`] handle to the background task.
enum ToBackground<TPlat: PlatformRef> {
    /// See [`NetworkService::add_chain`].
    AddChain {
        /// Channel the background task will receive the new chain's messages on.
        messages_rx: async_channel::Receiver<ToBackgroundChain>,
        /// Configuration of the chain being added.
        config: service::ChainConfig<Chain<TPlat>>,
    },
}
671
/// Message sent from a [`NetworkServiceChain`] handle to the background task, concerning one
/// specific chain.
enum ToBackgroundChain {
    /// The chain must be removed from the service.
    ///
    /// NOTE(review): no sender is visible in this part of the file; presumably emitted when the
    /// last [`NetworkServiceChain`] handle is destroyed — confirm against the rest of the file.
    RemoveChain,
    /// See [`NetworkServiceChain::subscribe`].
    Subscribe {
        /// Channel on which the background task will deliver the chain's [`Event`]s.
        sender: async_channel::Sender<Event>,
    },
    /// See [`NetworkServiceChain::ban_and_disconnect`].
    DisconnectAndBan {
        peer_id: PeerId,
        severity: BanSeverity,
        reason: &'static str,
    },
    /// See [`NetworkServiceChain::blocks_request`].
    // TODO: serialize the request before sending over channel
    StartBlocksRequest {
        target: PeerId, // TODO: takes by value because of future longevity issue
        config: codec::BlocksRequestConfig,
        timeout: Duration,
        /// Channel on which the outcome of the request is sent back.
        result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
    },
    /// See [`NetworkServiceChain::grandpa_warp_sync_request`].
    // TODO: serialize the request before sending over channel
    StartWarpSyncRequest {
        target: PeerId,
        begin_hash: [u8; 32],
        timeout: Duration,
        /// Channel on which the outcome of the request is sent back.
        result:
            oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
    },
    /// See [`NetworkServiceChain::storage_proof_request`].
    // TODO: serialize the request before sending over channel
    StartStorageProofRequest {
        target: PeerId,
        config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
        timeout: Duration,
        /// Channel on which the outcome of the request is sent back.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
    },
    /// See [`NetworkServiceChain::call_proof_request`].
    // TODO: serialize the request before sending over channel
    StartCallProofRequest {
        target: PeerId, // TODO: takes by value because of futures longevity issue
        config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
        timeout: Duration,
        /// Channel on which the outcome of the request is sent back.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
    },
    /// See [`NetworkServiceChain::set_local_best_block`].
    SetLocalBestBlock {
        best_hash: [u8; 32],
        best_number: u64,
    },
    /// See [`NetworkServiceChain::set_local_grandpa_state`].
    SetLocalGrandpaState {
        grandpa_state: service::GrandpaState,
    },
    /// See [`NetworkServiceChain::announce_transaction`].
    AnnounceTransaction {
        /// SCALE-encoded transaction, as passed by the caller.
        transaction: Vec<u8>,
        /// Channel on which the list of peers the transaction was sent to is reported back.
        result: oneshot::Sender<Vec<PeerId>>,
    },
    /// See [`NetworkServiceChain::send_block_announce`].
    SendBlockAnnounce {
        target: PeerId,
        scale_encoded_header: Vec<u8>,
        is_best: bool,
        /// Channel on which the outcome of the send is reported back.
        result: oneshot::Sender<Result<(), QueueNotificationError>>,
    },
    /// See [`NetworkServiceChain::discover`].
    Discover {
        list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
        important_nodes: bool,
    },
    /// See [`NetworkServiceChain::discovered_nodes`].
    DiscoveredNodes {
        /// Channel on which the list of known nodes and their addresses is reported back.
        result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// See [`NetworkServiceChain::peers_list`].
    PeersList {
        /// Channel on which the list of connected peers is reported back.
        result: oneshot::Sender<Vec<PeerId>>,
    },
}
739
740struct BackgroundTask<TPlat: PlatformRef> {
741    /// See [`Config::platform`].
742    platform: TPlat,
743
744    /// Random number generator.
745    randomness: rand_chacha::ChaCha20Rng,
746
747    /// Value provided through [`Config::identify_agent_version`].
748    identify_agent_version: String,
749
750    /// Channel to send messages to the background task.
751    tasks_messages_tx:
752        async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
753
754    /// Channel to receive messages destined to the background task.
755    tasks_messages_rx: Pin<
756        Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
757    >,
758
759    /// Data structure holding the entire state of the networking.
760    network: service::ChainNetwork<
761        Chain<TPlat>,
762        async_channel::Sender<service::CoordinatorToConnection>,
763        TPlat::Instant,
764    >,
765
766    /// All known peers and their addresses.
767    peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
768
769    /// See [`Config::connections_open_pool_size`].
770    connections_open_pool_size: u32,
771
772    /// See [`Config::connections_open_pool_restore_delay`].
773    connections_open_pool_restore_delay: Duration,
774
775    /// Every time a connection is opened, the value in this field is increased by one. After
776    /// [`BackgroundTask::next_recent_connection_restore`] has yielded, the value is reduced by
777    /// one.
778    num_recent_connection_opening: u32,
779
780    /// Delay after which [`BackgroundTask::num_recent_connection_opening`] is increased by one.
781    next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
782
783    /// List of all open gossip links.
784    // TODO: using this data structure unfortunately means that PeerIds are cloned a lot, maybe some user data in ChainNetwork is better? not sure
785    open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
786
787    /// List of nodes that are considered as important for logging purposes.
788    // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
789    important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
790
791    /// Event about to be sent on the senders of [`BackgroundTask::event_senders`].
792    event_pending_send: Option<(ChainId, Event)>,
793
794    /// Sending events through the public API.
795    ///
796    /// Contains either senders, or a `Future` that is currently sending an event and will yield
797    /// the senders back once it is finished.
798    // TODO: sort by ChainId instead of using a Vec?
799    event_senders: either::Either<
800        Vec<(ChainId, async_channel::Sender<Event>)>,
801        Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
802    >,
803
804    /// Whenever [`NetworkServiceChain::subscribe`] is called, the new sender is added to this list.
805    /// Once [`BackgroundTask::event_senders`] is ready, we properly initialize these senders.
806    pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
807
808    main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
809
810    messages_rx:
811        stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
812
813    blocks_requests: HashMap<
814        service::SubstreamId,
815        oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
816        fnv::FnvBuildHasher,
817    >,
818
819    grandpa_warp_sync_requests: HashMap<
820        service::SubstreamId,
821        oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
822        fnv::FnvBuildHasher,
823    >,
824
825    storage_proof_requests: HashMap<
826        service::SubstreamId,
827        oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
828        fnv::FnvBuildHasher,
829    >,
830
831    call_proof_requests: HashMap<
832        service::SubstreamId,
833        oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
834        fnv::FnvBuildHasher,
835    >,
836
837    /// All chains, indexed by the value of [`Chain::next_discovery_when`].
838    chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
839}
840
/// State associated with a single chain. Stored as the per-chain user data inside
/// [`BackgroundTask::network`] and accessed through `task.network[chain_id]`.
struct Chain<TPlat: PlatformRef> {
    /// Human-readable name of the chain. Only used in log messages.
    log_name: String,

    // TODO: this field is a hack due to the fact that `add_chain` can't be `async`; should eventually be fixed after a lib.rs refactor
    /// Number of API handles that reference this chain. Incremented when adding a chain that
    /// already exists, and decremented when a `RemoveChain` message is received. The chain is
    /// actually destroyed only once this reaches zero.
    num_references: NonZero<usize>,

    /// See [`ConfigChain::block_number_bytes`].
    // TODO: redundant with ChainNetwork? since we might not need to know this in the future i'm reluctant to add a getter to ChainNetwork
    block_number_bytes: usize,

    /// See [`ConfigChain::num_out_slots`].
    num_out_slots: usize,

    /// When the next discovery should be started for this chain.
    next_discovery_when: TPlat::Instant,

    /// After [`Chain::next_discovery_when`] is reached, the following discovery happens after
    /// the given duration.
    next_discovery_period: Duration,
}
861
/// State of a single open gossip link. Values of [`BackgroundTask::open_gossip_links`].
///
/// This state is replayed to new subscribers (as [`Event::Connected`] and, if the finalized
/// block height is known, [`Event::GrandpaNeighborPacket`]) so that they catch up with the
/// current state of the link.
#[derive(Clone)]
struct OpenGossipLinkState {
    /// Role of the remote. Reported to new subscribers through [`Event::Connected`].
    role: Role,
    /// Best block number of the remote. Reported to new subscribers through
    /// [`Event::Connected`].
    best_block_number: u64,
    /// Best block hash of the remote. Reported to new subscribers through
    /// [`Event::Connected`].
    best_block_hash: [u8; 32],
    /// Latest finalized block height of the remote, reported to new subscribers through
    /// [`Event::GrandpaNeighborPacket`]. `None` if unknown.
    finalized_block_height: Option<u64>,
}
870
871async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
872    loop {
873        // Yield at every loop in order to provide better tasks granularity.
874        futures_lite::future::yield_now().await;
875
876        enum WakeUpReason<TPlat: PlatformRef> {
877            ForegroundClosed,
878            Message(ToBackground<TPlat>),
879            MessageForChain(ChainId, ToBackgroundChain),
880            NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
881            CanAssignSlot(PeerId, ChainId),
882            NextRecentConnectionRestore,
883            CanStartConnect(PeerId),
884            CanOpenGossip(PeerId, ChainId),
885            MessageFromConnection {
886                connection_id: service::ConnectionId,
887                message: service::ConnectionToCoordinator,
888            },
889            MessageToConnection {
890                connection_id: service::ConnectionId,
891                message: service::CoordinatorToConnection,
892            },
893            EventSendersReady,
894            StartDiscovery(ChainId),
895        }
896
897        let wake_up_reason = {
898            let message_received = async {
899                task.main_messages_rx
900                    .next()
901                    .await
902                    .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
903            };
904            let message_for_chain_received = async {
905                // Note that when the last entry of `messages_rx` yields `None`, `messages_rx`
906                // itself will yield `None`. For this reason, we can't use
907                // `task.messages_rx.is_empty()` to determine whether `messages_rx` will
908                // yield `None`.
909                let Some((chain_id, message)) = task.messages_rx.next().await else {
910                    future::pending().await
911                };
912                WakeUpReason::MessageForChain(chain_id, message)
913            };
914            let message_from_task_received = async {
915                let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
916                WakeUpReason::MessageFromConnection {
917                    connection_id,
918                    message,
919                }
920            };
921            let service_event = async {
922                if let Some(event) = (task.event_pending_send.is_none()
923                    && task.pending_new_subscriptions.is_empty())
924                .then(|| task.network.next_event())
925                .flatten()
926                {
927                    WakeUpReason::NetworkEvent(event)
928                } else if let Some(start_connect) = {
929                    let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
930                        .then(|| {
931                            task.network
932                                .unconnected_desired()
933                                .choose(&mut task.randomness)
934                                .cloned()
935                        })
936                        .flatten();
937                    x
938                } {
939                    WakeUpReason::CanStartConnect(start_connect)
940                } else if let Some((peer_id, chain_id)) = {
941                    let x = task
942                        .network
943                        .connected_unopened_gossip_desired()
944                        .choose(&mut task.randomness)
945                        .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
946                    x
947                } {
948                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
949                } else if let Some((connection_id, message)) =
950                    task.network.pull_message_to_connection()
951                {
952                    WakeUpReason::MessageToConnection {
953                        connection_id,
954                        message,
955                    }
956                } else {
957                    'search: loop {
958                        let mut earlier_unban = None;
959
960                        for chain_id in task.network.chains().collect::<Vec<_>>() {
961                            if task.network.gossip_desired_num(
962                                chain_id,
963                                service::GossipKind::ConsensusTransactions,
964                            ) >= task.network[chain_id].num_out_slots
965                            {
966                                continue;
967                            }
968
969                            match task
970                                .peering_strategy
971                                .pick_assignable_peer(&chain_id, &task.platform.now())
972                            {
973                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
974                                    break 'search WakeUpReason::CanAssignSlot(
975                                        peer_id.clone(),
976                                        chain_id,
977                                    );
978                                }
979                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
980                                    next_unban,
981                                } => {
982                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
983                                        earlier_unban = Some(next_unban.clone());
984                                    }
985                                }
986                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
987                            }
988                        }
989
990                        if let Some(earlier_unban) = earlier_unban {
991                            task.platform.sleep_until(earlier_unban).await;
992                        } else {
993                            future::pending::<()>().await;
994                        }
995                    }
996                }
997            };
998            let next_recent_connection_restore = async {
999                if task.num_recent_connection_opening != 0
1000                    && task.next_recent_connection_restore.is_none()
1001                {
1002                    task.next_recent_connection_restore = Some(Box::pin(
1003                        task.platform
1004                            .sleep(task.connections_open_pool_restore_delay),
1005                    ));
1006                }
1007                if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1008                    delay.await;
1009                    task.next_recent_connection_restore = None;
1010                    WakeUpReason::NextRecentConnectionRestore
1011                } else {
1012                    future::pending().await
1013                }
1014            };
1015            let finished_sending_event = async {
1016                if let either::Right(event_sending_future) = &mut task.event_senders {
1017                    let event_senders = event_sending_future.await;
1018                    task.event_senders = either::Left(event_senders);
1019                    WakeUpReason::EventSendersReady
1020                } else if task.event_pending_send.is_some()
1021                    || !task.pending_new_subscriptions.is_empty()
1022                {
1023                    WakeUpReason::EventSendersReady
1024                } else {
1025                    future::pending().await
1026                }
1027            };
1028            let start_discovery = async {
1029                let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1030                    future::pending().await
1031                };
1032                next_discovery.get_mut().await;
1033                let ((_, chain_id), _) = next_discovery.remove_entry();
1034                WakeUpReason::StartDiscovery(chain_id)
1035            };
1036
1037            message_for_chain_received
1038                .or(message_received)
1039                .or(message_from_task_received)
1040                .or(service_event)
1041                .or(next_recent_connection_restore)
1042                .or(finished_sending_event)
1043                .or(start_discovery)
1044                .await
1045        };
1046
1047        match wake_up_reason {
1048            WakeUpReason::ForegroundClosed => {
1049                // End the task.
1050                return;
1051            }
1052            WakeUpReason::Message(ToBackground::AddChain {
1053                messages_rx,
1054                config,
1055            }) => {
1056                // TODO: this is not a completely clean way of handling duplicate chains, because the existing chain might have a different best block and role and all ; also, multiple sync services will call set_best_block and set_finalized_block
1057                let chain_id = match task.network.add_chain(config) {
1058                    Ok(id) => id,
1059                    Err(service::AddChainError::Duplicate { existing_identical }) => {
1060                        task.network[existing_identical].num_references = task.network
1061                            [existing_identical]
1062                            .num_references
1063                            .checked_add(1)
1064                            .unwrap();
1065                        existing_identical
1066                    }
1067                };
1068
1069                task.chains_by_next_discovery.insert(
1070                    (task.network[chain_id].next_discovery_when.clone(), chain_id),
1071                    Box::pin(
1072                        task.platform
1073                            .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1074                    ),
1075                );
1076
1077                task.messages_rx
1078                    .push(Box::pin(
1079                        messages_rx
1080                            .map(move |msg| (chain_id, msg))
1081                            .chain(stream::once(future::ready((
1082                                chain_id,
1083                                ToBackgroundChain::RemoveChain,
1084                            )))),
1085                    ) as Pin<Box<_>>);
1086
1087                log!(
1088                    &task.platform,
1089                    Debug,
1090                    "network",
1091                    "chain-added",
1092                    id = task.network[chain_id].log_name
1093                );
1094            }
1095            WakeUpReason::EventSendersReady => {
1096                // Dispatch the pending event, if any, to the various senders.
1097
1098                // We made sure that the senders were ready before generating an event.
1099                let either::Left(event_senders) = &mut task.event_senders else {
1100                    unreachable!()
1101                };
1102
1103                if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1104                    task.event_pending_send.take()
1105                {
1106                    let mut event_senders = mem::take(event_senders);
1107                    task.event_senders = either::Right(Box::pin(async move {
1108                        // Elements in `event_senders` are removed one by one and inserted
1109                        // back if the channel is still open.
1110                        for index in (0..event_senders.len()).rev() {
1111                            let (event_sender_chain_id, event_sender) =
1112                                event_senders.swap_remove(index);
1113                            if event_sender_chain_id == event_to_dispatch_chain_id {
1114                                if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1115                                    continue;
1116                                }
1117                            }
1118                            event_senders.push((event_sender_chain_id, event_sender));
1119                        }
1120                        event_senders
1121                    }));
1122                } else if !task.pending_new_subscriptions.is_empty() {
1123                    let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1124                    let mut event_senders = mem::take(event_senders);
1125                    // TODO: cloning :-/
1126                    let open_gossip_links = task.open_gossip_links.clone();
1127                    task.event_senders = either::Right(Box::pin(async move {
1128                        for (chain_id, new_subscription) in pending_new_subscriptions {
1129                            for ((link_chain_id, peer_id), state) in &open_gossip_links {
1130                                // TODO: optimize? this is O(n) by chain
1131                                if *link_chain_id != chain_id {
1132                                    continue;
1133                                }
1134
1135                                let _ = new_subscription
1136                                    .send(Event::Connected {
1137                                        peer_id: peer_id.clone(),
1138                                        role: state.role,
1139                                        best_block_number: state.best_block_number,
1140                                        best_block_hash: state.best_block_hash,
1141                                    })
1142                                    .await;
1143
1144                                if let Some(finalized_block_height) = state.finalized_block_height {
1145                                    let _ = new_subscription
1146                                        .send(Event::GrandpaNeighborPacket {
1147                                            peer_id: peer_id.clone(),
1148                                            finalized_block_height,
1149                                        })
1150                                        .await;
1151                                }
1152                            }
1153
1154                            event_senders.push((chain_id, new_subscription));
1155                        }
1156
1157                        event_senders
1158                    }));
1159                }
1160            }
1161            WakeUpReason::MessageFromConnection {
1162                connection_id,
1163                message,
1164            } => {
1165                task.network
1166                    .inject_connection_message(connection_id, message);
1167            }
1168            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1169                if let Some(new_ref) =
1170                    NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1171                {
1172                    task.network[chain_id].num_references = new_ref;
1173                    continue;
1174                }
1175
1176                for peer_id in task
1177                    .network
1178                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1179                    .cloned()
1180                    .collect::<Vec<_>>()
1181                {
1182                    task.network
1183                        .gossip_close(
1184                            chain_id,
1185                            &peer_id,
1186                            service::GossipKind::ConsensusTransactions,
1187                        )
1188                        .unwrap();
1189
1190                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1191                    debug_assert!(_was_in.is_some());
1192                }
1193
1194                let _was_in = task
1195                    .chains_by_next_discovery
1196                    .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1197                debug_assert!(_was_in.is_some());
1198
1199                log!(
1200                    &task.platform,
1201                    Debug,
1202                    "network",
1203                    "chain-removed",
1204                    id = task.network[chain_id].log_name
1205                );
1206                task.network.remove_chain(chain_id).unwrap();
1207                task.peering_strategy.remove_chain_peers(&chain_id);
1208            }
1209            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1210                task.pending_new_subscriptions.push((chain_id, sender));
1211            }
1212            WakeUpReason::MessageForChain(
1213                chain_id,
1214                ToBackgroundChain::DisconnectAndBan {
1215                    peer_id,
1216                    severity,
1217                    reason,
1218                },
1219            ) => {
1220                let ban_duration = Duration::from_secs(match severity {
1221                    BanSeverity::Low => 10,
1222                    BanSeverity::High => 40,
1223                });
1224
1225                let had_slot = matches!(
1226                    task.peering_strategy.unassign_slot_and_ban(
1227                        &chain_id,
1228                        &peer_id,
1229                        task.platform.now() + ban_duration,
1230                    ),
1231                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1232                );
1233
1234                if had_slot {
1235                    log!(
1236                        &task.platform,
1237                        Debug,
1238                        "network",
1239                        "slot-unassigned",
1240                        chain = &task.network[chain_id].log_name,
1241                        peer_id,
1242                        ?ban_duration,
1243                        reason = "user-ban",
1244                        user_reason = reason
1245                    );
1246                    task.network.gossip_remove_desired(
1247                        chain_id,
1248                        &peer_id,
1249                        service::GossipKind::ConsensusTransactions,
1250                    );
1251                }
1252
1253                if task.network.gossip_is_connected(
1254                    chain_id,
1255                    &peer_id,
1256                    service::GossipKind::ConsensusTransactions,
1257                ) {
1258                    let _closed_result = task.network.gossip_close(
1259                        chain_id,
1260                        &peer_id,
1261                        service::GossipKind::ConsensusTransactions,
1262                    );
1263                    debug_assert!(_closed_result.is_ok());
1264
1265                    log!(
1266                        &task.platform,
1267                        Debug,
1268                        "network",
1269                        "gossip-closed",
1270                        chain = &task.network[chain_id].log_name,
1271                        peer_id,
1272                    );
1273
1274                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1275                    debug_assert!(_was_in.is_some());
1276
1277                    debug_assert!(task.event_pending_send.is_none());
1278                    task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1279                }
1280            }
1281            WakeUpReason::MessageForChain(
1282                chain_id,
1283                ToBackgroundChain::StartBlocksRequest {
1284                    target,
1285                    config,
1286                    timeout,
1287                    result,
1288                },
1289            ) => {
1290                match &config.start {
1291                    codec::BlocksRequestConfigStart::Hash(hash) => {
1292                        log!(
1293                            &task.platform,
1294                            Debug,
1295                            "network",
1296                            "blocks-request-started",
1297                            chain = task.network[chain_id].log_name, target,
1298                            start = HashDisplay(hash),
1299                            num = config.desired_count.get(),
1300                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1301                            header = ?config.fields.header, body = ?config.fields.body,
1302                            justifications = ?config.fields.justifications
1303                        );
1304                    }
1305                    codec::BlocksRequestConfigStart::Number(number) => {
1306                        log!(
1307                            &task.platform,
1308                            Debug,
1309                            "network",
1310                            "blocks-request-started",
1311                            chain = task.network[chain_id].log_name, target, start = number,
1312                            num = config.desired_count.get(),
1313                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1314                            header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1315                        );
1316                    }
1317                }
1318
1319                match task
1320                    .network
1321                    .start_blocks_request(&target, chain_id, config.clone(), timeout)
1322                {
1323                    Ok(substream_id) => {
1324                        task.blocks_requests.insert(substream_id, result);
1325                    }
1326                    Err(service::StartRequestError::NoConnection) => {
1327                        log!(
1328                            &task.platform,
1329                            Debug,
1330                            "network",
1331                            "blocks-request-error",
1332                            chain = task.network[chain_id].log_name,
1333                            target,
1334                            error = "NoConnection"
1335                        );
1336                        let _ = result.send(Err(BlocksRequestError::NoConnection));
1337                    }
1338                }
1339            }
1340            WakeUpReason::MessageForChain(
1341                chain_id,
1342                ToBackgroundChain::StartWarpSyncRequest {
1343                    target,
1344                    begin_hash,
1345                    timeout,
1346                    result,
1347                },
1348            ) => {
1349                log!(
1350                    &task.platform,
1351                    Debug,
1352                    "network",
1353                    "warp-sync-request-started",
1354                    chain = task.network[chain_id].log_name,
1355                    target,
1356                    start = HashDisplay(&begin_hash)
1357                );
1358
1359                match task
1360                    .network
1361                    .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1362                {
1363                    Ok(substream_id) => {
1364                        task.grandpa_warp_sync_requests.insert(substream_id, result);
1365                    }
1366                    Err(service::StartRequestError::NoConnection) => {
1367                        log!(
1368                            &task.platform,
1369                            Debug,
1370                            "network",
1371                            "warp-sync-request-error",
1372                            chain = task.network[chain_id].log_name,
1373                            target,
1374                            error = "NoConnection"
1375                        );
1376                        let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1377                    }
1378                }
1379            }
1380            WakeUpReason::MessageForChain(
1381                chain_id,
1382                ToBackgroundChain::StartStorageProofRequest {
1383                    target,
1384                    config,
1385                    timeout,
1386                    result,
1387                },
1388            ) => {
1389                log!(
1390                    &task.platform,
1391                    Debug,
1392                    "network",
1393                    "storage-proof-request-started",
1394                    chain = task.network[chain_id].log_name,
1395                    target,
1396                    block_hash = HashDisplay(&config.block_hash)
1397                );
1398
1399                match task.network.start_storage_proof_request(
1400                    &target,
1401                    chain_id,
1402                    config.clone(),
1403                    timeout,
1404                ) {
1405                    Ok(substream_id) => {
1406                        task.storage_proof_requests.insert(substream_id, result);
1407                    }
1408                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1409                        log!(
1410                            &task.platform,
1411                            Debug,
1412                            "network",
1413                            "storage-proof-request-error",
1414                            chain = task.network[chain_id].log_name,
1415                            target,
1416                            error = "NoConnection"
1417                        );
1418                        let _ = result.send(Err(StorageProofRequestError::NoConnection));
1419                    }
1420                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1421                        log!(
1422                            &task.platform,
1423                            Debug,
1424                            "network",
1425                            "storage-proof-request-error",
1426                            chain = task.network[chain_id].log_name,
1427                            target,
1428                            error = "RequestTooLarge"
1429                        );
1430                        let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1431                    }
1432                };
1433            }
1434            WakeUpReason::MessageForChain(
1435                chain_id,
1436                ToBackgroundChain::StartCallProofRequest {
1437                    target,
1438                    config,
1439                    timeout,
1440                    result,
1441                },
1442            ) => {
1443                log!(
1444                    &task.platform,
1445                    Debug,
1446                    "network",
1447                    "call-proof-request-started",
1448                    chain = task.network[chain_id].log_name,
1449                    target,
1450                    block_hash = HashDisplay(&config.block_hash),
1451                    function = config.method
1452                );
1453                // TODO: log parameter
1454
1455                match task.network.start_call_proof_request(
1456                    &target,
1457                    chain_id,
1458                    config.clone(),
1459                    timeout,
1460                ) {
1461                    Ok(substream_id) => {
1462                        task.call_proof_requests.insert(substream_id, result);
1463                    }
1464                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1465                        log!(
1466                            &task.platform,
1467                            Debug,
1468                            "network",
1469                            "call-proof-request-error",
1470                            chain = task.network[chain_id].log_name,
1471                            target,
1472                            error = "NoConnection"
1473                        );
1474                        let _ = result.send(Err(CallProofRequestError::NoConnection));
1475                    }
1476                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1477                        log!(
1478                            &task.platform,
1479                            Debug,
1480                            "network",
1481                            "call-proof-request-error",
1482                            chain = task.network[chain_id].log_name,
1483                            target,
1484                            error = "RequestTooLarge"
1485                        );
1486                        let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1487                    }
1488                };
1489            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::SetLocalBestBlock {
                    best_hash,
                    best_number,
                },
            ) => {
                // The foreground has updated the local best block; forward it
                // to the network state machine for this chain.
                task.network
                    .set_chain_local_best_block(chain_id, best_hash, best_number);
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
            ) => {
                // The foreground has updated the local Grandpa state; log it,
                // then broadcast it over this chain's gossip links and record
                // it as the new local state.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "local-grandpa-state-announced",
                    chain = task.network[chain_id].log_name,
                    set_id = grandpa_state.set_id,
                    commit_finalized_height = grandpa_state.commit_finalized_height,
                );

                // TODO: log the list of peers we sent the packet to

                task.network
                    .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::AnnounceTransaction {
                    transaction,
                    result,
                },
            ) => {
                // TODO: keep track of which peer knows about which transaction, and don't send it again

                // Gossip the transaction to every peer that currently has an
                // open transactions gossip link on this chain.
                let peers_to_send = task
                    .network
                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
                    .cloned()
                    .collect::<Vec<_>>();

                let mut peers_sent = Vec::with_capacity(peers_to_send.len());
                let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
                for peer in &peers_to_send {
                    match task
                        .network
                        .gossip_send_transaction(peer, chain_id, &transaction)
                    {
                        Ok(()) => peers_sent.push(peer.to_base58()),
                        Err(QueueNotificationError::QueueFull) => {
                            peers_queue_full.push(peer.to_base58())
                        }
                        // `peers_to_send` was built from `gossip_connected_peers`
                        // just above, so a connection necessarily exists.
                        Err(QueueNotificationError::NoConnection) => unreachable!(),
                    }
                }

                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "transaction-announced",
                    chain = task.network[chain_id].log_name,
                    transaction =
                        hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
                    size = transaction.len(),
                    peers_sent = peers_sent.join(", "),
                    peers_queue_full = peers_queue_full.join(", "),
                );

                // Note: the list reported to the foreground includes peers
                // whose send queue was full, not only those actually sent to.
                let _ = result.send(peers_to_send);
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::SendBlockAnnounce {
                    target,
                    scale_encoded_header,
                    is_best,
                    result,
                },
            ) => {
                // Send a block announce to one specific peer on behalf of the
                // foreground, forwarding the outcome back through `result`.
                // TODO: log who the announce was sent to
                let _ = result.send(task.network.gossip_send_block_announce(
                    &target,
                    chain_id,
                    &scale_encoded_header,
                    is_best,
                ));
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::Discover {
                    list,
                    important_nodes,
                },
            ) => {
                // Inject externally-provided peers and their addresses into the
                // peering strategy of the given chain.
                for (peer_id, addrs) in list {
                    if important_nodes {
                        // Important nodes are additionally tracked in a
                        // dedicated set.
                        task.important_nodes.insert(peer_id.clone());
                    }

                    // Note that we must call this function before `insert_address`, as documented
                    // in `basic_peering_strategy`.
                    task.peering_strategy
                        .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant

                    for addr in addrs {
                        let _ =
                            task.peering_strategy
                                .insert_address(&peer_id, addr.into_bytes(), 10);
                        // TODO: constant
                    }
                }
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::DiscoveredNodes { result },
            ) => {
                // Report back every known peer of this chain together with its
                // known addresses.
                // TODO: consider returning Vec<u8>s for the addresses?
                let _ = result.send(
                    task.peering_strategy
                        .chain_peers_unordered(&chain_id)
                        .map(|peer_id| {
                            let addrs = task
                                .peering_strategy
                                .peer_addresses(peer_id)
                                // NOTE(review): the `unwrap` assumes every
                                // stored address is a valid multiaddr; confirm
                                // all insertion paths validate addresses.
                                .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
                                .collect::<Vec<_>>();
                            (peer_id.clone(), addrs)
                        })
                        .collect::<Vec<_>>(),
                );
            }
            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
                // Report back the peers that currently have an open
                // transactions gossip link on this chain.
                let _ = result.send(
                    task.network
                        .gossip_connected_peers(
                            chain_id,
                            service::GossipKind::ConsensusTransactions,
                        )
                        .cloned()
                        .collect(),
                );
            }
            WakeUpReason::StartDiscovery(chain_id) => {
                // Timer fired: start a discovery round for this chain.
                // Re-insert the chain in `chains_by_next_discovery`.
                let chain = &mut task.network[chain_id];
                chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
                // Discovery uses an exponential back-off, capped at 2 minutes.
                chain.next_discovery_period =
                    cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
                // NOTE(review): `next_discovery_when` was computed with the
                // pre-doubling period, while the timer below sleeps for the
                // post-doubling period; confirm this mismatch is intended.
                task.chains_by_next_discovery.insert(
                    (chain.next_discovery_when.clone(), chain_id),
                    Box::pin(
                        task.platform
                            .sleep(task.network[chain_id].next_discovery_period),
                    ),
                );

                // Generate a uniformly-random peer id to serve as the target
                // of the Kademlia FIND_NODE request below.
                let random_peer_id = {
                    let mut pub_key = [0; 32];
                    rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
                    PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
                };

                // TODO: select target closest to the random peer instead
                let target = task
                    .network
                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
                    .next()
                    .cloned();

                if let Some(target) = target {
                    match task.network.start_kademlia_find_node_request(
                        &target,
                        chain_id,
                        &random_peer_id,
                        Duration::from_secs(20),
                    ) {
                        Ok(_) => {}
                        // `target` comes from `gossip_connected_peers`, so a
                        // connection necessarily exists.
                        Err(service::StartRequestError::NoConnection) => unreachable!(),
                    };

                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "discovery-find-node-started",
                        chain = &task.network[chain_id].log_name,
                        request_target = target,
                        requested_peer_id = random_peer_id
                    );
                } else {
                    // No gossip-connected peer to query; skip this round. The
                    // timer re-inserted above will trigger a retry later.
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "discovery-skipped-no-peer",
                        chain = &task.network[chain_id].log_name
                    );
                }
            }
            WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
                peer_id,
                expected_peer_id,
                id,
            }) => {
                let remote_addr =
                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
                // If the peer id negotiated during the handshake differs from
                // the one we dialed, re-attribute the address in the peering
                // strategy from the expected peer to the actual peer.
                if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
                {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "handshake-finished-peer-id-mismatch",
                        remote_addr,
                        expected_peer_id,
                        actual_peer_id = peer_id
                    );

                    let _was_in = task
                        .peering_strategy
                        .decrease_address_connections_and_remove_if_zero(
                            expected_peer_id,
                            remote_addr.as_ref(),
                        );
                    debug_assert!(_was_in.is_ok());
                    let _ = task.peering_strategy.increase_address_connections(
                        &peer_id,
                        remote_addr.into_bytes().to_vec(),
                        10,
                    );
                } else {
                    // Peer id matches (or none was expected); nothing to fix up.
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "handshake-finished",
                        remote_addr,
                        peer_id
                    );
                }
            }
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
                expected_peer_id: Some(_),
                ..
            })
            | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
                // A connection has shut down, either before or after the
                // handshake. Extract the fields common to both events.
                let (address, peer_id, handshake_finished) = match wake_up_reason {
                    WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
                        address,
                        expected_peer_id: Some(peer_id),
                        ..
                    }) => (address, peer_id, false),
                    WakeUpReason::NetworkEvent(service::Event::Disconnected {
                        address,
                        peer_id,
                        ..
                    }) => (address, peer_id, true),
                    // The outer pattern guarantees one of the two variants above.
                    _ => unreachable!(),
                };

                task.peering_strategy
                    .decrease_address_connections(&peer_id, &address)
                    .unwrap();
                let address = Multiaddr::from_bytes(address).unwrap();
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "connection-shutdown",
                    peer_id,
                    address,
                    ?handshake_finished
                );

                // Ban the peer in order to avoid trying over and over again the same address(es).
                // Even if the handshake was finished, it is possible that the peer simply shuts
                // down connections immediately after it has been opened, hence the ban.
                // Due to race conditions and peerid mismatches, it is possible that there is
                // another existing connection or connection attempt with that same peer. However,
                // it is not possible to be sure that we will reach 0 connections or connection
                // attempts, and thus we ban the peer every time.
                let ban_duration = Duration::from_secs(5);
                task.network.gossip_remove_desired_all(
                    &peer_id,
                    service::GossipKind::ConsensusTransactions,
                );
                for (&chain_id, what_happened) in task
                    .peering_strategy
                    .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
                {
                    if matches!(
                        what_happened,
                        basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
                    ) {
                        // NOTE(review): this reason label is also emitted for
                        // post-handshake disconnects (`handshake_finished ==
                        // true`); confirm the label is intended for both cases.
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "slot-unassigned",
                            chain = &task.network[chain_id].log_name,
                            peer_id,
                            ?ban_duration,
                            reason = "pre-handshake-disconnect"
                        );
                    }
                }
            }
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
                expected_peer_id: None,
                ..
            }) => {
                // This path can't be reached as we always set an expected peer id when creating
                // a connection. Nothing to clean up, since no peer was ever
                // associated with this connection.
                debug_assert!(false);
            }
            WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
                id,
                peer_id,
                ping_time,
            }) => {
                // An outgoing ping was answered. Purely informational: the
                // event is only logged, no state is updated.
                let remote_addr =
                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "pong",
                    peer_id,
                    remote_addr,
                    ?ping_time
                );
            }
            WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
                chain_id,
                peer_id,
                announce,
            }) => {
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "block-announce-received",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
                        announce.decode().scale_encoded_header
                    )),
                    is_best = announce.decode().is_best
                );

                // If the remote indicates this is its new best block, update
                // the state tracked for this gossip link.
                let decoded_announce = announce.decode();
                if decoded_announce.is_best {
                    // The `unwrap` assumes the link entry exists; it is
                    // inserted when `GossipConnected` is received.
                    let link = task
                        .open_gossip_links
                        .get_mut(&(chain_id, peer_id.clone()))
                        .unwrap();
                    // Only update the cached best block if the header decodes
                    // successfully; an undecodable header is ignored here.
                    if let Ok(decoded) = header::decode(
                        decoded_announce.scale_encoded_header,
                        task.network[chain_id].block_number_bytes,
                    ) {
                        link.best_block_hash = header::hash_from_scale_encoded_header(
                            decoded_announce.scale_encoded_header,
                        );
                        link.best_block_number = decoded.number;
                    }
                }

                // Forward the announce to the foreground.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send =
                    Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
            }
            WakeUpReason::NetworkEvent(service::Event::GossipConnected {
                peer_id,
                chain_id,
                role,
                best_number,
                best_hash,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-open-success",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    best_number,
                    best_hash = HashDisplay(&best_hash)
                );

                // Start tracking the state of this newly-opened gossip link,
                // seeded with the best block reported during the handshake.
                let _prev_value = task.open_gossip_links.insert(
                    (chain_id, peer_id.clone()),
                    OpenGossipLinkState {
                        best_block_number: best_number,
                        best_block_hash: best_hash,
                        role,
                        finalized_block_height: None,
                    },
                );
                debug_assert!(_prev_value.is_none());

                // Notify the foreground of the new connection.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send = Some((
                    chain_id,
                    Event::Connected {
                        peer_id,
                        role,
                        best_block_number: best_number,
                        best_block_hash: best_hash,
                    },
                ));
            }
            WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
                peer_id,
                chain_id,
                error,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-open-error",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    ?error,
                );
                let ban_duration = Duration::from_secs(15);

                // Note that peer doesn't necessarily have an out slot, as this event might happen
                // as a result of an inbound gossip connection.
                // A genesis mismatch means the peer is on a different chain
                // entirely: remove it from this chain for good instead of
                // banning it temporarily.
                let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
                    matches!(
                        task.peering_strategy
                            .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
                        basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
                    )
                } else {
                    matches!(
                        task.peering_strategy.unassign_slot_and_ban(
                            &chain_id,
                            &peer_id,
                            task.platform.now() + ban_duration,
                        ),
                        basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
                    )
                };

                if had_slot {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "slot-unassigned",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        ?ban_duration,
                        reason = "gossip-open-failed"
                    );
                    task.network.gossip_remove_desired(
                        chain_id,
                        &peer_id,
                        service::GossipKind::ConsensusTransactions,
                    );
                }
            }
            WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
                peer_id,
                chain_id,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-closed",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                );
                let ban_duration = Duration::from_secs(10);

                // Stop tracking the link state; the entry was inserted when
                // the gossip link was opened.
                let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
                debug_assert!(_was_in.is_some());

                // Note that peer doesn't necessarily have an out slot, as this event might happen
                // as a result of an inbound gossip connection.
                if matches!(
                    task.peering_strategy.unassign_slot_and_ban(
                        &chain_id,
                        &peer_id,
                        task.platform.now() + ban_duration,
                    ),
                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
                ) {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "slot-unassigned",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        ?ban_duration,
                        reason = "gossip-closed"
                    );
                    task.network.gossip_remove_desired(
                        chain_id,
                        &peer_id,
                        service::GossipKind::ConsensusTransactions,
                    );
                }

                // Notify the foreground of the disconnection.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::Blocks(response),
            }) => {
                // A blocks request has finished. First log the outcome.
                match &response {
                    Ok(blocks) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "blocks-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            num_blocks = blocks.len(),
                            // Sum of header, body, and justification sizes over
                            // all returned blocks.
                            block_data_total_size =
                                BytesDisplay(blocks.iter().fold(0, |sum, block| {
                                    let block_size = block.header.as_ref().map_or(0, |h| h.len())
                                        + block
                                            .body
                                            .as_ref()
                                            .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
                                        + block
                                            .justifications
                                            .as_ref()
                                            .into_iter()
                                            .flat_map(|l| l.iter())
                                            .fold(0, |s, j| s + j.justification.len());
                                    sum + u64::try_from(block_size).unwrap()
                                }))
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "blocks-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error
                        );
                    }
                }

                // Protocol-level errors (as opposed to transport errors) get an
                // additional, more explicit log line, as they may indicate an
                // incompatibility with the remote.
                match &response {
                    Ok(_) => {}
                    Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
                    Err(err) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            format!(
                                "Error in block request with {}. This might indicate an \
                                incompatibility. Error: {}",
                                peer_id, err
                            )
                        );
                    }
                }

                // Hand the result to the foreground that initiated the request.
                // The entry necessarily exists, as it was inserted when the
                // request was started.
                let _ = task
                    .blocks_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(BlocksRequestError::Request));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::GrandpaWarpSync(response),
            }) => {
                // A Grandpa warp sync request has finished; log the outcome and
                // forward the result to the foreground that initiated it.
                match &response {
                    Ok(response) => {
                        // TODO: print total bytes size
                        let decoded = response.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "warp-sync-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            num_fragments = decoded.fragments.len(),
                            is_finished = ?decoded.is_finished,
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "warp-sync-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error,
                        );
                    }
                }

                // The entry necessarily exists, as it was inserted when the
                // request was started.
                let _ = task
                    .grandpa_warp_sync_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(WarpSyncRequestError::Request));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::StorageProof(response),
            }) => {
                // A storage proof request has finished; log the outcome and
                // forward the result to the foreground that initiated it.
                match &response {
                    Ok(items) => {
                        let decoded = items.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error
                        );
                    }
                }

                // The entry necessarily exists, as it was inserted when the
                // request was started.
                let _ = task
                    .storage_proof_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(StorageProofRequestError::Request));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::CallProof(response),
            }) => {
                // A call proof request has finished; log the outcome and
                // forward the result to the foreground that initiated it.
                match &response {
                    Ok(items) => {
                        let decoded = items.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "call-proof-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "call-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error
                        );
                    }
                }

                // The entry necessarily exists, as it was inserted when the
                // request was started.
                let _ = task
                    .call_proof_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(CallProofRequestError::Request));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                peer_id: requestee_peer_id,
                chain_id,
                response: service::RequestResult::KademliaFindNode(Ok(nodes)),
                ..
            }) => {
                // A Kademlia find-node request has succeeded. `nodes` contains, for
                // each discovered peer, the list of addresses that the requestee
                // (`requestee_peer_id`) knows for it. Insert the usable ones into the
                // address book.
                for (peer_id, mut addrs) in nodes {
                    // Make sure to not insert too many addresses for a single peer.
                    if addrs.len() >= 10 {
                        addrs.truncate(10);
                    }

                    // Keep only the addresses that are well-formed multiaddresses and
                    // whose connection type is supported by the platform. Unusable
                    // addresses are logged and discarded.
                    let mut valid_addrs = Vec::with_capacity(addrs.len());
                    for addr in addrs {
                        match Multiaddr::from_bytes(addr) {
                            Ok(a) => {
                                if platform::address_parse::multiaddr_to_address(&a)
                                    .ok()
                                    .map_or(false, |addr| {
                                        task.platform.supports_connection_type((&addr).into())
                                    })
                                {
                                    valid_addrs.push(a)
                                } else {
                                    // Valid multiaddress, but its transport isn't
                                    // supported by this platform (e.g. a raw TCP
                                    // address when running in a browser).
                                    log!(
                                        &task.platform,
                                        Debug,
                                        "network",
                                        "discovered-address-not-supported",
                                        chain = &task.network[chain_id].log_name,
                                        peer_id,
                                        addr = &a,
                                        obtained_from = requestee_peer_id
                                    );
                                }
                            }
                            Err((error, addr)) => {
                                log!(
                                    &task.platform,
                                    Debug,
                                    "network",
                                    "discovered-address-invalid",
                                    chain = &task.network[chain_id].log_name,
                                    peer_id,
                                    error,
                                    addr = hex::encode(&addr),
                                    obtained_from = requestee_peer_id
                                );
                            }
                        }
                    }

                    if !valid_addrs.is_empty() {
                        // Note that we must call this function before `insert_address`,
                        // as documented in `basic_peering_strategy`.
                        let insert_outcome =
                            task.peering_strategy
                                .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant

                        if let basic_peering_strategy::InsertChainPeerResult::Inserted {
                            peer_removed,
                        } = insert_outcome
                        {
                            // Inserting the new peer might have evicted another one
                            // from the address book; log the eviction if so.
                            if let Some(peer_removed) = peer_removed {
                                log!(
                                    &task.platform,
                                    Debug,
                                    "network",
                                    "peer-purged-from-address-book",
                                    chain = &task.network[chain_id].log_name,
                                    peer_id = peer_removed,
                                );
                            }

                            log!(
                                &task.platform,
                                Debug,
                                "network",
                                "peer-discovered",
                                chain = &task.network[chain_id].log_name,
                                peer_id,
                                addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), // TODO: better formatting?
                                obtained_from = requestee_peer_id
                            );
                        }
                    }

                    // Record the validated addresses. The peer is guaranteed to be
                    // known by the peering strategy at this point (it was inserted
                    // above whenever `valid_addrs` is non-empty), hence the
                    // `debug_assert!`.
                    for addr in valid_addrs {
                        let _insert_result =
                            task.peering_strategy
                                .insert_address(&peer_id, addr.into_bytes(), 10); // TODO: constant
                        debug_assert!(!matches!(
                            _insert_result,
                            basic_peering_strategy::InsertAddressResult::UnknownPeer
                        ));
                    }
                }
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                peer_id,
                chain_id,
                response: service::RequestResult::KademliaFindNode(Err(error)),
                ..
            }) => {
                // A Kademlia find-node discovery request has failed. A debug-level
                // line is always emitted; the `match` below additionally decides
                // whether a more visible message is warranted.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "discovery-find-node-error",
                    chain = &task.network[chain_id].log_name,
                    ?error,
                    find_node_target = peer_id,
                );

                // No additional message is printed if the request fails due to a
                // benign networking error such as an unresponsive peer.
                match error {
                    service::KademliaFindNodeError::RequestFailed(err)
                        if !err.is_protocol_error() => {}

                    service::KademliaFindNodeError::RequestFailed(
                        service::RequestError::Substream(
                            connection::established::RequestError::ProtocolNotAvailable,
                        ),
                    ) => {
                        // The remote doesn't support the Kademlia request protocol at
                        // all, which likely indicates an outdated Substrate version on
                        // the chain. Warn about it explicitly.
                        // TODO: remove this warning in a long time
                        log!(
                            &task.platform,
                            Warn,
                            "network",
                            format!(
                                "Problem during discovery on {}: protocol not available. \
                                This might indicate that the version of Substrate used by \
                                the chain doesn't include \
                                <https://github.com/paritytech/substrate/pull/12545>.",
                                &task.network[chain_id].log_name
                            )
                        );
                    }
                    _ => {
                        // Any other protocol-level error; log it at debug level.
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            format!(
                                "Problem during discovery on {}: {}",
                                &task.network[chain_id].log_name, error
                            )
                        );
                    }
                }
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
                // We never start any other kind of requests, so any other
                // `RequestResult` variant is impossible.
                unreachable!()
            }
            WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
                peer_id,
                chain_id,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                // A remote would like to open a gossip link towards us. The request
                // is accepted or rejected immediately, based on how many gossip links
                // that we didn't ask for (see `opened_gossip_undesired_by_chain`) are
                // already open on that chain.
                // The networking state machine guarantees that `GossipInDesired`
                // can't happen if we are already opening an out slot, which we do
                // immediately.
                // TODO: add debug_assert! ^
                if task
                    .network
                    .opened_gossip_undesired_by_chain(chain_id)
                    .count()
                    < 4
                {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "gossip-in-request",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        outcome = "accepted"
                    );
                    task.network
                        .gossip_open(
                            chain_id,
                            &peer_id,
                            service::GossipKind::ConsensusTransactions,
                        )
                        .unwrap();
                } else {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "gossip-in-request",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        outcome = "rejected",
                    );
                    task.network
                        .gossip_close(
                            chain_id,
                            &peer_id,
                            service::GossipKind::ConsensusTransactions,
                        )
                        .unwrap();
                }
            }
            WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
                // Can't happen as we already instantaneously accept or reject gossip in requests.
                unreachable!()
            }
            WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
                peer_id,
                substream_id,
            }) => {
                // A remote asks us to identify ourselves; answer immediately with
                // the agent version configured at service creation.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "identify-request-received",
                    peer_id,
                );
                task.network
                    .respond_identify(substream_id, &task.identify_agent_version);
            }
            // Block requests from remotes are not served by the light client.
            WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
            WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
                // All incoming requests are immediately answered.
                unreachable!()
            }
            WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
                chain_id,
                peer_id,
                state,
            }) => {
                // A Grandpa neighbor packet carries the remote's view of the
                // finality state of the chain.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "grandpa-neighbor-packet-received",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    round_number = state.round_number,
                    set_id = state.set_id,
                    commit_finalized_height = state.commit_finalized_height,
                );

                // Update the locally-tracked finalized block height of that peer.
                // The gossip link with this peer is expected to be open, hence the
                // `unwrap()`.
                task.open_gossip_links
                    .get_mut(&(chain_id, peer_id.clone()))
                    .unwrap()
                    .finalized_block_height = Some(state.commit_finalized_height);

                // Queue the event for the API users. `event_pending_send` is expected
                // to be empty at this point, as asserted below.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send = Some((
                    chain_id,
                    Event::GrandpaNeighborPacket {
                        peer_id,
                        finalized_block_height: state.commit_finalized_height,
                    },
                ));
            }
            WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
                chain_id,
                peer_id,
                message,
            }) => {
                // A Grandpa commit message was received over gossip; log it and
                // forward it as-is to the API users.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "grandpa-commit-message-received",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    target_block_hash = HashDisplay(message.decode().target_hash),
                );

                // `event_pending_send` is expected to be empty at this point, as
                // asserted below.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send =
                    Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
            }
            WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
                // The remote misbehaved at the protocol level. For now this is only
                // logged; no punitive action is taken.
                // TODO: handle properly?
                log!(
                    &task.platform,
                    Warn,
                    "network",
                    "protocol-error",
                    peer_id,
                    ?error
                );

                // TODO: disconnect peer
            }
            WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
                // A slot on the given chain can be assigned to the given peer.
                // Assign it, then mark the peer as desired for gossip so that a
                // gossip link gets opened towards it.
                task.peering_strategy.assign_slot(&chain_id, &peer_id);

                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "slot-assigned",
                    chain = &task.network[chain_id].log_name,
                    peer_id
                );

                task.network.gossip_insert_desired(
                    chain_id,
                    peer_id,
                    service::GossipKind::ConsensusTransactions,
                );
            }
            WakeUpReason::NextRecentConnectionRestore => {
                // A "recently opened connection" token expires, freeing budget for
                // opening one more connection. Saturating to guard against underflow.
                task.num_recent_connection_opening =
                    task.num_recent_connection_opening.saturating_sub(1);
            }
            WakeUpReason::CanStartConnect(expected_peer_id) => {
                // We should try to open a new outgoing connection towards
                // `expected_peer_id`. Ask the peering strategy for an address; this
                // call also registers a pending connection on that address.
                let Some(multiaddr) = task
                    .peering_strategy
                    .pick_address_and_add_connection(&expected_peer_id)
                else {
                    // There is no address for that peer in the address book.
                    // Give up on the peer: stop desiring gossip with it and ban it
                    // for a short duration, unassigning any slots it held.
                    task.network.gossip_remove_desired_all(
                        &expected_peer_id,
                        service::GossipKind::ConsensusTransactions,
                    );
                    let ban_duration = Duration::from_secs(10);
                    for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
                        &expected_peer_id,
                        task.platform.now() + ban_duration,
                    ) {
                        if matches!(
                            what_happened,
                            basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
                        ) {
                            log!(
                                &task.platform,
                                Debug,
                                "network",
                                "slot-unassigned",
                                chain = &task.network[chain_id].log_name,
                                peer_id = expected_peer_id,
                                ?ban_duration,
                                reason = "no-address"
                            );
                        }
                    }
                    continue;
                };

                // Parse the raw bytes picked from the address book into a multiaddr.
                let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
                    Ok(a) => a,
                    Err((multiaddr::FromBytesError, addr)) => {
                        // Address is in an invalid format. Undo the connection count
                        // bump performed by `pick_address_and_add_connection` and
                        // drop the address if it is no longer referenced.
                        let _was_in = task
                            .peering_strategy
                            .decrease_address_connections_and_remove_if_zero(
                                &expected_peer_id,
                                &addr,
                            );
                        debug_assert!(_was_in.is_ok());
                        continue;
                    }
                };

                // Convert the multiaddr into a platform-level address, discarding it
                // if its connection type isn't supported by the platform.
                let address = address_parse::multiaddr_to_address(&multiaddr)
                    .ok()
                    .filter(|addr| {
                        task.platform.supports_connection_type(match &addr {
                            address_parse::AddressOrMultiStreamAddress::Address(addr) => {
                                From::from(addr)
                            }
                            address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
                                addr,
                            ) => From::from(addr),
                        })
                    });

                let Some(address) = address else {
                    // Address is in an invalid format or isn't supported by the platform.
                    // Same bookkeeping as the parse-failure path above.
                    let _was_in = task
                        .peering_strategy
                        .decrease_address_connections_and_remove_if_zero(
                            &expected_peer_id,
                            multiaddr.as_ref(),
                        );
                    debug_assert!(_was_in.is_ok());
                    continue;
                };

                // Each connection has its own individual Noise key.
                let noise_key = {
                    let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
                    task.platform.fill_random_bytes(&mut *noise_static_key);
                    let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
                    task.platform.fill_random_bytes(&mut *libp2p_key);
                    connection::NoiseKey::new(&libp2p_key, &noise_static_key)
                };

                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "connection-started",
                    expected_peer_id,
                    remote_addr = multiaddr,
                    local_peer_id =
                        peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
                            .into_peer_id(),
                );

                // Count this connection against the "recently opened" budget; the
                // counter is decremented again on `NextRecentConnectionRestore`.
                task.num_recent_connection_opening += 1;

                // Small bounded channel through which the coordinator sends messages
                // to the connection task.
                let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
                    async_channel::bounded(8);
                let task_name = format!("connection-{}", multiaddr);

                match address {
                    address_parse::AddressOrMultiStreamAddress::Address(address) => {
                        // As documented in the `PlatformRef` trait, `connect_stream` must
                        // return as soon as possible.
                        let connection = task.platform.connect_stream(address).await;

                        // Register the connection with the networking state machine,
                        // then spawn the background task driving it.
                        let (connection_id, connection_task) =
                            task.network.add_single_stream_connection(
                                task.platform.now(),
                                service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
                                    is_initiator: true,
                                    noise_key: &noise_key,
                                },
                                multiaddr.clone().into_bytes(),
                                Some(expected_peer_id.clone()),
                                coordinator_to_connection_tx,
                            );

                        task.platform.spawn_task(
                            task_name.into(),
                            tasks::single_stream_connection_task::<TPlat>(
                                connection,
                                multiaddr.to_string(),
                                task.platform.clone(),
                                connection_id,
                                connection_task,
                                coordinator_to_connection_rx,
                                task.tasks_messages_tx.clone(),
                            ),
                        );
                    }
                    address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
                        platform::MultiStreamAddress::WebRtc {
                            ip,
                            port,
                            remote_certificate_sha256,
                        },
                    ) => {
                        // We need to know the local TLS certificate in order to insert the
                        // connection, and as such we need to call `connect_multistream` here.
                        // As documented in the `PlatformRef` trait, `connect_multistream` must
                        // return as soon as possible.
                        let connection = task
                            .platform
                            .connect_multistream(platform::MultiStreamAddress::WebRtc {
                                ip,
                                port,
                                remote_certificate_sha256,
                            })
                            .await;

                        // Convert the SHA256 hashes into multihashes by prepending the
                        // multihash header: 0x12 (18) is the identifier of SHA-256 and
                        // 32 is the digest length in bytes.
                        let local_tls_certificate_multihash = [18u8, 32]
                            .into_iter()
                            .chain(connection.local_tls_certificate_sha256.into_iter())
                            .collect();
                        let remote_tls_certificate_multihash = [18u8, 32]
                            .into_iter()
                            .chain(remote_certificate_sha256.iter().copied())
                            .collect();

                        let (connection_id, connection_task) =
                            task.network.add_multi_stream_connection(
                                task.platform.now(),
                                service::MultiStreamHandshakeKind::WebRtc {
                                    is_initiator: true,
                                    local_tls_certificate_multihash,
                                    remote_tls_certificate_multihash,
                                    noise_key: &noise_key,
                                },
                                multiaddr.clone().into_bytes(),
                                Some(expected_peer_id.clone()),
                                coordinator_to_connection_tx,
                            );

                        task.platform.spawn_task(
                            task_name.into(),
                            tasks::webrtc_multi_stream_connection_task::<TPlat>(
                                connection.connection,
                                multiaddr.to_string(),
                                task.platform.clone(),
                                connection_id,
                                connection_task,
                                coordinator_to_connection_rx,
                                task.tasks_messages_tx.clone(),
                            ),
                        );
                    }
                }
            }
            WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
                // A gossip link can be opened towards this peer; start opening it
                // immediately.
                task.network
                    .gossip_open(
                        chain_id,
                        &peer_id,
                        service::GossipKind::ConsensusTransactions,
                    )
                    .unwrap();

                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-open-start",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                );
            }
            WakeUpReason::MessageToConnection {
                connection_id,
                message,
            } => {
                // Forward a coordinator message to the relevant connection task.
                // Note that it is critical for the sending to not take too long here, in order to
                // not block the process of the network service.
                // In particular, if sending the message to the connection is blocked due to
                // sending a message on the connection-to-coordinator channel, this will result
                // in a deadlock.
                // For this reason, the connection task is always ready to immediately accept a
                // message on the coordinator-to-connection channel.
                let _send_result = task.network[connection_id].send(message).await;
                debug_assert!(_send_result.is_ok());
            }
2731        }
2732    }
2733}