smoldot_light/
network_service.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18//! Background network service.
19//!
20//! The [`NetworkService`] manages background tasks dedicated to connecting to other nodes.
21//! Importantly, its design is oriented towards the particular use case of the light client.
22//!
23//! The [`NetworkService`] spawns one background task (using [`PlatformRef::spawn_task`]) for
24//! each active connection.
25//!
26//! The objective of the [`NetworkService`] in general is to try stay connected as much as
27//! possible to the nodes of the peer-to-peer network of the chain, and maintain open substreams
28//! with them in order to send out requests (e.g. block requests) and notifications (e.g. block
29//! announces).
30//!
31//! Connectivity to the network is performed in the background as an implementation detail of
32//! the service. The public API only allows emitting requests and notifications towards the
33//! already-connected nodes.
34//!
35//! After a [`NetworkService`] is created, one can add chains using [`NetworkService::add_chain`].
36//! If all references to a [`NetworkServiceChain`] are destroyed, the chain is automatically
37//! purged.
38//!
39//! An important part of the API is the list of channel receivers of [`Event`] returned by
40//! [`NetworkServiceChain::subscribe`]. These channels inform the foreground about updates to the
41//! network connectivity.
42
43use crate::{
44    log,
45    platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49    borrow::ToOwned as _,
50    boxed::Box,
51    collections::BTreeMap,
52    format,
53    string::{String, ToString as _},
54    sync::Arc,
55    vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65    header,
66    informant::{BytesDisplay, HashDisplay},
67    libp2p::{
68        connection,
69        multiaddr::{self, Multiaddr},
70        peer_id,
71    },
72    network::{basic_peering_strategy, codec, service},
73};
74
75pub use codec::{CallProofRequestConfig, Role};
76pub use service::{ChainId, EncodedMerkleProof, PeerId, QueueNotificationError};
77
78mod tasks;
79
/// Configuration for a [`NetworkService`].
///
/// Passed to [`NetworkService::new`].
pub struct Config<TPlat> {
    /// Access to the platform's capabilities.
    pub platform: TPlat,

    /// Value sent back for the agent version when receiving an identification request.
    pub identify_agent_version: String,

    /// Capacity to allocate for the list of chains.
    ///
    /// Only a pre-sizing hint; chains themselves are added later through
    /// [`NetworkService::add_chain`].
    pub chains_capacity: usize,

    /// Maximum number of connections that the service can open simultaneously. After this value
    /// has been reached, a new connection can be opened after each
    /// [`Config::connections_open_pool_restore_delay`].
    pub connections_open_pool_size: u32,

    /// Delay after which the service can open a new connection.
    /// The delay is cumulative. If no connection has been opened for example for twice this
    /// duration, then two connections can be opened at the same time, up to a maximum of
    /// [`Config::connections_open_pool_size`].
    pub connections_open_pool_restore_delay: Duration,
}
102
/// See [`NetworkService::add_chain`].
///
/// Note that this configuration is intentionally missing a field containing the bootstrap
/// nodes of the chain. Bootstrap nodes are supposed to be added afterwards by calling
/// [`NetworkServiceChain::discover`].
pub struct ConfigChain {
    /// Name of the chain, for logging purposes.
    ///
    /// Not used for any networking-related decision.
    pub log_name: String,

    /// Number of "out slots" of this chain. We establish simultaneously gossip substreams up to
    /// this number of peers.
    pub num_out_slots: usize,

    /// Hash of the genesis block of the chain. Sent to other nodes in order to determine whether
    /// the chains match.
    ///
    /// > **Note**: Be aware that this *must* be the *genesis* block, not any block known to be
    /// >           in the chain.
    pub genesis_block_hash: [u8; 32],

    /// Number and hash of the current best block. Can later be updated with
    /// [`NetworkServiceChain::set_local_best_block`].
    pub best_block: (u64, [u8; 32]),

    /// Optional identifier to insert into the networking protocol names. Used to differentiate
    /// between chains with the same genesis hash.
    pub fork_id: Option<String>,

    /// Number of bytes of the block number in the networking protocol.
    pub block_number_bytes: usize,

    /// Must be `Some` if and only if the chain uses the GrandPa networking protocol. Contains the
    /// number of the finalized block at the time of the initialization.
    pub grandpa_protocol_finalized_block_height: Option<u64>,
}
138
/// Handle to the background network service. See the module-level documentation.
///
/// Obtained through [`NetworkService::new`]. The actual networking state lives in a background
/// task; this struct merely communicates with it.
pub struct NetworkService<TPlat: PlatformRef> {
    /// Channel connected to the background service. Clones of this sender are also handed to
    /// each [`NetworkServiceChain`] in order to keep the background task alive.
    messages_tx: async_channel::Sender<ToBackground<TPlat>>,

    /// See [`Config::platform`].
    platform: TPlat,
}
146
impl<TPlat: PlatformRef> NetworkService<TPlat> {
    /// Initializes the network service with the given configuration.
    ///
    /// Spawns (through [`PlatformRef::spawn_task`]) the background task that owns the entire
    /// networking state and processes messages sent through the returned handle.
    pub fn new(config: Config<TPlat>) -> Arc<Self> {
        // Channel used by the public API to reach the background task. The bound is small
        // because it only ever carries `AddChain` messages.
        let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);

        let network = service::ChainNetwork::new(service::Config {
            chains_capacity: config.chains_capacity,
            connections_capacity: 32,
            handshake_timeout: Duration::from_secs(8),
            // Each randomness consumer below gets its own independently-drawn seed from the
            // platform's entropy source.
            randomness_seed: {
                let mut seed = [0; 32];
                config.platform.fill_random_bytes(&mut seed);
                seed
            },
        });

        // Spawn main task that processes the network service.
        // `tasks_messages_tx`/`tasks_messages_rx` carry messages from the per-connection tasks
        // back to the coordinator.
        let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
        let task = Box::pin(background_task(BackgroundTask {
            randomness: rand_chacha::ChaCha20Rng::from_seed({
                let mut seed = [0; 32];
                config.platform.fill_random_bytes(&mut seed);
                seed
            }),
            identify_agent_version: config.identify_agent_version,
            tasks_messages_tx,
            tasks_messages_rx: Box::pin(tasks_messages_rx),
            peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
                basic_peering_strategy::Config {
                    randomness_seed: {
                        let mut seed = [0; 32];
                        config.platform.fill_random_bytes(&mut seed);
                        seed
                    },
                    peers_capacity: 50, // TODO: ?
                    chains_capacity: config.chains_capacity,
                },
            ),
            network,
            connections_open_pool_size: config.connections_open_pool_size,
            connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
            num_recent_connection_opening: 0,
            next_recent_connection_restore: None,
            platform: config.platform.clone(),
            open_gossip_links: BTreeMap::new(),
            event_pending_send: None,
            event_senders: either::Left(Vec::new()),
            pending_new_subscriptions: Vec::new(),
            important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
            main_messages_rx: Box::pin(main_messages_rx),
            messages_rx: stream::SelectAll::new(),
            blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
            chains_by_next_discovery: BTreeMap::new(),
        }));

        config.platform.spawn_task("network-service".into(), {
            let platform = config.platform.clone();
            async move {
                task.await;
                // Reached only once the background task has finished, i.e. the service has
                // shut down.
                log!(&platform, Debug, "network", "shutdown");
            }
        });

        Arc::new(NetworkService {
            messages_tx: main_messages_tx,
            platform: config.platform,
        })
    }

    /// Adds a chain to the list of chains that the network service connects to.
    ///
    /// Returns an object representing the chain and that allows interacting with it. If all
    /// references to [`NetworkServiceChain`] are destroyed, the network service automatically
    /// purges that chain.
    pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
        // Channel through which the returned handle sends chain-specific messages to the
        // background task; the receiving side is handed over in the `AddChain` message below.
        let (messages_tx, messages_rx) = async_channel::bounded(32);

        // TODO: this code is hacky because we don't want to make `add_chain` async at the moment, because it's not convenient for lib.rs
        self.platform.spawn_task("add-chain-message-send".into(), {
            let config = service::ChainConfig {
                grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
                    |commit_finalized_height| service::GrandpaState {
                        commit_finalized_height,
                        // NOTE(review): initial round/set values; presumably refreshed later
                        // through `set_local_grandpa_state` — to confirm.
                        round_number: 1,
                        set_id: 0,
                    },
                ),
                fork_id: config.fork_id.clone(),
                block_number_bytes: config.block_number_bytes,
                best_hash: config.best_block.1,
                best_number: config.best_block.0,
                genesis_hash: config.genesis_block_hash,
                role: Role::Light,
                allow_inbound_block_requests: false,
                user_data: Chain {
                    log_name: config.log_name,
                    block_number_bytes: config.block_number_bytes,
                    num_out_slots: config.num_out_slots,
                    num_references: NonZero::<usize>::new(1).unwrap(),
                    next_discovery_period: Duration::from_secs(2),
                    next_discovery_when: self.platform.now(),
                },
            };

            let messages_tx = self.messages_tx.clone();
            async move {
                // Ignore errors: if the background task is gone, there is no chain to add.
                let _ = messages_tx
                    .send(ToBackground::AddChain {
                        messages_rx,
                        config,
                    })
                    .await;
            }
        });

        Arc::new(NetworkServiceChain {
            _keep_alive_messages_tx: self.messages_tx.clone(),
            messages_tx,
            marker: core::marker::PhantomData,
        })
    }
}
273
/// Handle to a chain registered in the [`NetworkService`].
///
/// Obtained through [`NetworkService::add_chain`]. When all clones of this handle are
/// destroyed, the chain is automatically purged from the service.
pub struct NetworkServiceChain<TPlat: PlatformRef> {
    /// Copy of [`NetworkService::messages_tx`]. Used in order to maintain the network service
    /// background task alive.
    _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,

    /// Channel to send messages to the background task.
    messages_tx: async_channel::Sender<ToBackgroundChain>,

    /// Dummy to hold the `TPlat` type.
    marker: core::marker::PhantomData<TPlat>,
}
285
/// Severity of a ban. See [`NetworkServiceChain::ban_and_disconnect`].
///
/// The precise effect of each severity is decided by the background task and isn't visible
/// in this part of the code.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BanSeverity {
    /// Minor offense, e.g. a transient protocol hiccup.
    Low,
    /// Serious offense, e.g. provably bad data sent by the peer.
    High,
}
292
impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
    /// Subscribes to the networking events that happen on the given chain.
    ///
    /// Calling this function returns a `Receiver` that receives events about the chain.
    /// The new channel will immediately receive events about all the existing connections, so
    /// that it is able to maintain a coherent view of the network.
    ///
    /// Note that this function is `async`, but it should return very quickly.
    ///
    /// The `Receiver` **must** be polled continuously. When the channel is full, the networking
    /// connections will be back-pressured until the channel isn't full anymore.
    ///
    /// The `Receiver` never yields `None` unless the [`NetworkService`] crashes or is destroyed.
    /// If `None` is yielded and the [`NetworkService`] is still alive, you should call
    /// [`NetworkServiceChain::subscribe`] again to obtain a new `Receiver`.
    ///
    /// # Panic
    ///
    /// Panics if the background service has been destroyed, i.e. if the channel towards it is
    /// closed.
    ///
    // TODO: consider not killing the background until the channel is destroyed, as that would be a more sensical behaviour
    pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
        // Bounded channel: a full channel back-pressures the networking, as documented above.
        let (tx, rx) = async_channel::bounded(128);

        self.messages_tx
            .send(ToBackgroundChain::Subscribe { sender: tx })
            .await
            .unwrap();

        rx
    }

    /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
    /// generated. Prevents a new gossip link with the same peer from being reopened for a
    /// little while.
    ///
    /// `reason` is a human-readable string printed in the logs.
    ///
    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
    /// reconnection was already happening as the call to this function is still being processed.
    /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
    /// words, this function guarantees that we will be disconnected in the future rather than
    /// guarantees that we will disconnect.
    pub async fn ban_and_disconnect(
        &self,
        peer_id: PeerId,
        severity: BanSeverity,
        reason: &'static str,
    ) {
        // Errors are deliberately ignored: if the background task is gone, there is nothing
        // left to disconnect from.
        let _ = self
            .messages_tx
            .send(ToBackgroundChain::DisconnectAndBan {
                peer_id,
                severity,
                reason,
            })
            .await;
    }

    /// Sends a blocks request to the given peer.
    ///
    /// Panics if the background service has been destroyed.
    // TODO: more docs
    pub async fn blocks_request(
        self: Arc<Self>,
        target: PeerId,
        config: codec::BlocksRequestConfig,
        timeout: Duration,
    ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
        // One-shot channel through which the background task sends the result back.
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::StartBlocksRequest {
                target: target.clone(),
                config,
                timeout,
                result: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }

    /// Sends a grandpa warp sync request to the given peer.
    ///
    /// Panics if the background service has been destroyed.
    // TODO: more docs
    pub async fn grandpa_warp_sync_request(
        self: Arc<Self>,
        target: PeerId,
        begin_hash: [u8; 32],
        timeout: Duration,
    ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::StartWarpSyncRequest {
                target: target.clone(),
                begin_hash,
                timeout,
                result: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }

    /// Updates the best block reported to the peers of this chain.
    ///
    /// See also [`ConfigChain::best_block`].
    pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
        self.messages_tx
            .send(ToBackgroundChain::SetLocalBestBlock {
                best_hash,
                best_number,
            })
            .await
            .unwrap();
    }

    /// Updates the GrandPa state reported to the peers of this chain.
    pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
        self.messages_tx
            .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
            .await
            .unwrap();
    }

    /// Sends a storage proof request to the given peer.
    ///
    /// Panics if the background service has been destroyed.
    // TODO: more docs
    pub async fn storage_proof_request(
        self: Arc<Self>,
        target: PeerId, // TODO: takes by value because of futures longevity issue
        config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
        timeout: Duration,
    ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::StartStorageProofRequest {
                target: target.clone(),
                // The keys iterator is turned into owned `Vec`s so that the request can cross
                // the channel towards the background task.
                config: codec::StorageProofRequestConfig {
                    block_hash: config.block_hash,
                    keys: config
                        .keys
                        .map(|key| key.as_ref().to_vec()) // TODO: to_vec() overhead
                        .collect::<Vec<_>>()
                        .into_iter(),
                },
                timeout,
                result: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }

    /// Sends a call proof request to the given peer.
    ///
    /// See [`codec::CallProofRequestConfig`] for the request parameters.
    ///
    /// Panics if the background service has been destroyed.
    // TODO: more docs
    pub async fn call_proof_request(
        self: Arc<Self>,
        target: PeerId, // TODO: takes by value because of futures longevity issue
        config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
        timeout: Duration,
    ) -> Result<EncodedMerkleProof, CallProofRequestError> {
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::StartCallProofRequest {
                target: target.clone(),
                // The borrowed config is turned into a `'static` owned version so that it can
                // cross the channel towards the background task.
                config: codec::CallProofRequestConfig {
                    block_hash: config.block_hash,
                    method: config.method.into_owned().into(),
                    parameter_vectored: config
                        .parameter_vectored
                        .map(|v| v.as_ref().to_vec()) // TODO: to_vec() overhead
                        .collect::<Vec<_>>()
                        .into_iter(),
                },
                timeout,
                result: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }

    /// Sends a child storage proof request to the given peer.
    ///
    /// Panics if the background service has been destroyed.
    pub async fn child_storage_proof_request(
        self: Arc<Self>,
        target: PeerId,
        config: codec::ChildStorageProofRequestConfig<
            impl AsRef<[u8]> + Clone,
            impl Iterator<Item = impl AsRef<[u8]> + Clone>,
        >,
        timeout: Duration,
    ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::StartChildStorageProofRequest {
                target: target.clone(),
                // Converted to the owned `ChildStorageProofRequestConfigOwned` so that the
                // request can cross the channel towards the background task.
                config: ChildStorageProofRequestConfigOwned {
                    block_hash: config.block_hash,
                    child_trie: config.child_trie.as_ref().to_vec(),
                    keys: config
                        .keys
                        .map(|key| key.as_ref().to_vec())
                        .collect::<Vec<_>>(),
                },
                timeout,
                result: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }

    /// Announces transaction to the peers we are connected to.
    ///
    /// Returns a list of peers that we have sent the transaction to. Can return an empty `Vec`
    /// if we didn't send the transaction to any peer.
    ///
    /// Note that the remote doesn't confirm that it has received the transaction. Because
    /// networking is inherently unreliable, successfully sending a transaction to a peer doesn't
    /// necessarily mean that the remote has received it. In practice, however, the likelihood of
    /// a transaction not being received is extremely low. This can be considered a known flaw.
    pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::AnnounceTransaction {
                transaction: transaction.to_vec(), // TODO: overhead
                result: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }

    /// See [`service::ChainNetwork::gossip_send_block_announce`].
    ///
    /// Panics if the background service has been destroyed.
    pub async fn send_block_announce(
        self: Arc<Self>,
        target: &PeerId,
        scale_encoded_header: &[u8],
        is_best: bool,
    ) -> Result<(), QueueNotificationError> {
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::SendBlockAnnounce {
                target: target.clone(),                              // TODO: overhead
                scale_encoded_header: scale_encoded_header.to_vec(), // TODO: overhead
                is_best,
                result: tx,
            })
            .await
            .unwrap();

        rx.await.unwrap()
    }

    /// Marks the given peers as belonging to the given chain, and adds some addresses to these
    /// peers to the address book.
    ///
    /// The `important_nodes` parameter indicates whether these nodes are considered noteworthy
    /// and should have additional logging.
    pub async fn discover(
        &self,
        list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
        important_nodes: bool,
    ) {
        self.messages_tx
            .send(ToBackgroundChain::Discover {
                // TODO: overhead
                // The nested iterators are eagerly collected so that the message owns its data.
                list: list
                    .into_iter()
                    .map(|(peer_id, addrs)| {
                        (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
                    })
                    .collect::<Vec<_>>()
                    .into_iter(),
                important_nodes,
            })
            .await
            .unwrap();
    }

    /// Returns a list of nodes (their [`PeerId`] and multiaddresses) that we know are part of
    /// the network.
    ///
    /// Nodes that are discovered might disappear over time. In other words, there is no guarantee
    /// that a node that has been added through [`NetworkServiceChain::discover`] will later be
    /// returned by [`NetworkServiceChain::discovered_nodes`].
    pub async fn discovered_nodes(
        &self,
    ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
        let (tx, rx) = oneshot::channel();

        self.messages_tx
            .send(ToBackgroundChain::DiscoveredNodes { result: tx })
            .await
            .unwrap();

        rx.await
            .unwrap()
            .into_iter()
            .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
    }

    /// Returns an iterator to the list of [`PeerId`]s that we have an established connection
    /// with.
    pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
        let (tx, rx) = oneshot::channel();
        self.messages_tx
            .send(ToBackgroundChain::PeersList { result: tx })
            .await
            .unwrap();
        rx.await.unwrap().into_iter()
    }
}
614
/// Event that can happen on the network service.
///
/// Delivered through the channel returned by [`NetworkServiceChain::subscribe`].
#[derive(Debug, Clone)]
pub enum Event {
    /// A gossip link with the given peer is now open for this chain.
    Connected {
        peer_id: PeerId,
        /// Role the peer claims to have. See [`Role`].
        role: Role,
        /// Best block number reported by the peer.
        best_block_number: u64,
        /// Best block hash reported by the peer.
        best_block_hash: [u8; 32],
    },
    /// The gossip link with the given peer is now closed. See also
    /// [`NetworkServiceChain::ban_and_disconnect`].
    Disconnected {
        peer_id: PeerId,
    },
    /// Received a block announce from a peer we have an open gossip link with.
    BlockAnnounce {
        peer_id: PeerId,
        announce: service::EncodedBlockAnnounce,
    },
    /// Received a GrandPa neighbor packet from the network, indicating the peer's finalized
    /// block height.
    GrandpaNeighborPacket {
        peer_id: PeerId,
        finalized_block_height: u64,
    },
    /// Received a GrandPa commit message from the network.
    GrandpaCommitMessage {
        peer_id: PeerId,
        message: service::EncodedGrandpaCommitMessage,
    },
}
641
/// Error returned by [`NetworkServiceChain::blocks_request`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum BlocksRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Error during the request.
    ///
    /// The `Display` implementation forwards the inner error's message.
    #[display("{_0}")]
    Request(service::BlocksRequestError),
}
651
/// Error returned by [`NetworkServiceChain::grandpa_warp_sync_request`].
#[derive(Debug, derive_more::Display, derive_more::Error)]
pub enum WarpSyncRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Error during the request.
    ///
    /// The `Display` implementation forwards the inner error's message.
    #[display("{_0}")]
    Request(service::GrandpaWarpSyncRequestError),
}
661
/// Error returned by [`NetworkServiceChain::storage_proof_request`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum StorageProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Storage proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// The `Display` implementation forwards the inner error's message.
    #[display("{_0}")]
    Request(service::StorageProofRequestError),
}
673
/// Error returned by [`NetworkServiceChain::call_proof_request`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum CallProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Call proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// The `Display` implementation forwards the inner error's message.
    #[display("{_0}")]
    Request(service::CallProofRequestError),
}
685
686impl CallProofRequestError {
687    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
688    /// issue.
689    pub fn is_network_problem(&self) -> bool {
690        match self {
691            CallProofRequestError::Request(err) => err.is_network_problem(),
692            CallProofRequestError::RequestTooLarge => false,
693            CallProofRequestError::NoConnection => true,
694        }
695    }
696}
697
/// Error returned by [`NetworkServiceChain::child_storage_proof_request`].
#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
pub enum ChildStorageProofRequestError {
    /// No established connection with the target.
    NoConnection,
    /// Child storage proof request is too large and can't be sent.
    RequestTooLarge,
    /// Error during the request.
    ///
    /// The `Display` implementation forwards the inner error's message.
    #[display("{_0}")]
    Request(service::StorageProofRequestError),
}
709
710impl ChildStorageProofRequestError {
711    /// Returns `true` if this is caused by networking issues, as opposed to a consensus-related
712    /// issue.
713    pub fn is_network_problem(&self) -> bool {
714        match self {
715            ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
716            ChildStorageProofRequestError::RequestTooLarge => false,
717            ChildStorageProofRequestError::NoConnection => true,
718        }
719    }
720}
721
/// Owned version of [`codec::ChildStorageProofRequestConfig`] for sending across channel.
struct ChildStorageProofRequestConfigOwned {
    /// Hash of the block against which the proof is requested.
    block_hash: [u8; 32],
    /// Identifier of the child trie whose keys are being proven.
    child_trie: Vec<u8>,
    /// Keys whose values are requested in the proof.
    keys: Vec<Vec<u8>>,
}
728
/// Message sent from the public API to the background task of the [`NetworkService`].
enum ToBackground<TPlat: PlatformRef> {
    /// See [`NetworkService::add_chain`].
    AddChain {
        /// Receiving side of the channel held by the new [`NetworkServiceChain`].
        messages_rx: async_channel::Receiver<ToBackgroundChain>,
        /// Configuration of the chain being added.
        config: service::ChainConfig<Chain<TPlat>>,
    },
}
735
/// Message destined to the background task and concerning one specific chain.
enum ToBackgroundChain {
    /// Removes the chain from the background task.
    ///
    /// NOTE(review): no sender is visible in this part of the file; presumably sent when the
    /// last handle to the chain is destroyed — to confirm.
    RemoveChain,
    /// See [`NetworkServiceChain::subscribe`].
    Subscribe {
        /// Sending side of the channel returned to the subscriber.
        sender: async_channel::Sender<Event>,
    },
    /// See [`NetworkServiceChain::ban_and_disconnect`].
    DisconnectAndBan {
        peer_id: PeerId,
        severity: BanSeverity,
        reason: &'static str,
    },
    /// See [`NetworkServiceChain::blocks_request`].
    // TODO: serialize the request before sending over channel
    StartBlocksRequest {
        target: PeerId, // TODO: takes by value because of future longevity issue
        config: codec::BlocksRequestConfig,
        timeout: Duration,
        /// Channel through which the outcome of the request is reported.
        result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
    },
    /// See [`NetworkServiceChain::grandpa_warp_sync_request`].
    // TODO: serialize the request before sending over channel
    StartWarpSyncRequest {
        target: PeerId,
        begin_hash: [u8; 32],
        timeout: Duration,
        /// Channel through which the outcome of the request is reported.
        result:
            oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
    },
    /// See [`NetworkServiceChain::storage_proof_request`].
    // TODO: serialize the request before sending over channel
    StartStorageProofRequest {
        target: PeerId,
        config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
        timeout: Duration,
        /// Channel through which the outcome of the request is reported.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
    },
    /// See [`NetworkServiceChain::call_proof_request`].
    // TODO: serialize the request before sending over channel
    StartCallProofRequest {
        target: PeerId, // TODO: takes by value because of futures longevity issue
        config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
        timeout: Duration,
        /// Channel through which the outcome of the request is reported.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
    },
    /// See [`NetworkServiceChain::child_storage_proof_request`].
    // TODO: serialize the request before sending over channel
    StartChildStorageProofRequest {
        target: PeerId,
        config: ChildStorageProofRequestConfigOwned,
        timeout: Duration,
        /// Channel through which the outcome of the request is reported.
        result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
    },
    /// See [`NetworkServiceChain::set_local_best_block`].
    SetLocalBestBlock {
        best_hash: [u8; 32],
        best_number: u64,
    },
    /// See [`NetworkServiceChain::set_local_grandpa_state`].
    SetLocalGrandpaState {
        grandpa_state: service::GrandpaState,
    },
    /// See [`NetworkServiceChain::announce_transaction`].
    AnnounceTransaction {
        transaction: Vec<u8>,
        /// Channel through which the list of peers the transaction was sent to is reported.
        result: oneshot::Sender<Vec<PeerId>>,
    },
    /// See [`NetworkServiceChain::send_block_announce`].
    SendBlockAnnounce {
        target: PeerId,
        scale_encoded_header: Vec<u8>,
        is_best: bool,
        /// Channel through which the outcome of the queuing is reported.
        result: oneshot::Sender<Result<(), QueueNotificationError>>,
    },
    /// See [`NetworkServiceChain::discover`].
    Discover {
        list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
        important_nodes: bool,
    },
    /// See [`NetworkServiceChain::discovered_nodes`].
    DiscoveredNodes {
        /// Channel through which the known nodes and their addresses are reported.
        result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
    },
    /// See [`NetworkServiceChain::peers_list`].
    PeersList {
        /// Channel through which the list of connected peers is reported.
        result: oneshot::Sender<Vec<PeerId>>,
    },
}
810
/// State of the background task that drives the networking. Owned and mutated exclusively by
/// [`background_task`].
struct BackgroundTask<TPlat: PlatformRef> {
    /// See [`Config::platform`].
    platform: TPlat,

    /// Random number generator.
    randomness: rand_chacha::ChaCha20Rng,

    /// Value provided through [`Config::identify_agent_version`].
    identify_agent_version: String,

    /// Channel to send messages to the background task.
    ///
    /// NOTE(review): presumably cloned into each spawned connection task so that it can reach
    /// the coordinator — confirm against the connection-spawning code.
    tasks_messages_tx:
        async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,

    /// Channel to receive messages destined to the background task.
    ///
    /// Receiving end of [`BackgroundTask::tasks_messages_tx`]. Messages received here are
    /// injected into [`BackgroundTask::network`].
    tasks_messages_rx: Pin<
        Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
    >,

    /// Data structure holding the entire state of the networking.
    network: service::ChainNetwork<
        Chain<TPlat>,
        async_channel::Sender<service::CoordinatorToConnection>,
        TPlat::Instant,
    >,

    /// All known peers and their addresses.
    peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,

    /// See [`Config::connections_open_pool_size`].
    connections_open_pool_size: u32,

    /// See [`Config::connections_open_pool_restore_delay`].
    connections_open_pool_restore_delay: Duration,

    /// Every time a connection is opened, the value in this field is increased by one. After
    /// [`BackgroundTask::next_recent_connection_restore`] has yielded, the value is reduced by
    /// one.
    num_recent_connection_opening: u32,

    /// Delay after which [`BackgroundTask::num_recent_connection_opening`] is increased by one.
    next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,

    /// List of all open gossip links.
    // TODO: using this data structure unfortunately means that PeerIds are cloned a lot, maybe some user data in ChainNetwork is better? not sure
    open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,

    /// List of nodes that are considered as important for logging purposes.
    // TODO: should also detect whenever we fail to open a block announces substream with any of these peers
    important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,

    /// Event about to be sent on the senders of [`BackgroundTask::event_senders`].
    event_pending_send: Option<(ChainId, Event)>,

    /// Sending events through the public API.
    ///
    /// Contains either senders, or a `Future` that is currently sending an event and will yield
    /// the senders back once it is finished.
    // TODO: sort by ChainId instead of using a Vec?
    event_senders: either::Either<
        Vec<(ChainId, async_channel::Sender<Event>)>,
        Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
    >,

    /// Whenever [`NetworkServiceChain::subscribe`] is called, the new sender is added to this list.
    /// Once [`BackgroundTask::event_senders`] is ready, we properly initialize these senders.
    pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,

    /// Channel of messages coming from the foreground [`NetworkService`]. Yielding `None` means
    /// that the foreground has been closed, in which case the background task shuts down.
    main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,

    /// Streams of messages coming from the foreground [`NetworkServiceChain`]s, one stream per
    /// chain, each item tagged with the chain it concerns. Each stream is terminated with a
    /// [`ToBackgroundChain::RemoveChain`] item once the corresponding foreground chain is
    /// destroyed.
    messages_rx:
        stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,

    /// Senders for the results of the blocks requests currently in progress, indexed by the
    /// substream on which the request was emitted.
    blocks_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Senders for the results of the GrandPa warp sync requests currently in progress, indexed
    /// by the substream on which the request was emitted.
    grandpa_warp_sync_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Senders for the results of the storage proof requests currently in progress, indexed by
    /// the substream on which the request was emitted.
    storage_proof_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Senders for the results of the call proof requests currently in progress, indexed by the
    /// substream on which the request was emitted.
    call_proof_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// Senders for the results of the child storage proof requests currently in progress,
    /// indexed by the substream on which the request was emitted.
    child_storage_proof_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// All chains, indexed by the value of [`Chain::next_discovery_when`].
    chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
}
917
/// Per-chain user data stored inside the [`service::ChainNetwork`].
struct Chain<TPlat: PlatformRef> {
    /// Name of the chain, used exclusively in log messages.
    log_name: String,

    /// Number of foreground [`NetworkServiceChain`]s referring to this chain. Incremented when
    /// adding a chain that already exists, and decremented when a `RemoveChain` message arrives;
    /// the chain is actually removed when it reaches zero.
    // TODO: this field is a hack due to the fact that `add_chain` can't be `async`; should eventually be fixed after a lib.rs refactor
    num_references: NonZero<usize>,

    /// See [`ConfigChain::block_number_bytes`].
    // TODO: redundant with ChainNetwork? since we might not need to know this in the future i'm reluctant to add a getter to ChainNetwork
    block_number_bytes: usize,

    /// See [`ConfigChain::num_out_slots`].
    num_out_slots: usize,

    /// When the next discovery should be started for this chain.
    next_discovery_when: TPlat::Instant,

    /// After [`Chain::next_discovery_when`] is reached, the following discovery happens after
    /// the given duration.
    next_discovery_period: Duration,
}
938
/// State of a single open gossip link. Values of [`BackgroundTask::open_gossip_links`].
///
/// Used to replay [`Event::Connected`] (and, if relevant, [`Event::GrandpaNeighborPacket`])
/// to newly-registered event subscribers.
#[derive(Clone)]
struct OpenGossipLinkState {
    /// Role of the remote peer, as reported in [`Event::Connected`].
    role: Role,
    /// Best block number of the remote, as reported in [`Event::Connected`].
    best_block_number: u64,
    /// Best block hash of the remote, as reported in [`Event::Connected`].
    best_block_hash: [u8; 32],
    /// Latest finalized block height announced by the remote. `None` if unknown.
    finalized_block_height: Option<u64>,
}
947
948async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
949    loop {
950        // Yield at every loop in order to provide better tasks granularity.
951        futures_lite::future::yield_now().await;
952
953        enum WakeUpReason<TPlat: PlatformRef> {
954            ForegroundClosed,
955            Message(ToBackground<TPlat>),
956            MessageForChain(ChainId, ToBackgroundChain),
957            NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
958            CanAssignSlot(PeerId, ChainId),
959            NextRecentConnectionRestore,
960            CanStartConnect(PeerId),
961            CanOpenGossip(PeerId, ChainId),
962            MessageFromConnection {
963                connection_id: service::ConnectionId,
964                message: service::ConnectionToCoordinator,
965            },
966            MessageToConnection {
967                connection_id: service::ConnectionId,
968                message: service::CoordinatorToConnection,
969            },
970            EventSendersReady,
971            StartDiscovery(ChainId),
972        }
973
974        let wake_up_reason = {
975            let message_received = async {
976                task.main_messages_rx
977                    .next()
978                    .await
979                    .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
980            };
981            let message_for_chain_received = async {
982                // Note that when the last entry of `messages_rx` yields `None`, `messages_rx`
983                // itself will yield `None`. For this reason, we can't use
984                // `task.messages_rx.is_empty()` to determine whether `messages_rx` will
985                // yield `None`.
986                let Some((chain_id, message)) = task.messages_rx.next().await else {
987                    future::pending().await
988                };
989                WakeUpReason::MessageForChain(chain_id, message)
990            };
991            let message_from_task_received = async {
992                let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
993                WakeUpReason::MessageFromConnection {
994                    connection_id,
995                    message,
996                }
997            };
998            let service_event = async {
999                if let Some(event) = (task.event_pending_send.is_none()
1000                    && task.pending_new_subscriptions.is_empty())
1001                .then(|| task.network.next_event())
1002                .flatten()
1003                {
1004                    WakeUpReason::NetworkEvent(event)
1005                } else if let Some(start_connect) = {
1006                    let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1007                        .then(|| {
1008                            task.network
1009                                .unconnected_desired()
1010                                .choose(&mut task.randomness)
1011                                .cloned()
1012                        })
1013                        .flatten();
1014                    x
1015                } {
1016                    WakeUpReason::CanStartConnect(start_connect)
1017                } else if let Some((peer_id, chain_id)) = {
1018                    let x = task
1019                        .network
1020                        .connected_unopened_gossip_desired()
1021                        .choose(&mut task.randomness)
1022                        .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1023                    x
1024                } {
1025                    WakeUpReason::CanOpenGossip(peer_id, chain_id)
1026                } else if let Some((connection_id, message)) =
1027                    task.network.pull_message_to_connection()
1028                {
1029                    WakeUpReason::MessageToConnection {
1030                        connection_id,
1031                        message,
1032                    }
1033                } else {
1034                    'search: loop {
1035                        let mut earlier_unban = None;
1036
1037                        for chain_id in task.network.chains().collect::<Vec<_>>() {
1038                            if task.network.gossip_desired_num(
1039                                chain_id,
1040                                service::GossipKind::ConsensusTransactions,
1041                            ) >= task.network[chain_id].num_out_slots
1042                            {
1043                                continue;
1044                            }
1045
1046                            match task
1047                                .peering_strategy
1048                                .pick_assignable_peer(&chain_id, &task.platform.now())
1049                            {
1050                                basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1051                                    break 'search WakeUpReason::CanAssignSlot(
1052                                        peer_id.clone(),
1053                                        chain_id,
1054                                    );
1055                                }
1056                                basic_peering_strategy::AssignablePeer::AllPeersBanned {
1057                                    next_unban,
1058                                } => {
1059                                    if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1060                                        earlier_unban = Some(next_unban.clone());
1061                                    }
1062                                }
1063                                basic_peering_strategy::AssignablePeer::NoPeer => continue,
1064                            }
1065                        }
1066
1067                        if let Some(earlier_unban) = earlier_unban {
1068                            task.platform.sleep_until(earlier_unban).await;
1069                        } else {
1070                            future::pending::<()>().await;
1071                        }
1072                    }
1073                }
1074            };
1075            let next_recent_connection_restore = async {
1076                if task.num_recent_connection_opening != 0
1077                    && task.next_recent_connection_restore.is_none()
1078                {
1079                    task.next_recent_connection_restore = Some(Box::pin(
1080                        task.platform
1081                            .sleep(task.connections_open_pool_restore_delay),
1082                    ));
1083                }
1084                if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1085                    delay.await;
1086                    task.next_recent_connection_restore = None;
1087                    WakeUpReason::NextRecentConnectionRestore
1088                } else {
1089                    future::pending().await
1090                }
1091            };
1092            let finished_sending_event = async {
1093                if let either::Right(event_sending_future) = &mut task.event_senders {
1094                    let event_senders = event_sending_future.await;
1095                    task.event_senders = either::Left(event_senders);
1096                    WakeUpReason::EventSendersReady
1097                } else if task.event_pending_send.is_some()
1098                    || !task.pending_new_subscriptions.is_empty()
1099                {
1100                    WakeUpReason::EventSendersReady
1101                } else {
1102                    future::pending().await
1103                }
1104            };
1105            let start_discovery = async {
1106                let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1107                    future::pending().await
1108                };
1109                next_discovery.get_mut().await;
1110                let ((_, chain_id), _) = next_discovery.remove_entry();
1111                WakeUpReason::StartDiscovery(chain_id)
1112            };
1113
1114            message_for_chain_received
1115                .or(message_received)
1116                .or(message_from_task_received)
1117                .or(service_event)
1118                .or(next_recent_connection_restore)
1119                .or(finished_sending_event)
1120                .or(start_discovery)
1121                .await
1122        };
1123
1124        match wake_up_reason {
1125            WakeUpReason::ForegroundClosed => {
1126                // End the task.
1127                return;
1128            }
1129            WakeUpReason::Message(ToBackground::AddChain {
1130                messages_rx,
1131                config,
1132            }) => {
1133                // TODO: this is not a completely clean way of handling duplicate chains, because the existing chain might have a different best block and role and all ; also, multiple sync services will call set_best_block and set_finalized_block
1134                let chain_id = match task.network.add_chain(config) {
1135                    Ok(id) => id,
1136                    Err(service::AddChainError::Duplicate { existing_identical }) => {
1137                        task.network[existing_identical].num_references = task.network
1138                            [existing_identical]
1139                            .num_references
1140                            .checked_add(1)
1141                            .unwrap();
1142                        existing_identical
1143                    }
1144                };
1145
1146                task.chains_by_next_discovery.insert(
1147                    (task.network[chain_id].next_discovery_when.clone(), chain_id),
1148                    Box::pin(
1149                        task.platform
1150                            .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1151                    ),
1152                );
1153
1154                task.messages_rx
1155                    .push(Box::pin(
1156                        messages_rx
1157                            .map(move |msg| (chain_id, msg))
1158                            .chain(stream::once(future::ready((
1159                                chain_id,
1160                                ToBackgroundChain::RemoveChain,
1161                            )))),
1162                    ) as Pin<Box<_>>);
1163
1164                log!(
1165                    &task.platform,
1166                    Debug,
1167                    "network",
1168                    "chain-added",
1169                    id = task.network[chain_id].log_name
1170                );
1171            }
1172            WakeUpReason::EventSendersReady => {
1173                // Dispatch the pending event, if any, to the various senders.
1174
1175                // We made sure that the senders were ready before generating an event.
1176                let either::Left(event_senders) = &mut task.event_senders else {
1177                    unreachable!()
1178                };
1179
1180                if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1181                    task.event_pending_send.take()
1182                {
1183                    let mut event_senders = mem::take(event_senders);
1184                    task.event_senders = either::Right(Box::pin(async move {
1185                        // Elements in `event_senders` are removed one by one and inserted
1186                        // back if the channel is still open.
1187                        for index in (0..event_senders.len()).rev() {
1188                            let (event_sender_chain_id, event_sender) =
1189                                event_senders.swap_remove(index);
1190                            if event_sender_chain_id == event_to_dispatch_chain_id {
1191                                if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1192                                    continue;
1193                                }
1194                            }
1195                            event_senders.push((event_sender_chain_id, event_sender));
1196                        }
1197                        event_senders
1198                    }));
1199                } else if !task.pending_new_subscriptions.is_empty() {
1200                    let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1201                    let mut event_senders = mem::take(event_senders);
1202                    // TODO: cloning :-/
1203                    let open_gossip_links = task.open_gossip_links.clone();
1204                    task.event_senders = either::Right(Box::pin(async move {
1205                        for (chain_id, new_subscription) in pending_new_subscriptions {
1206                            for ((link_chain_id, peer_id), state) in &open_gossip_links {
1207                                // TODO: optimize? this is O(n) by chain
1208                                if *link_chain_id != chain_id {
1209                                    continue;
1210                                }
1211
1212                                let _ = new_subscription
1213                                    .send(Event::Connected {
1214                                        peer_id: peer_id.clone(),
1215                                        role: state.role,
1216                                        best_block_number: state.best_block_number,
1217                                        best_block_hash: state.best_block_hash,
1218                                    })
1219                                    .await;
1220
1221                                if let Some(finalized_block_height) = state.finalized_block_height {
1222                                    let _ = new_subscription
1223                                        .send(Event::GrandpaNeighborPacket {
1224                                            peer_id: peer_id.clone(),
1225                                            finalized_block_height,
1226                                        })
1227                                        .await;
1228                                }
1229                            }
1230
1231                            event_senders.push((chain_id, new_subscription));
1232                        }
1233
1234                        event_senders
1235                    }));
1236                }
1237            }
1238            WakeUpReason::MessageFromConnection {
1239                connection_id,
1240                message,
1241            } => {
1242                task.network
1243                    .inject_connection_message(connection_id, message);
1244            }
1245            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1246                if let Some(new_ref) =
1247                    NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1248                {
1249                    task.network[chain_id].num_references = new_ref;
1250                    continue;
1251                }
1252
1253                for peer_id in task
1254                    .network
1255                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1256                    .cloned()
1257                    .collect::<Vec<_>>()
1258                {
1259                    task.network
1260                        .gossip_close(
1261                            chain_id,
1262                            &peer_id,
1263                            service::GossipKind::ConsensusTransactions,
1264                        )
1265                        .unwrap();
1266
1267                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1268                    debug_assert!(_was_in.is_some());
1269                }
1270
1271                let _was_in = task
1272                    .chains_by_next_discovery
1273                    .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1274                debug_assert!(_was_in.is_some());
1275
1276                log!(
1277                    &task.platform,
1278                    Debug,
1279                    "network",
1280                    "chain-removed",
1281                    id = task.network[chain_id].log_name
1282                );
1283                task.network.remove_chain(chain_id).unwrap();
1284                task.peering_strategy.remove_chain_peers(&chain_id);
1285            }
1286            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1287                task.pending_new_subscriptions.push((chain_id, sender));
1288            }
1289            WakeUpReason::MessageForChain(
1290                chain_id,
1291                ToBackgroundChain::DisconnectAndBan {
1292                    peer_id,
1293                    severity,
1294                    reason,
1295                },
1296            ) => {
1297                let ban_duration = Duration::from_secs(match severity {
1298                    BanSeverity::Low => 10,
1299                    BanSeverity::High => 40,
1300                });
1301
1302                let had_slot = matches!(
1303                    task.peering_strategy.unassign_slot_and_ban(
1304                        &chain_id,
1305                        &peer_id,
1306                        task.platform.now() + ban_duration,
1307                    ),
1308                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1309                );
1310
1311                if had_slot {
1312                    log!(
1313                        &task.platform,
1314                        Debug,
1315                        "network",
1316                        "slot-unassigned",
1317                        chain = &task.network[chain_id].log_name,
1318                        peer_id,
1319                        ?ban_duration,
1320                        reason = "user-ban",
1321                        user_reason = reason
1322                    );
1323                    task.network.gossip_remove_desired(
1324                        chain_id,
1325                        &peer_id,
1326                        service::GossipKind::ConsensusTransactions,
1327                    );
1328                }
1329
1330                if task.network.gossip_is_connected(
1331                    chain_id,
1332                    &peer_id,
1333                    service::GossipKind::ConsensusTransactions,
1334                ) {
1335                    let _closed_result = task.network.gossip_close(
1336                        chain_id,
1337                        &peer_id,
1338                        service::GossipKind::ConsensusTransactions,
1339                    );
1340                    debug_assert!(_closed_result.is_ok());
1341
1342                    log!(
1343                        &task.platform,
1344                        Debug,
1345                        "network",
1346                        "gossip-closed",
1347                        chain = &task.network[chain_id].log_name,
1348                        peer_id,
1349                    );
1350
1351                    let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1352                    debug_assert!(_was_in.is_some());
1353
1354                    debug_assert!(task.event_pending_send.is_none());
1355                    task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1356                }
1357            }
1358            WakeUpReason::MessageForChain(
1359                chain_id,
1360                ToBackgroundChain::StartBlocksRequest {
1361                    target,
1362                    config,
1363                    timeout,
1364                    result,
1365                },
1366            ) => {
1367                match &config.start {
1368                    codec::BlocksRequestConfigStart::Hash(hash) => {
1369                        log!(
1370                            &task.platform,
1371                            Debug,
1372                            "network",
1373                            "blocks-request-started",
1374                            chain = task.network[chain_id].log_name, target,
1375                            start = HashDisplay(hash),
1376                            num = config.desired_count.get(),
1377                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1378                            header = ?config.fields.header, body = ?config.fields.body,
1379                            justifications = ?config.fields.justifications
1380                        );
1381                    }
1382                    codec::BlocksRequestConfigStart::Number(number) => {
1383                        log!(
1384                            &task.platform,
1385                            Debug,
1386                            "network",
1387                            "blocks-request-started",
1388                            chain = task.network[chain_id].log_name, target, start = number,
1389                            num = config.desired_count.get(),
1390                            descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1391                            header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1392                        );
1393                    }
1394                }
1395
1396                match task
1397                    .network
1398                    .start_blocks_request(&target, chain_id, config.clone(), timeout)
1399                {
1400                    Ok(substream_id) => {
1401                        task.blocks_requests.insert(substream_id, result);
1402                    }
1403                    Err(service::StartRequestError::NoConnection) => {
1404                        log!(
1405                            &task.platform,
1406                            Debug,
1407                            "network",
1408                            "blocks-request-error",
1409                            chain = task.network[chain_id].log_name,
1410                            target,
1411                            error = "NoConnection"
1412                        );
1413                        let _ = result.send(Err(BlocksRequestError::NoConnection));
1414                    }
1415                }
1416            }
1417            WakeUpReason::MessageForChain(
1418                chain_id,
1419                ToBackgroundChain::StartWarpSyncRequest {
1420                    target,
1421                    begin_hash,
1422                    timeout,
1423                    result,
1424                },
1425            ) => {
1426                log!(
1427                    &task.platform,
1428                    Debug,
1429                    "network",
1430                    "warp-sync-request-started",
1431                    chain = task.network[chain_id].log_name,
1432                    target,
1433                    start = HashDisplay(&begin_hash)
1434                );
1435
1436                match task
1437                    .network
1438                    .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1439                {
1440                    Ok(substream_id) => {
1441                        task.grandpa_warp_sync_requests.insert(substream_id, result);
1442                    }
1443                    Err(service::StartRequestError::NoConnection) => {
1444                        log!(
1445                            &task.platform,
1446                            Debug,
1447                            "network",
1448                            "warp-sync-request-error",
1449                            chain = task.network[chain_id].log_name,
1450                            target,
1451                            error = "NoConnection"
1452                        );
1453                        let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1454                    }
1455                }
1456            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::StartStorageProofRequest {
                    target,
                    config,
                    timeout,
                    result,
                },
            ) => {
                // The foreground asked for a storage proof from `target`. Log the
                // attempt first so that the error logs below can be correlated with it.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "storage-proof-request-started",
                    chain = task.network[chain_id].log_name,
                    target,
                    block_hash = HashDisplay(&config.block_hash)
                );

                // NOTE(review): `config` isn't used after this call; the `.clone()` looks
                // avoidable unless the request API requires ownership — TODO confirm.
                match task.network.start_storage_proof_request(
                    &target,
                    chain_id,
                    config.clone(),
                    timeout,
                ) {
                    Ok(substream_id) => {
                        // Request is now in flight; keep the `result` sender so the
                        // response handler can route the answer back by substream id.
                        task.storage_proof_requests.insert(substream_id, result);
                    }
                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target,
                            error = "NoConnection"
                        );
                        // `let _`: the foreground may have dropped the receiver.
                        let _ = result.send(Err(StorageProofRequestError::NoConnection));
                    }
                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target,
                            error = "RequestTooLarge"
                        );
                        let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
                    }
                };
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::StartCallProofRequest {
                    target,
                    config,
                    timeout,
                    result,
                },
            ) => {
                // The foreground asked for a call proof (proof of a runtime call) from
                // `target`. Log the attempt before starting the request.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "call-proof-request-started",
                    chain = task.network[chain_id].log_name,
                    target,
                    block_hash = HashDisplay(&config.block_hash),
                    function = config.method
                );
                // TODO: log parameter

                // NOTE(review): `config` isn't used after this call; the `.clone()` looks
                // avoidable unless the request API requires ownership — TODO confirm.
                match task.network.start_call_proof_request(
                    &target,
                    chain_id,
                    config.clone(),
                    timeout,
                ) {
                    Ok(substream_id) => {
                        // Track the in-flight request so the response handler can route
                        // the answer back to this `result` sender.
                        task.call_proof_requests.insert(substream_id, result);
                    }
                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "call-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target,
                            error = "NoConnection"
                        );
                        // `let _`: the foreground may have dropped the receiver.
                        let _ = result.send(Err(CallProofRequestError::NoConnection));
                    }
                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "call-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target,
                            error = "RequestTooLarge"
                        );
                        let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
                    }
                };
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::StartChildStorageProofRequest {
                    target,
                    config,
                    timeout,
                    result,
                },
            ) => {
                // The foreground asked for a child-trie storage proof from `target`.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "child-storage-proof-request-started",
                    chain = task.network[chain_id].log_name,
                    target,
                    block_hash = HashDisplay(&config.block_hash)
                );

                // Unlike the plain storage-proof path, the wire-level config borrows
                // from `config` (child trie name and keys), so no clone is needed.
                match task.network.start_child_storage_proof_request(
                    &target,
                    chain_id,
                    codec::ChildStorageProofRequestConfig {
                        block_hash: config.block_hash,
                        child_trie: &config.child_trie,
                        keys: config.keys.iter().map(|k| k.as_slice()),
                    },
                    timeout,
                ) {
                    Ok(substream_id) => {
                        // Track the in-flight request so the response can be routed back.
                        task.child_storage_proof_requests
                            .insert(substream_id, result);
                    }
                    Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "child-storage-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target,
                            error = "NoConnection"
                        );
                        // `let _`: the foreground may have dropped the receiver.
                        let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
                    }
                    Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "child-storage-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target,
                            error = "RequestTooLarge"
                        );
                        let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
                    }
                };
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::SetLocalBestBlock {
                    best_hash,
                    best_number,
                },
            ) => {
                // Propagate the local node's new best block into the networking state
                // machine. Fire-and-forget: no logging and no acknowledgment channel.
                task.network
                    .set_chain_local_best_block(chain_id, best_hash, best_number);
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
            ) => {
                // The local GrandPa finality state changed: log it, then broadcast it to
                // gossip peers and update the service's stored copy (per the method name).
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "local-grandpa-state-announced",
                    chain = task.network[chain_id].log_name,
                    set_id = grandpa_state.set_id,
                    commit_finalized_height = grandpa_state.commit_finalized_height,
                );

                // TODO: log the list of peers we sent the packet to

                task.network
                    .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::AnnounceTransaction {
                    transaction,
                    result,
                },
            ) => {
                // TODO: keep track of which peer knows about which transaction, and don't send it again

                // Collect the connected-peers list eagerly so the borrow of `task.network`
                // ends before we call `gossip_send_transaction` in the loop below.
                let peers_to_send = task
                    .network
                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
                    .cloned()
                    .collect::<Vec<_>>();

                let mut peers_sent = Vec::with_capacity(peers_to_send.len());
                let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
                for peer in &peers_to_send {
                    match task
                        .network
                        .gossip_send_transaction(peer, chain_id, &transaction)
                    {
                        Ok(()) => peers_sent.push(peer.to_base58()),
                        Err(QueueNotificationError::QueueFull) => {
                            peers_queue_full.push(peer.to_base58())
                        }
                        // Every peer was just taken from the connected set above, with no
                        // intervening await point, so `NoConnection` cannot happen.
                        Err(QueueNotificationError::NoConnection) => unreachable!(),
                    }
                }

                // Log the BLAKE2b-256 hash of the transaction rather than its raw bytes.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "transaction-announced",
                    chain = task.network[chain_id].log_name,
                    transaction =
                        hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
                    size = transaction.len(),
                    peers_sent = peers_sent.join(", "),
                    peers_queue_full = peers_queue_full.join(", "),
                );

                // NOTE(review): the reply contains *all* targeted peers, including those
                // whose queue was full — confirm callers expect "attempted", not "sent".
                let _ = result.send(peers_to_send);
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::SendBlockAnnounce {
                    target,
                    scale_encoded_header,
                    is_best,
                    result,
                },
            ) => {
                // Forward the block announce to the gossip layer and relay its Ok/Err
                // outcome straight back to the foreground. `let _`: the foreground may
                // have dropped the receiver.
                // TODO: log who the announce was sent to
                let _ = result.send(task.network.gossip_send_block_announce(
                    &target,
                    chain_id,
                    &scale_encoded_header,
                    is_best,
                ));
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::Discover {
                    list,
                    important_nodes,
                },
            ) => {
                // Inject externally-provided peers and their addresses into the peering
                // strategy for this chain.
                for (peer_id, addrs) in list {
                    if important_nodes {
                        // Remember important nodes in a dedicated set used elsewhere
                        // in the service.
                        task.important_nodes.insert(peer_id.clone());
                    }

                    // Note that we must call this function before `insert_address`, as documented
                    // in `basic_peering_strategy`.
                    task.peering_strategy
                        .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant

                    // `let _`: insertion may be refused (e.g. list full); best-effort.
                    for addr in addrs {
                        let _ =
                            task.peering_strategy
                                .insert_address(&peer_id, addr.into_bytes(), 10);
                        // TODO: constant
                    }
                }
            }
            WakeUpReason::MessageForChain(
                chain_id,
                ToBackgroundChain::DiscoveredNodes { result },
            ) => {
                // Answer with every known peer of this chain plus its known addresses.
                // NOTE(review): the `unwrap` assumes stored address bytes are valid
                // multiaddrs; the insertion sites visible in this file all start from
                // `Multiaddr` values, but confirm no other site stores raw bytes.
                // TODO: consider returning Vec<u8>s for the addresses?
                let _ = result.send(
                    task.peering_strategy
                        .chain_peers_unordered(&chain_id)
                        .map(|peer_id| {
                            let addrs = task
                                .peering_strategy
                                .peer_addresses(peer_id)
                                .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
                                .collect::<Vec<_>>();
                            (peer_id.clone(), addrs)
                        })
                        .collect::<Vec<_>>(),
                );
            }
            WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
                // Answer with the peers we currently have an open transactions-gossip
                // link with on this chain. `let _`: receiver may have been dropped.
                let _ = result.send(
                    task.network
                        .gossip_connected_peers(
                            chain_id,
                            service::GossipKind::ConsensusTransactions,
                        )
                        .cloned()
                        .collect(),
                );
            }
            WakeUpReason::StartDiscovery(chain_id) => {
                // Time to run a Kademlia discovery round for this chain. The discovery
                // period doubles each round, capped at 120 seconds.
                // Re-insert the chain in `chains_by_next_discovery`.
                let chain = &mut task.network[chain_id];
                chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
                chain.next_discovery_period =
                    cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
                // NOTE(review): `next_discovery_when` is computed from the period *before*
                // doubling, while the sleep below uses the *doubled* period, so the timer
                // fires later than the recorded key timestamp — confirm this is intended.
                task.chains_by_next_discovery.insert(
                    (chain.next_discovery_when.clone(), chain_id),
                    Box::pin(
                        task.platform
                            .sleep(task.network[chain_id].next_discovery_period),
                    ),
                );

                // Ask for a peer id derived from a random Ed25519 public key, so that
                // each round explores a random region of the DHT key space.
                let random_peer_id = {
                    let mut pub_key = [0; 32];
                    rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
                    PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
                };

                // TODO: select target closest to the random peer instead
                let target = task
                    .network
                    .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
                    .next()
                    .cloned();

                if let Some(target) = target {
                    match task.network.start_kademlia_find_node_request(
                        &target,
                        chain_id,
                        &random_peer_id,
                        Duration::from_secs(20),
                    ) {
                        Ok(_) => {}
                        // `target` was just taken from the connected set, with no
                        // intervening await point, so `NoConnection` cannot happen.
                        Err(service::StartRequestError::NoConnection) => unreachable!(),
                    };

                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "discovery-find-node-started",
                        chain = &task.network[chain_id].log_name,
                        request_target = target,
                        requested_peer_id = random_peer_id
                    );
                } else {
                    // No connected gossip peer to query; skip this round. The timer
                    // re-inserted above will retry later.
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "discovery-skipped-no-peer",
                        chain = &task.network[chain_id].log_name
                    );
                }
            }
            WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
                peer_id,
                expected_peer_id,
                id,
            }) => {
                // A connection's libp2p handshake completed; the remote proved its
                // actual `peer_id`, which may differ from the one we dialed.
                let remote_addr =
                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
                if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
                {
                    // Peer id mismatch: transfer the address bookkeeping from the peer we
                    // thought we were dialing to the peer we actually reached.
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "handshake-finished-peer-id-mismatch",
                        remote_addr,
                        expected_peer_id,
                        actual_peer_id = peer_id
                    );

                    let _was_in = task
                        .peering_strategy
                        .decrease_address_connections_and_remove_if_zero(
                            expected_peer_id,
                            remote_addr.as_ref(),
                        );
                    debug_assert!(_was_in.is_ok());
                    // `10`: same address-weight magic constant used when inserting
                    // discovered addresses elsewhere in this file. `let _`: best-effort.
                    let _ = task.peering_strategy.increase_address_connections(
                        &peer_id,
                        remote_addr.into_bytes().to_vec(),
                        10,
                    );
                } else {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "handshake-finished",
                        remote_addr,
                        peer_id
                    );
                }
            }
1871            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1872                expected_peer_id: Some(_),
1873                ..
1874            })
1875            | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
1876                let (address, peer_id, handshake_finished) = match wake_up_reason {
1877                    WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1878                        address,
1879                        expected_peer_id: Some(peer_id),
1880                        ..
1881                    }) => (address, peer_id, false),
1882                    WakeUpReason::NetworkEvent(service::Event::Disconnected {
1883                        address,
1884                        peer_id,
1885                        ..
1886                    }) => (address, peer_id, true),
1887                    _ => unreachable!(),
1888                };
1889
1890                task.peering_strategy
1891                    .decrease_address_connections(&peer_id, &address)
1892                    .unwrap();
1893                let address = Multiaddr::from_bytes(address).unwrap();
1894                log!(
1895                    &task.platform,
1896                    Debug,
1897                    "network",
1898                    "connection-shutdown",
1899                    peer_id,
1900                    address,
1901                    ?handshake_finished
1902                );
1903
1904                // Ban the peer in order to avoid trying over and over again the same address(es).
1905                // Even if the handshake was finished, it is possible that the peer simply shuts
1906                // down connections immediately after it has been opened, hence the ban.
1907                // Due to race conditions and peerid mismatches, it is possible that there is
1908                // another existing connection or connection attempt with that same peer. However,
1909                // it is not possible to be sure that we will reach 0 connections or connection
1910                // attempts, and thus we ban the peer every time.
1911                let ban_duration = Duration::from_secs(5);
1912                task.network.gossip_remove_desired_all(
1913                    &peer_id,
1914                    service::GossipKind::ConsensusTransactions,
1915                );
1916                for (&chain_id, what_happened) in task
1917                    .peering_strategy
1918                    .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
1919                {
1920                    if matches!(
1921                        what_happened,
1922                        basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
1923                    ) {
1924                        log!(
1925                            &task.platform,
1926                            Debug,
1927                            "network",
1928                            "slot-unassigned",
1929                            chain = &task.network[chain_id].log_name,
1930                            peer_id,
1931                            ?ban_duration,
1932                            reason = "pre-handshake-disconnect"
1933                        );
1934                    }
1935                }
1936            }
            WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
                expected_peer_id: None,
                ..
            }) => {
                // This path can't be reached as we always set an expected peer id when creating
                // a connection.
                // A `debug_assert!` (rather than `unreachable!`) is used so that a
                // violation of this invariant doesn't bring down release builds.
                debug_assert!(false);
            }
            WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
                id,
                peer_id,
                ping_time,
            }) => {
                // An outgoing ping was answered; only log the measured round-trip time.
                let remote_addr =
                    Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); // TODO: review this unwrap
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "pong",
                    peer_id,
                    remote_addr,
                    ?ping_time
                );
            }
            WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
                chain_id,
                peer_id,
                announce,
            }) => {
                // A peer announced a block over gossip: log it, update our record of the
                // peer's best block if relevant, then forward the announce as an `Event`.
                // NOTE(review): `announce.decode()` is invoked three times in this arm;
                // presumably cheap, but it could be hoisted above the log.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "block-announce-received",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
                        announce.decode().scale_encoded_header
                    )),
                    is_best = announce.decode().is_best
                );

                let decoded_announce = announce.decode();
                if decoded_announce.is_best {
                    // A gossip link for (chain, peer) necessarily exists: it is inserted
                    // on `GossipConnected` and removed on `GossipDisconnected`.
                    let link = task
                        .open_gossip_links
                        .get_mut(&(chain_id, peer_id.clone()))
                        .unwrap();
                    // Only update the link state if the header actually decodes; an
                    // undecodable header leaves the previous best block in place.
                    if let Ok(decoded) = header::decode(
                        decoded_announce.scale_encoded_header,
                        task.network[chain_id].block_number_bytes,
                    ) {
                        link.best_block_hash = header::hash_from_scale_encoded_header(
                            decoded_announce.scale_encoded_header,
                        );
                        link.best_block_number = decoded.number;
                    }
                }

                // Invariant: any previously-queued event has already been flushed.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send =
                    Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
            }
            WakeUpReason::NetworkEvent(service::Event::GossipConnected {
                peer_id,
                chain_id,
                role,
                best_number,
                best_hash,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                // A gossip link was successfully opened with a peer: record its state
                // (best block, role) and surface a `Connected` event to the foreground.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-open-success",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    best_number,
                    best_hash = HashDisplay(&best_hash)
                );

                let _prev_value = task.open_gossip_links.insert(
                    (chain_id, peer_id.clone()),
                    OpenGossipLinkState {
                        best_block_number: best_number,
                        best_block_hash: best_hash,
                        role,
                        // Unknown until the peer sends a GrandPa neighbor packet.
                        finalized_block_height: None,
                    },
                );
                // A link for this (chain, peer) must not already exist.
                debug_assert!(_prev_value.is_none());

                // Invariant: any previously-queued event has already been flushed.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send = Some((
                    chain_id,
                    Event::Connected {
                        peer_id,
                        role,
                        best_block_number: best_number,
                        best_block_hash: best_hash,
                    },
                ));
            }
            WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
                peer_id,
                chain_id,
                error,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                // Opening a gossip link with a peer failed: ban the peer (or drop it
                // entirely on a genesis mismatch) and release its slot if it had one.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-open-error",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    ?error,
                );
                let ban_duration = Duration::from_secs(15);

                // Note that peer doesn't necessarily have an out slot, as this event might happen
                // as a result of an inbound gossip connection.
                let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
                    // A genesis mismatch means the peer is on a different chain: remove
                    // it from this chain permanently rather than banning temporarily.
                    matches!(
                        task.peering_strategy
                            .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
                        basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
                    )
                } else {
                    matches!(
                        task.peering_strategy.unassign_slot_and_ban(
                            &chain_id,
                            &peer_id,
                            task.platform.now() + ban_duration,
                        ),
                        basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
                    )
                };

                if had_slot {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "slot-unassigned",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        ?ban_duration,
                        reason = "gossip-open-failed"
                    );
                    // Stop trying to open a gossip link with this peer on this chain.
                    task.network.gossip_remove_desired(
                        chain_id,
                        &peer_id,
                        service::GossipKind::ConsensusTransactions,
                    );
                }
            }
            WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
                peer_id,
                chain_id,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                // An open gossip link was closed: drop its state, ban the peer briefly,
                // and surface a `Disconnected` event to the foreground.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-closed",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                );
                let ban_duration = Duration::from_secs(10);

                // The link was inserted on `GossipConnected`, so it must exist.
                let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
                debug_assert!(_was_in.is_some());

                // Note that peer doesn't necessarily have an out slot, as this event might happen
                // as a result of an inbound gossip connection.
                if matches!(
                    task.peering_strategy.unassign_slot_and_ban(
                        &chain_id,
                        &peer_id,
                        task.platform.now() + ban_duration,
                    ),
                    basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
                ) {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "slot-unassigned",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        ?ban_duration,
                        reason = "gossip-closed"
                    );
                    // Stop trying to reopen a gossip link with this peer on this chain.
                    task.network.gossip_remove_desired(
                        chain_id,
                        &peer_id,
                        service::GossipKind::ConsensusTransactions,
                    );
                }

                // Invariant: any previously-queued event has already been flushed.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::Blocks(response),
            }) => {
                // A blocks request completed. First log the outcome (with the total
                // payload size on success), then route the response to the foreground.
                match &response {
                    Ok(blocks) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "blocks-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            num_blocks = blocks.len(),
                            // Sum of header + body + justification bytes over all blocks.
                            block_data_total_size =
                                BytesDisplay(blocks.iter().fold(0, |sum, block| {
                                    let block_size = block.header.as_ref().map_or(0, |h| h.len())
                                        + block
                                            .body
                                            .as_ref()
                                            .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
                                        + block
                                            .justifications
                                            .as_ref()
                                            .into_iter()
                                            .flat_map(|l| l.iter())
                                            .fold(0, |s, j| s + j.justification.len());
                                    sum + u64::try_from(block_size).unwrap()
                                }))
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "blocks-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error
                        );
                    }
                }

                // Additionally emit a human-readable message for protocol-level errors,
                // which may indicate an incompatibility with the remote.
                match &response {
                    Ok(_) => {}
                    Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
                    Err(err) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            format!(
                                "Error in block request with {}. This might indicate an \
                                incompatibility. Error: {}",
                                peer_id, err
                            )
                        );
                    }
                }

                // The entry was inserted when the request was started, so the `unwrap`
                // cannot fail; `let _` because the receiver may have been dropped.
                let _ = task
                    .blocks_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(BlocksRequestError::Request));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::GrandpaWarpSync(response),
            }) => {
                // A GrandPa warp sync request towards `peer_id` has finished,
                // successfully or not. Log the outcome, then deliver it to the local
                // requester.
                match &response {
                    Ok(response) => {
                        // TODO: print total bytes size
                        let decoded = response.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "warp-sync-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            num_fragments = decoded.fragments.len(),
                            is_finished = ?decoded.is_finished,
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "warp-sync-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error,
                        );
                    }
                }

                // The `unwrap()` asserts that a pending-request entry exists for this
                // substream. The result of `send` is deliberately discarded: a failure
                // only means the receiving end has been dropped in the meantime.
                let _ = task
                    .grandpa_warp_sync_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(WarpSyncRequestError::Request));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::StorageProof(response),
            }) => {
                // A storage proof request (regular or child trie) has finished. Log the
                // outcome, then deliver it to the local requester.
                match &response {
                    Ok(items) => {
                        let decoded = items.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "storage-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error
                        );
                    }
                }

                // Both regular storage proof and child storage proof use the same protocol,
                // so check both HashMaps for the request.
                if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
                    let _ = sender.send(response.map_err(StorageProofRequestError::Request));
                } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
                {
                    let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
                } else {
                    // The substream must be found in one of the two maps above, since
                    // every started request was inserted into one of them.
                    unreachable!()
                }
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                substream_id,
                peer_id,
                chain_id,
                response: service::RequestResult::CallProof(response),
            }) => {
                // A call proof request has finished. Log the outcome, then deliver it
                // to the local requester.
                match &response {
                    Ok(items) => {
                        let decoded = items.decode();
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "call-proof-request-success",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
                        );
                    }
                    Err(error) => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            "call-proof-request-error",
                            chain = task.network[chain_id].log_name,
                            target = peer_id,
                            ?error
                        );
                    }
                }

                // The `unwrap()` asserts that a pending-request entry exists for this
                // substream; the `send` result is deliberately ignored.
                let _ = task
                    .call_proof_requests
                    .remove(&substream_id)
                    .unwrap()
                    .send(response.map_err(CallProofRequestError::Request));
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                peer_id: requestee_peer_id,
                chain_id,
                response: service::RequestResult::KademliaFindNode(Ok(nodes)),
                ..
            }) => {
                // A Kademlia FIND_NODE discovery request has succeeded. Feed the
                // discovered peers and their addresses into the address book.
                for (peer_id, mut addrs) in nodes {
                    // Make sure to not insert too many addresses for a single peer.
                    if addrs.len() >= 10 {
                        addrs.truncate(10);
                    }

                    // Keep only addresses that parse as multiaddrs and whose transport
                    // is supported by the platform, logging each rejection.
                    let mut valid_addrs = Vec::with_capacity(addrs.len());
                    for addr in addrs {
                        match Multiaddr::from_bytes(addr) {
                            Ok(a) => {
                                if platform::address_parse::multiaddr_to_address(&a)
                                    .ok()
                                    .map_or(false, |addr| {
                                        task.platform.supports_connection_type((&addr).into())
                                    })
                                {
                                    valid_addrs.push(a)
                                } else {
                                    log!(
                                        &task.platform,
                                        Debug,
                                        "network",
                                        "discovered-address-not-supported",
                                        chain = &task.network[chain_id].log_name,
                                        peer_id,
                                        addr = &a,
                                        obtained_from = requestee_peer_id
                                    );
                                }
                            }
                            Err((error, addr)) => {
                                log!(
                                    &task.platform,
                                    Debug,
                                    "network",
                                    "discovered-address-invalid",
                                    chain = &task.network[chain_id].log_name,
                                    peer_id,
                                    error,
                                    addr = hex::encode(&addr),
                                    obtained_from = requestee_peer_id
                                );
                            }
                        }
                    }

                    if !valid_addrs.is_empty() {
                        // Note that we must call this function before `insert_address`,
                        // as documented in `basic_peering_strategy`.
                        let insert_outcome =
                            task.peering_strategy
                                .insert_chain_peer(chain_id, peer_id.clone(), 30); // TODO: constant

                        if let basic_peering_strategy::InsertChainPeerResult::Inserted {
                            peer_removed,
                        } = insert_outcome
                        {
                            // Inserting a new peer may evict an existing one from the
                            // address book; log the eviction if it happened.
                            if let Some(peer_removed) = peer_removed {
                                log!(
                                    &task.platform,
                                    Debug,
                                    "network",
                                    "peer-purged-from-address-book",
                                    chain = &task.network[chain_id].log_name,
                                    peer_id = peer_removed,
                                );
                            }

                            log!(
                                &task.platform,
                                Debug,
                                "network",
                                "peer-discovered",
                                chain = &task.network[chain_id].log_name,
                                peer_id,
                                addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), // TODO: better formatting?
                                obtained_from = requestee_peer_id
                            );
                        }
                    }

                    // Record every validated address for this peer. `UnknownPeer` can't
                    // happen here because the peer was inserted above whenever
                    // `valid_addrs` is non-empty.
                    for addr in valid_addrs {
                        let _insert_result =
                            task.peering_strategy
                                .insert_address(&peer_id, addr.into_bytes(), 10); // TODO: constant
                        debug_assert!(!matches!(
                            _insert_result,
                            basic_peering_strategy::InsertAddressResult::UnknownPeer
                        ));
                    }
                }
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult {
                peer_id,
                chain_id,
                response: service::RequestResult::KademliaFindNode(Err(error)),
                ..
            }) => {
                // A Kademlia FIND_NODE discovery request has failed.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "discovery-find-node-error",
                    chain = &task.network[chain_id].log_name,
                    ?error,
                    find_node_target = peer_id,
                );

                // No error is printed if the request fails due to a benign networking error such
                // as an unresponsive peer.
                match error {
                    service::KademliaFindNodeError::RequestFailed(err)
                        if !err.is_protocol_error() => {}

                    service::KademliaFindNodeError::RequestFailed(
                        service::RequestError::Substream(
                            connection::established::RequestError::ProtocolNotAvailable,
                        ),
                    ) => {
                        // TODO: remove this warning in a long time
                        log!(
                            &task.platform,
                            Warn,
                            "network",
                            format!(
                                "Problem during discovery on {}: protocol not available. \
                                This might indicate that the version of Substrate used by \
                                the chain doesn't include \
                                <https://github.com/paritytech/substrate/pull/12545>.",
                                &task.network[chain_id].log_name
                            )
                        );
                    }
                    _ => {
                        log!(
                            &task.platform,
                            Debug,
                            "network",
                            format!(
                                "Problem during discovery on {}: {}",
                                &task.network[chain_id].log_name, error
                            )
                        );
                    }
                }
            }
            WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
                // We never start any other kind of requests.
                unreachable!()
            }
            WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
                peer_id,
                chain_id,
                kind: service::GossipKind::ConsensusTransactions,
            }) => {
                // A remote wants to open a gossip link with us. Accept only if fewer
                // than 4 undesired gossip links are currently open on this chain;
                // otherwise reject immediately.
                // The networking state machine guarantees that `GossipInDesired`
                // can't happen if we are already opening an out slot, which we do
                // immediately.
                // TODO: add debug_assert! ^
                if task
                    .network
                    .opened_gossip_undesired_by_chain(chain_id)
                    .count()
                    < 4
                {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "gossip-in-request",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        outcome = "accepted"
                    );
                    task.network
                        .gossip_open(
                            chain_id,
                            &peer_id,
                            service::GossipKind::ConsensusTransactions,
                        )
                        .unwrap();
                } else {
                    log!(
                        &task.platform,
                        Debug,
                        "network",
                        "gossip-in-request",
                        chain = &task.network[chain_id].log_name,
                        peer_id,
                        outcome = "rejected",
                    );
                    task.network
                        .gossip_close(
                            chain_id,
                            &peer_id,
                            service::GossipKind::ConsensusTransactions,
                        )
                        .unwrap();
                }
            }
            WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
                // Can't happen as we already instantaneously accept or reject gossip in requests.
                unreachable!()
            }
            WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
                peer_id,
                substream_id,
            }) => {
                // A remote has sent an identify request; answer it immediately with our
                // agent version string.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "identify-request-received",
                    peer_id,
                );
                task.network
                    .respond_identify(substream_id, &task.identify_agent_version);
            }
            // NOTE(review): presumably this service never enables serving incoming
            // block requests, making this event impossible — confirm against the
            // service configuration.
            WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
            WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
                // All incoming requests are immediately answered.
                unreachable!()
            }
            WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
                chain_id,
                peer_id,
                state,
            }) => {
                // A GrandPa neighbor packet has been received from a gossip peer.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "grandpa-neighbor-packet-received",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    round_number = state.round_number,
                    set_id = state.set_id,
                    commit_finalized_height = state.commit_finalized_height,
                );

                // Record the peer's reported finalized block height. The `unwrap()`
                // asserts that an open-gossip-link entry exists for this (chain, peer).
                task.open_gossip_links
                    .get_mut(&(chain_id, peer_id.clone()))
                    .unwrap()
                    .finalized_block_height = Some(state.commit_finalized_height);

                // Queue the event for delivery to the API users of this chain.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send = Some((
                    chain_id,
                    Event::GrandpaNeighborPacket {
                        peer_id,
                        finalized_block_height: state.commit_finalized_height,
                    },
                ));
            }
            WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
                chain_id,
                peer_id,
                message,
            }) => {
                // A GrandPa commit message has been received via gossip.
                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "grandpa-commit-message-received",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                    target_block_hash = HashDisplay(message.decode().target_hash),
                );

                // Queue the message for delivery to the API users of this chain.
                debug_assert!(task.event_pending_send.is_none());
                task.event_pending_send =
                    Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
            }
            WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
                // The remote has violated the protocol. Currently only logged.
                // TODO: handle properly?
                log!(
                    &task.platform,
                    Warn,
                    "network",
                    "protocol-error",
                    peer_id,
                    ?error
                );

                // TODO: disconnect peer
            }
            WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
                // A known peer can be assigned an outbound slot for this chain.
                task.peering_strategy.assign_slot(&chain_id, &peer_id);

                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "slot-assigned",
                    chain = &task.network[chain_id].log_name,
                    peer_id
                );

                // Mark the peer as gossip-desired so that a gossip link gets opened.
                task.network.gossip_insert_desired(
                    chain_id,
                    peer_id,
                    service::GossipKind::ConsensusTransactions,
                );
            }
            WakeUpReason::NextRecentConnectionRestore => {
                // Decrement (saturating at 0) the counter of recently-opened
                // connections, which throttles how fast new connections are opened.
                task.num_recent_connection_opening =
                    task.num_recent_connection_opening.saturating_sub(1);
            }
            WakeUpReason::CanStartConnect(expected_peer_id) => {
                // We are allowed to open a new outgoing connection towards this peer.
                // Pick an address from the address book; if none is available, stop
                // desiring the peer, ban it, and unassign its slots.
                let Some(multiaddr) = task
                    .peering_strategy
                    .pick_address_and_add_connection(&expected_peer_id)
                else {
                    // There is no address for that peer in the address book.
                    task.network.gossip_remove_desired_all(
                        &expected_peer_id,
                        service::GossipKind::ConsensusTransactions,
                    );
                    let ban_duration = Duration::from_secs(10);
                    for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
                        &expected_peer_id,
                        task.platform.now() + ban_duration,
                    ) {
                        if matches!(
                            what_happened,
                            basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
                        ) {
                            log!(
                                &task.platform,
                                Debug,
                                "network",
                                "slot-unassigned",
                                chain = &task.network[chain_id].log_name,
                                peer_id = expected_peer_id,
                                ?ban_duration,
                                reason = "no-address"
                            );
                        }
                    }
                    continue;
                };

                // Parse the address. Unparseable addresses are dropped from the
                // address book.
                let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
                    Ok(a) => a,
                    Err((multiaddr::FromBytesError, addr)) => {
                        // Address is in an invalid format.
                        let _was_in = task
                            .peering_strategy
                            .decrease_address_connections_and_remove_if_zero(
                                &expected_peer_id,
                                &addr,
                            );
                        debug_assert!(_was_in.is_ok());
                        continue;
                    }
                };

                // Check that the platform supports connecting to this kind of address.
                let address = address_parse::multiaddr_to_address(&multiaddr)
                    .ok()
                    .filter(|addr| {
                        task.platform.supports_connection_type(match &addr {
                            address_parse::AddressOrMultiStreamAddress::Address(addr) => {
                                From::from(addr)
                            }
                            address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
                                addr,
                            ) => From::from(addr),
                        })
                    });

                let Some(address) = address else {
                    // Address is in an invalid format or isn't supported by the platform.
                    let _was_in = task
                        .peering_strategy
                        .decrease_address_connections_and_remove_if_zero(
                            &expected_peer_id,
                            multiaddr.as_ref(),
                        );
                    debug_assert!(_was_in.is_ok());
                    continue;
                };

                // Each connection has its own individual Noise key.
                let noise_key = {
                    let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
                    task.platform.fill_random_bytes(&mut *noise_static_key);
                    let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
                    task.platform.fill_random_bytes(&mut *libp2p_key);
                    connection::NoiseKey::new(&libp2p_key, &noise_static_key)
                };

                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "connection-started",
                    expected_peer_id,
                    remote_addr = multiaddr,
                    local_peer_id =
                        peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
                            .into_peer_id(),
                );

                task.num_recent_connection_opening += 1;

                // Channel through which the coordinator will send messages to the
                // newly-spawned connection task.
                let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
                    async_channel::bounded(8);
                let task_name = format!("connection-{}", multiaddr);

                match address {
                    address_parse::AddressOrMultiStreamAddress::Address(address) => {
                        // As documented in the `PlatformRef` trait, `connect_stream` must
                        // return as soon as possible.
                        let connection = task.platform.connect_stream(address).await;

                        let (connection_id, connection_task) =
                            task.network.add_single_stream_connection(
                                task.platform.now(),
                                service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
                                    is_initiator: true,
                                    noise_key: &noise_key,
                                },
                                multiaddr.clone().into_bytes(),
                                Some(expected_peer_id.clone()),
                                coordinator_to_connection_tx,
                            );

                        // The connection is then driven in the background by its own task.
                        task.platform.spawn_task(
                            task_name.into(),
                            tasks::single_stream_connection_task::<TPlat>(
                                connection,
                                multiaddr.to_string(),
                                task.platform.clone(),
                                connection_id,
                                connection_task,
                                coordinator_to_connection_rx,
                                task.tasks_messages_tx.clone(),
                            ),
                        );
                    }
                    address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
                        platform::MultiStreamAddress::WebRtc {
                            ip,
                            port,
                            remote_certificate_sha256,
                        },
                    ) => {
                        // We need to know the local TLS certificate in order to insert the
                        // connection, and as such we need to call `connect_multistream` here.
                        // As documented in the `PlatformRef` trait, `connect_multistream` must
                        // return as soon as possible.
                        let connection = task
                            .platform
                            .connect_multistream(platform::MultiStreamAddress::WebRtc {
                                ip,
                                port,
                                remote_certificate_sha256,
                            })
                            .await;

                        // Convert the SHA256 hashes into multihashes.
                        let local_tls_certificate_multihash = [18u8, 32]
                            .into_iter()
                            .chain(connection.local_tls_certificate_sha256.into_iter())
                            .collect();
                        let remote_tls_certificate_multihash = [18u8, 32]
                            .into_iter()
                            .chain(remote_certificate_sha256.iter().copied())
                            .collect();

                        let (connection_id, connection_task) =
                            task.network.add_multi_stream_connection(
                                task.platform.now(),
                                service::MultiStreamHandshakeKind::WebRtc {
                                    is_initiator: true,
                                    local_tls_certificate_multihash,
                                    remote_tls_certificate_multihash,
                                    noise_key: &noise_key,
                                },
                                multiaddr.clone().into_bytes(),
                                Some(expected_peer_id.clone()),
                                coordinator_to_connection_tx,
                            );

                        // The connection is then driven in the background by its own task.
                        task.platform.spawn_task(
                            task_name.into(),
                            tasks::webrtc_multi_stream_connection_task::<TPlat>(
                                connection.connection,
                                multiaddr.to_string(),
                                task.platform.clone(),
                                connection_id,
                                connection_task,
                                coordinator_to_connection_rx,
                                task.tasks_messages_tx.clone(),
                            ),
                        );
                    }
                }
            }
            WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
                // Start opening a gossip link towards this peer on this chain.
                task.network
                    .gossip_open(
                        chain_id,
                        &peer_id,
                        service::GossipKind::ConsensusTransactions,
                    )
                    .unwrap();

                log!(
                    &task.platform,
                    Debug,
                    "network",
                    "gossip-open-start",
                    chain = &task.network[chain_id].log_name,
                    peer_id,
                );
            }
            WakeUpReason::MessageToConnection {
                connection_id,
                message,
            } => {
                // Forward a coordinator message to the relevant connection task.
                // Note that it is critical for the sending to not take too long here, in order to
                // not block the process of the network service.
                // In particular, if sending the message to the connection is blocked due to
                // sending a message on the connection-to-coordinator channel, this will result
                // in a deadlock.
                // For this reason, the connection task is always ready to immediately accept a
                // message on the coordinator-to-connection channel.
                let _send_result = task.network[connection_id].send(message).await;
                debug_assert!(_send_result.is_ok());
            }
2872        }
2873    }
2874}