1use crate::{
44 log,
45 platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49 borrow::ToOwned as _,
50 boxed::Box,
51 collections::BTreeMap,
52 format,
53 string::{String, ToString as _},
54 sync::Arc,
55 vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65 header,
66 informant::{BytesDisplay, HashDisplay},
67 libp2p::{
68 connection,
69 multiaddr::{self, Multiaddr},
70 peer_id,
71 },
72 network::{basic_peering_strategy, codec, service},
73};
74
75pub use codec::{CallProofRequestConfig, Role};
76pub use service::{ChainId, EncodedMerkleProof, PeerId, QueueNotificationError};
77
78mod tasks;
79
80pub struct Config<TPlat> {
82 pub platform: TPlat,
84
85 pub identify_agent_version: String,
87
88 pub chains_capacity: usize,
90
91 pub connections_open_pool_size: u32,
95
96 pub connections_open_pool_restore_delay: Duration,
101}
102
103pub struct ConfigChain {
109 pub log_name: String,
111
112 pub num_out_slots: usize,
115
116 pub genesis_block_hash: [u8; 32],
122
123 pub best_block: (u64, [u8; 32]),
126
127 pub fork_id: Option<String>,
130
131 pub block_number_bytes: usize,
133
134 pub grandpa_protocol_finalized_block_height: Option<u64>,
137}
138
139pub struct NetworkService<TPlat: PlatformRef> {
140 messages_tx: async_channel::Sender<ToBackground<TPlat>>,
142
143 platform: TPlat,
145}
146
147impl<TPlat: PlatformRef> NetworkService<TPlat> {
148 pub fn new(config: Config<TPlat>) -> Arc<Self> {
150 let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
151
152 let network = service::ChainNetwork::new(service::Config {
153 chains_capacity: config.chains_capacity,
154 connections_capacity: 32,
155 handshake_timeout: Duration::from_secs(8),
156 randomness_seed: {
157 let mut seed = [0; 32];
158 config.platform.fill_random_bytes(&mut seed);
159 seed
160 },
161 });
162
163 let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
165 let task = Box::pin(background_task(BackgroundTask {
166 randomness: rand_chacha::ChaCha20Rng::from_seed({
167 let mut seed = [0; 32];
168 config.platform.fill_random_bytes(&mut seed);
169 seed
170 }),
171 identify_agent_version: config.identify_agent_version,
172 tasks_messages_tx,
173 tasks_messages_rx: Box::pin(tasks_messages_rx),
174 peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
175 basic_peering_strategy::Config {
176 randomness_seed: {
177 let mut seed = [0; 32];
178 config.platform.fill_random_bytes(&mut seed);
179 seed
180 },
181 peers_capacity: 50, chains_capacity: config.chains_capacity,
183 },
184 ),
185 network,
186 connections_open_pool_size: config.connections_open_pool_size,
187 connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
188 num_recent_connection_opening: 0,
189 next_recent_connection_restore: None,
190 platform: config.platform.clone(),
191 open_gossip_links: BTreeMap::new(),
192 event_pending_send: None,
193 event_senders: either::Left(Vec::new()),
194 pending_new_subscriptions: Vec::new(),
195 important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
196 main_messages_rx: Box::pin(main_messages_rx),
197 messages_rx: stream::SelectAll::new(),
198 blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
199 grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
200 storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
201 call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
202 child_storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
203 chains_by_next_discovery: BTreeMap::new(),
204 }));
205
206 config.platform.spawn_task("network-service".into(), {
207 let platform = config.platform.clone();
208 async move {
209 task.await;
210 log!(&platform, Debug, "network", "shutdown");
211 }
212 });
213
214 Arc::new(NetworkService {
215 messages_tx: main_messages_tx,
216 platform: config.platform,
217 })
218 }
219
220 pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
226 let (messages_tx, messages_rx) = async_channel::bounded(32);
227
228 self.platform.spawn_task("add-chain-message-send".into(), {
230 let config = service::ChainConfig {
231 grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
232 |commit_finalized_height| service::GrandpaState {
233 commit_finalized_height,
234 round_number: 1,
235 set_id: 0,
236 },
237 ),
238 fork_id: config.fork_id.clone(),
239 block_number_bytes: config.block_number_bytes,
240 best_hash: config.best_block.1,
241 best_number: config.best_block.0,
242 genesis_hash: config.genesis_block_hash,
243 role: Role::Light,
244 allow_inbound_block_requests: false,
245 user_data: Chain {
246 log_name: config.log_name,
247 block_number_bytes: config.block_number_bytes,
248 num_out_slots: config.num_out_slots,
249 num_references: NonZero::<usize>::new(1).unwrap(),
250 next_discovery_period: Duration::from_secs(2),
251 next_discovery_when: self.platform.now(),
252 },
253 };
254
255 let messages_tx = self.messages_tx.clone();
256 async move {
257 let _ = messages_tx
258 .send(ToBackground::AddChain {
259 messages_rx,
260 config,
261 })
262 .await;
263 }
264 });
265
266 Arc::new(NetworkServiceChain {
267 _keep_alive_messages_tx: self.messages_tx.clone(),
268 messages_tx,
269 marker: core::marker::PhantomData,
270 })
271 }
272}
273
274pub struct NetworkServiceChain<TPlat: PlatformRef> {
275 _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,
278
279 messages_tx: async_channel::Sender<ToBackgroundChain>,
281
282 marker: core::marker::PhantomData<TPlat>,
284}
285
286#[derive(Debug, Copy, Clone, PartialEq, Eq)]
288pub enum BanSeverity {
289 Low,
290 High,
291}
292
293impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
294 pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
315 let (tx, rx) = async_channel::bounded(128);
316
317 self.messages_tx
318 .send(ToBackgroundChain::Subscribe { sender: tx })
319 .await
320 .unwrap();
321
322 rx
323 }
324
325 pub async fn ban_and_disconnect(
337 &self,
338 peer_id: PeerId,
339 severity: BanSeverity,
340 reason: &'static str,
341 ) {
342 let _ = self
343 .messages_tx
344 .send(ToBackgroundChain::DisconnectAndBan {
345 peer_id,
346 severity,
347 reason,
348 })
349 .await;
350 }
351
352 pub async fn blocks_request(
355 self: Arc<Self>,
356 target: PeerId,
357 config: codec::BlocksRequestConfig,
358 timeout: Duration,
359 ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
360 let (tx, rx) = oneshot::channel();
361
362 self.messages_tx
363 .send(ToBackgroundChain::StartBlocksRequest {
364 target: target.clone(),
365 config,
366 timeout,
367 result: tx,
368 })
369 .await
370 .unwrap();
371
372 rx.await.unwrap()
373 }
374
375 pub async fn grandpa_warp_sync_request(
378 self: Arc<Self>,
379 target: PeerId,
380 begin_hash: [u8; 32],
381 timeout: Duration,
382 ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
383 let (tx, rx) = oneshot::channel();
384
385 self.messages_tx
386 .send(ToBackgroundChain::StartWarpSyncRequest {
387 target: target.clone(),
388 begin_hash,
389 timeout,
390 result: tx,
391 })
392 .await
393 .unwrap();
394
395 rx.await.unwrap()
396 }
397
398 pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
399 self.messages_tx
400 .send(ToBackgroundChain::SetLocalBestBlock {
401 best_hash,
402 best_number,
403 })
404 .await
405 .unwrap();
406 }
407
408 pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
409 self.messages_tx
410 .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
411 .await
412 .unwrap();
413 }
414
415 pub async fn storage_proof_request(
418 self: Arc<Self>,
419 target: PeerId, config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
421 timeout: Duration,
422 ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
423 let (tx, rx) = oneshot::channel();
424
425 self.messages_tx
426 .send(ToBackgroundChain::StartStorageProofRequest {
427 target: target.clone(),
428 config: codec::StorageProofRequestConfig {
429 block_hash: config.block_hash,
430 keys: config
431 .keys
432 .map(|key| key.as_ref().to_vec()) .collect::<Vec<_>>()
434 .into_iter(),
435 },
436 timeout,
437 result: tx,
438 })
439 .await
440 .unwrap();
441
442 rx.await.unwrap()
443 }
444
445 pub async fn call_proof_request(
450 self: Arc<Self>,
451 target: PeerId, config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
453 timeout: Duration,
454 ) -> Result<EncodedMerkleProof, CallProofRequestError> {
455 let (tx, rx) = oneshot::channel();
456
457 self.messages_tx
458 .send(ToBackgroundChain::StartCallProofRequest {
459 target: target.clone(),
460 config: codec::CallProofRequestConfig {
461 block_hash: config.block_hash,
462 method: config.method.into_owned().into(),
463 parameter_vectored: config
464 .parameter_vectored
465 .map(|v| v.as_ref().to_vec()) .collect::<Vec<_>>()
467 .into_iter(),
468 },
469 timeout,
470 result: tx,
471 })
472 .await
473 .unwrap();
474
475 rx.await.unwrap()
476 }
477
478 pub async fn child_storage_proof_request(
480 self: Arc<Self>,
481 target: PeerId,
482 config: codec::ChildStorageProofRequestConfig<
483 impl AsRef<[u8]> + Clone,
484 impl Iterator<Item = impl AsRef<[u8]> + Clone>,
485 >,
486 timeout: Duration,
487 ) -> Result<service::EncodedMerkleProof, ChildStorageProofRequestError> {
488 let (tx, rx) = oneshot::channel();
489
490 self.messages_tx
491 .send(ToBackgroundChain::StartChildStorageProofRequest {
492 target: target.clone(),
493 config: ChildStorageProofRequestConfigOwned {
494 block_hash: config.block_hash,
495 child_trie: config.child_trie.as_ref().to_vec(),
496 keys: config
497 .keys
498 .map(|key| key.as_ref().to_vec())
499 .collect::<Vec<_>>(),
500 },
501 timeout,
502 result: tx,
503 })
504 .await
505 .unwrap();
506
507 rx.await.unwrap()
508 }
509
510 pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
520 let (tx, rx) = oneshot::channel();
521
522 self.messages_tx
523 .send(ToBackgroundChain::AnnounceTransaction {
524 transaction: transaction.to_vec(), result: tx,
526 })
527 .await
528 .unwrap();
529
530 rx.await.unwrap()
531 }
532
533 pub async fn send_block_announce(
535 self: Arc<Self>,
536 target: &PeerId,
537 scale_encoded_header: &[u8],
538 is_best: bool,
539 ) -> Result<(), QueueNotificationError> {
540 let (tx, rx) = oneshot::channel();
541
542 self.messages_tx
543 .send(ToBackgroundChain::SendBlockAnnounce {
544 target: target.clone(), scale_encoded_header: scale_encoded_header.to_vec(), is_best,
547 result: tx,
548 })
549 .await
550 .unwrap();
551
552 rx.await.unwrap()
553 }
554
555 pub async fn discover(
561 &self,
562 list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
563 important_nodes: bool,
564 ) {
565 self.messages_tx
566 .send(ToBackgroundChain::Discover {
567 list: list
569 .into_iter()
570 .map(|(peer_id, addrs)| {
571 (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
572 })
573 .collect::<Vec<_>>()
574 .into_iter(),
575 important_nodes,
576 })
577 .await
578 .unwrap();
579 }
580
581 pub async fn discovered_nodes(
588 &self,
589 ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
590 let (tx, rx) = oneshot::channel();
591
592 self.messages_tx
593 .send(ToBackgroundChain::DiscoveredNodes { result: tx })
594 .await
595 .unwrap();
596
597 rx.await
598 .unwrap()
599 .into_iter()
600 .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
601 }
602
603 pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
606 let (tx, rx) = oneshot::channel();
607 self.messages_tx
608 .send(ToBackgroundChain::PeersList { result: tx })
609 .await
610 .unwrap();
611 rx.await.unwrap().into_iter()
612 }
613}
614
615#[derive(Debug, Clone)]
617pub enum Event {
618 Connected {
619 peer_id: PeerId,
620 role: Role,
621 best_block_number: u64,
622 best_block_hash: [u8; 32],
623 },
624 Disconnected {
625 peer_id: PeerId,
626 },
627 BlockAnnounce {
628 peer_id: PeerId,
629 announce: service::EncodedBlockAnnounce,
630 },
631 GrandpaNeighborPacket {
632 peer_id: PeerId,
633 finalized_block_height: u64,
634 },
635 GrandpaCommitMessage {
637 peer_id: PeerId,
638 message: service::EncodedGrandpaCommitMessage,
639 },
640}
641
642#[derive(Debug, derive_more::Display, derive_more::Error)]
644pub enum BlocksRequestError {
645 NoConnection,
647 #[display("{_0}")]
649 Request(service::BlocksRequestError),
650}
651
652#[derive(Debug, derive_more::Display, derive_more::Error)]
654pub enum WarpSyncRequestError {
655 NoConnection,
657 #[display("{_0}")]
659 Request(service::GrandpaWarpSyncRequestError),
660}
661
662#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
664pub enum StorageProofRequestError {
665 NoConnection,
667 RequestTooLarge,
669 #[display("{_0}")]
671 Request(service::StorageProofRequestError),
672}
673
674#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
676pub enum CallProofRequestError {
677 NoConnection,
679 RequestTooLarge,
681 #[display("{_0}")]
683 Request(service::CallProofRequestError),
684}
685
686impl CallProofRequestError {
687 pub fn is_network_problem(&self) -> bool {
690 match self {
691 CallProofRequestError::Request(err) => err.is_network_problem(),
692 CallProofRequestError::RequestTooLarge => false,
693 CallProofRequestError::NoConnection => true,
694 }
695 }
696}
697
698#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
700pub enum ChildStorageProofRequestError {
701 NoConnection,
703 RequestTooLarge,
705 #[display("{_0}")]
707 Request(service::StorageProofRequestError),
708}
709
710impl ChildStorageProofRequestError {
711 pub fn is_network_problem(&self) -> bool {
714 match self {
715 ChildStorageProofRequestError::Request(err) => err.is_network_problem(),
716 ChildStorageProofRequestError::RequestTooLarge => false,
717 ChildStorageProofRequestError::NoConnection => true,
718 }
719 }
720}
721
722struct ChildStorageProofRequestConfigOwned {
724 block_hash: [u8; 32],
725 child_trie: Vec<u8>,
726 keys: Vec<Vec<u8>>,
727}
728
729enum ToBackground<TPlat: PlatformRef> {
730 AddChain {
731 messages_rx: async_channel::Receiver<ToBackgroundChain>,
732 config: service::ChainConfig<Chain<TPlat>>,
733 },
734}
735
736enum ToBackgroundChain {
737 RemoveChain,
738 Subscribe {
739 sender: async_channel::Sender<Event>,
740 },
741 DisconnectAndBan {
742 peer_id: PeerId,
743 severity: BanSeverity,
744 reason: &'static str,
745 },
746 StartBlocksRequest {
748 target: PeerId, config: codec::BlocksRequestConfig,
750 timeout: Duration,
751 result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
752 },
753 StartWarpSyncRequest {
755 target: PeerId,
756 begin_hash: [u8; 32],
757 timeout: Duration,
758 result:
759 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
760 },
761 StartStorageProofRequest {
763 target: PeerId,
764 config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
765 timeout: Duration,
766 result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
767 },
768 StartCallProofRequest {
770 target: PeerId, config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
772 timeout: Duration,
773 result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
774 },
775 StartChildStorageProofRequest {
777 target: PeerId,
778 config: ChildStorageProofRequestConfigOwned,
779 timeout: Duration,
780 result: oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
781 },
782 SetLocalBestBlock {
783 best_hash: [u8; 32],
784 best_number: u64,
785 },
786 SetLocalGrandpaState {
787 grandpa_state: service::GrandpaState,
788 },
789 AnnounceTransaction {
790 transaction: Vec<u8>,
791 result: oneshot::Sender<Vec<PeerId>>,
792 },
793 SendBlockAnnounce {
794 target: PeerId,
795 scale_encoded_header: Vec<u8>,
796 is_best: bool,
797 result: oneshot::Sender<Result<(), QueueNotificationError>>,
798 },
799 Discover {
800 list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
801 important_nodes: bool,
802 },
803 DiscoveredNodes {
804 result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
805 },
806 PeersList {
807 result: oneshot::Sender<Vec<PeerId>>,
808 },
809}
810
811struct BackgroundTask<TPlat: PlatformRef> {
812 platform: TPlat,
814
815 randomness: rand_chacha::ChaCha20Rng,
817
818 identify_agent_version: String,
820
821 tasks_messages_tx:
823 async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
824
825 tasks_messages_rx: Pin<
827 Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
828 >,
829
830 network: service::ChainNetwork<
832 Chain<TPlat>,
833 async_channel::Sender<service::CoordinatorToConnection>,
834 TPlat::Instant,
835 >,
836
837 peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
839
840 connections_open_pool_size: u32,
842
843 connections_open_pool_restore_delay: Duration,
845
846 num_recent_connection_opening: u32,
850
851 next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
853
854 open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
857
858 important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
861
862 event_pending_send: Option<(ChainId, Event)>,
864
865 event_senders: either::Either<
871 Vec<(ChainId, async_channel::Sender<Event>)>,
872 Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
873 >,
874
875 pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
878
879 main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
880
881 messages_rx:
882 stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
883
884 blocks_requests: HashMap<
885 service::SubstreamId,
886 oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
887 fnv::FnvBuildHasher,
888 >,
889
890 grandpa_warp_sync_requests: HashMap<
891 service::SubstreamId,
892 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
893 fnv::FnvBuildHasher,
894 >,
895
896 storage_proof_requests: HashMap<
897 service::SubstreamId,
898 oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
899 fnv::FnvBuildHasher,
900 >,
901
902 call_proof_requests: HashMap<
903 service::SubstreamId,
904 oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
905 fnv::FnvBuildHasher,
906 >,
907
908 child_storage_proof_requests: HashMap<
909 service::SubstreamId,
910 oneshot::Sender<Result<service::EncodedMerkleProof, ChildStorageProofRequestError>>,
911 fnv::FnvBuildHasher,
912 >,
913
914 chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
916}
917
918struct Chain<TPlat: PlatformRef> {
919 log_name: String,
920
921 num_references: NonZero<usize>,
923
924 block_number_bytes: usize,
927
928 num_out_slots: usize,
930
931 next_discovery_when: TPlat::Instant,
933
934 next_discovery_period: Duration,
937}
938
939#[derive(Clone)]
940struct OpenGossipLinkState {
941 role: Role,
942 best_block_number: u64,
943 best_block_hash: [u8; 32],
944 finalized_block_height: Option<u64>,
946}
947
948async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
949 loop {
950 futures_lite::future::yield_now().await;
952
953 enum WakeUpReason<TPlat: PlatformRef> {
954 ForegroundClosed,
955 Message(ToBackground<TPlat>),
956 MessageForChain(ChainId, ToBackgroundChain),
957 NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
958 CanAssignSlot(PeerId, ChainId),
959 NextRecentConnectionRestore,
960 CanStartConnect(PeerId),
961 CanOpenGossip(PeerId, ChainId),
962 MessageFromConnection {
963 connection_id: service::ConnectionId,
964 message: service::ConnectionToCoordinator,
965 },
966 MessageToConnection {
967 connection_id: service::ConnectionId,
968 message: service::CoordinatorToConnection,
969 },
970 EventSendersReady,
971 StartDiscovery(ChainId),
972 }
973
974 let wake_up_reason = {
975 let message_received = async {
976 task.main_messages_rx
977 .next()
978 .await
979 .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
980 };
981 let message_for_chain_received = async {
982 let Some((chain_id, message)) = task.messages_rx.next().await else {
987 future::pending().await
988 };
989 WakeUpReason::MessageForChain(chain_id, message)
990 };
991 let message_from_task_received = async {
992 let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
993 WakeUpReason::MessageFromConnection {
994 connection_id,
995 message,
996 }
997 };
998 let service_event = async {
999 if let Some(event) = (task.event_pending_send.is_none()
1000 && task.pending_new_subscriptions.is_empty())
1001 .then(|| task.network.next_event())
1002 .flatten()
1003 {
1004 WakeUpReason::NetworkEvent(event)
1005 } else if let Some(start_connect) = {
1006 let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
1007 .then(|| {
1008 task.network
1009 .unconnected_desired()
1010 .choose(&mut task.randomness)
1011 .cloned()
1012 })
1013 .flatten();
1014 x
1015 } {
1016 WakeUpReason::CanStartConnect(start_connect)
1017 } else if let Some((peer_id, chain_id)) = {
1018 let x = task
1019 .network
1020 .connected_unopened_gossip_desired()
1021 .choose(&mut task.randomness)
1022 .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
1023 x
1024 } {
1025 WakeUpReason::CanOpenGossip(peer_id, chain_id)
1026 } else if let Some((connection_id, message)) =
1027 task.network.pull_message_to_connection()
1028 {
1029 WakeUpReason::MessageToConnection {
1030 connection_id,
1031 message,
1032 }
1033 } else {
1034 'search: loop {
1035 let mut earlier_unban = None;
1036
1037 for chain_id in task.network.chains().collect::<Vec<_>>() {
1038 if task.network.gossip_desired_num(
1039 chain_id,
1040 service::GossipKind::ConsensusTransactions,
1041 ) >= task.network[chain_id].num_out_slots
1042 {
1043 continue;
1044 }
1045
1046 match task
1047 .peering_strategy
1048 .pick_assignable_peer(&chain_id, &task.platform.now())
1049 {
1050 basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
1051 break 'search WakeUpReason::CanAssignSlot(
1052 peer_id.clone(),
1053 chain_id,
1054 );
1055 }
1056 basic_peering_strategy::AssignablePeer::AllPeersBanned {
1057 next_unban,
1058 } => {
1059 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
1060 earlier_unban = Some(next_unban.clone());
1061 }
1062 }
1063 basic_peering_strategy::AssignablePeer::NoPeer => continue,
1064 }
1065 }
1066
1067 if let Some(earlier_unban) = earlier_unban {
1068 task.platform.sleep_until(earlier_unban).await;
1069 } else {
1070 future::pending::<()>().await;
1071 }
1072 }
1073 }
1074 };
1075 let next_recent_connection_restore = async {
1076 if task.num_recent_connection_opening != 0
1077 && task.next_recent_connection_restore.is_none()
1078 {
1079 task.next_recent_connection_restore = Some(Box::pin(
1080 task.platform
1081 .sleep(task.connections_open_pool_restore_delay),
1082 ));
1083 }
1084 if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1085 delay.await;
1086 task.next_recent_connection_restore = None;
1087 WakeUpReason::NextRecentConnectionRestore
1088 } else {
1089 future::pending().await
1090 }
1091 };
1092 let finished_sending_event = async {
1093 if let either::Right(event_sending_future) = &mut task.event_senders {
1094 let event_senders = event_sending_future.await;
1095 task.event_senders = either::Left(event_senders);
1096 WakeUpReason::EventSendersReady
1097 } else if task.event_pending_send.is_some()
1098 || !task.pending_new_subscriptions.is_empty()
1099 {
1100 WakeUpReason::EventSendersReady
1101 } else {
1102 future::pending().await
1103 }
1104 };
1105 let start_discovery = async {
1106 let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1107 future::pending().await
1108 };
1109 next_discovery.get_mut().await;
1110 let ((_, chain_id), _) = next_discovery.remove_entry();
1111 WakeUpReason::StartDiscovery(chain_id)
1112 };
1113
1114 message_for_chain_received
1115 .or(message_received)
1116 .or(message_from_task_received)
1117 .or(service_event)
1118 .or(next_recent_connection_restore)
1119 .or(finished_sending_event)
1120 .or(start_discovery)
1121 .await
1122 };
1123
1124 match wake_up_reason {
1125 WakeUpReason::ForegroundClosed => {
1126 return;
1128 }
1129 WakeUpReason::Message(ToBackground::AddChain {
1130 messages_rx,
1131 config,
1132 }) => {
1133 let chain_id = match task.network.add_chain(config) {
1135 Ok(id) => id,
1136 Err(service::AddChainError::Duplicate { existing_identical }) => {
1137 task.network[existing_identical].num_references = task.network
1138 [existing_identical]
1139 .num_references
1140 .checked_add(1)
1141 .unwrap();
1142 existing_identical
1143 }
1144 };
1145
1146 task.chains_by_next_discovery.insert(
1147 (task.network[chain_id].next_discovery_when.clone(), chain_id),
1148 Box::pin(
1149 task.platform
1150 .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1151 ),
1152 );
1153
1154 task.messages_rx
1155 .push(Box::pin(
1156 messages_rx
1157 .map(move |msg| (chain_id, msg))
1158 .chain(stream::once(future::ready((
1159 chain_id,
1160 ToBackgroundChain::RemoveChain,
1161 )))),
1162 ) as Pin<Box<_>>);
1163
1164 log!(
1165 &task.platform,
1166 Debug,
1167 "network",
1168 "chain-added",
1169 id = task.network[chain_id].log_name
1170 );
1171 }
1172 WakeUpReason::EventSendersReady => {
1173 let either::Left(event_senders) = &mut task.event_senders else {
1177 unreachable!()
1178 };
1179
1180 if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1181 task.event_pending_send.take()
1182 {
1183 let mut event_senders = mem::take(event_senders);
1184 task.event_senders = either::Right(Box::pin(async move {
1185 for index in (0..event_senders.len()).rev() {
1188 let (event_sender_chain_id, event_sender) =
1189 event_senders.swap_remove(index);
1190 if event_sender_chain_id == event_to_dispatch_chain_id {
1191 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1192 continue;
1193 }
1194 }
1195 event_senders.push((event_sender_chain_id, event_sender));
1196 }
1197 event_senders
1198 }));
1199 } else if !task.pending_new_subscriptions.is_empty() {
1200 let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1201 let mut event_senders = mem::take(event_senders);
1202 let open_gossip_links = task.open_gossip_links.clone();
1204 task.event_senders = either::Right(Box::pin(async move {
1205 for (chain_id, new_subscription) in pending_new_subscriptions {
1206 for ((link_chain_id, peer_id), state) in &open_gossip_links {
1207 if *link_chain_id != chain_id {
1209 continue;
1210 }
1211
1212 let _ = new_subscription
1213 .send(Event::Connected {
1214 peer_id: peer_id.clone(),
1215 role: state.role,
1216 best_block_number: state.best_block_number,
1217 best_block_hash: state.best_block_hash,
1218 })
1219 .await;
1220
1221 if let Some(finalized_block_height) = state.finalized_block_height {
1222 let _ = new_subscription
1223 .send(Event::GrandpaNeighborPacket {
1224 peer_id: peer_id.clone(),
1225 finalized_block_height,
1226 })
1227 .await;
1228 }
1229 }
1230
1231 event_senders.push((chain_id, new_subscription));
1232 }
1233
1234 event_senders
1235 }));
1236 }
1237 }
1238 WakeUpReason::MessageFromConnection {
1239 connection_id,
1240 message,
1241 } => {
1242 task.network
1243 .inject_connection_message(connection_id, message);
1244 }
1245 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1246 if let Some(new_ref) =
1247 NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1248 {
1249 task.network[chain_id].num_references = new_ref;
1250 continue;
1251 }
1252
1253 for peer_id in task
1254 .network
1255 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1256 .cloned()
1257 .collect::<Vec<_>>()
1258 {
1259 task.network
1260 .gossip_close(
1261 chain_id,
1262 &peer_id,
1263 service::GossipKind::ConsensusTransactions,
1264 )
1265 .unwrap();
1266
1267 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1268 debug_assert!(_was_in.is_some());
1269 }
1270
1271 let _was_in = task
1272 .chains_by_next_discovery
1273 .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1274 debug_assert!(_was_in.is_some());
1275
1276 log!(
1277 &task.platform,
1278 Debug,
1279 "network",
1280 "chain-removed",
1281 id = task.network[chain_id].log_name
1282 );
1283 task.network.remove_chain(chain_id).unwrap();
1284 task.peering_strategy.remove_chain_peers(&chain_id);
1285 }
1286 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1287 task.pending_new_subscriptions.push((chain_id, sender));
1288 }
1289 WakeUpReason::MessageForChain(
1290 chain_id,
1291 ToBackgroundChain::DisconnectAndBan {
1292 peer_id,
1293 severity,
1294 reason,
1295 },
1296 ) => {
1297 let ban_duration = Duration::from_secs(match severity {
1298 BanSeverity::Low => 10,
1299 BanSeverity::High => 40,
1300 });
1301
1302 let had_slot = matches!(
1303 task.peering_strategy.unassign_slot_and_ban(
1304 &chain_id,
1305 &peer_id,
1306 task.platform.now() + ban_duration,
1307 ),
1308 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1309 );
1310
1311 if had_slot {
1312 log!(
1313 &task.platform,
1314 Debug,
1315 "network",
1316 "slot-unassigned",
1317 chain = &task.network[chain_id].log_name,
1318 peer_id,
1319 ?ban_duration,
1320 reason = "user-ban",
1321 user_reason = reason
1322 );
1323 task.network.gossip_remove_desired(
1324 chain_id,
1325 &peer_id,
1326 service::GossipKind::ConsensusTransactions,
1327 );
1328 }
1329
1330 if task.network.gossip_is_connected(
1331 chain_id,
1332 &peer_id,
1333 service::GossipKind::ConsensusTransactions,
1334 ) {
1335 let _closed_result = task.network.gossip_close(
1336 chain_id,
1337 &peer_id,
1338 service::GossipKind::ConsensusTransactions,
1339 );
1340 debug_assert!(_closed_result.is_ok());
1341
1342 log!(
1343 &task.platform,
1344 Debug,
1345 "network",
1346 "gossip-closed",
1347 chain = &task.network[chain_id].log_name,
1348 peer_id,
1349 );
1350
1351 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1352 debug_assert!(_was_in.is_some());
1353
1354 debug_assert!(task.event_pending_send.is_none());
1355 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1356 }
1357 }
1358 WakeUpReason::MessageForChain(
1359 chain_id,
1360 ToBackgroundChain::StartBlocksRequest {
1361 target,
1362 config,
1363 timeout,
1364 result,
1365 },
1366 ) => {
1367 match &config.start {
1368 codec::BlocksRequestConfigStart::Hash(hash) => {
1369 log!(
1370 &task.platform,
1371 Debug,
1372 "network",
1373 "blocks-request-started",
1374 chain = task.network[chain_id].log_name, target,
1375 start = HashDisplay(hash),
1376 num = config.desired_count.get(),
1377 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1378 header = ?config.fields.header, body = ?config.fields.body,
1379 justifications = ?config.fields.justifications
1380 );
1381 }
1382 codec::BlocksRequestConfigStart::Number(number) => {
1383 log!(
1384 &task.platform,
1385 Debug,
1386 "network",
1387 "blocks-request-started",
1388 chain = task.network[chain_id].log_name, target, start = number,
1389 num = config.desired_count.get(),
1390 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1391 header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1392 );
1393 }
1394 }
1395
1396 match task
1397 .network
1398 .start_blocks_request(&target, chain_id, config.clone(), timeout)
1399 {
1400 Ok(substream_id) => {
1401 task.blocks_requests.insert(substream_id, result);
1402 }
1403 Err(service::StartRequestError::NoConnection) => {
1404 log!(
1405 &task.platform,
1406 Debug,
1407 "network",
1408 "blocks-request-error",
1409 chain = task.network[chain_id].log_name,
1410 target,
1411 error = "NoConnection"
1412 );
1413 let _ = result.send(Err(BlocksRequestError::NoConnection));
1414 }
1415 }
1416 }
1417 WakeUpReason::MessageForChain(
1418 chain_id,
1419 ToBackgroundChain::StartWarpSyncRequest {
1420 target,
1421 begin_hash,
1422 timeout,
1423 result,
1424 },
1425 ) => {
1426 log!(
1427 &task.platform,
1428 Debug,
1429 "network",
1430 "warp-sync-request-started",
1431 chain = task.network[chain_id].log_name,
1432 target,
1433 start = HashDisplay(&begin_hash)
1434 );
1435
1436 match task
1437 .network
1438 .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1439 {
1440 Ok(substream_id) => {
1441 task.grandpa_warp_sync_requests.insert(substream_id, result);
1442 }
1443 Err(service::StartRequestError::NoConnection) => {
1444 log!(
1445 &task.platform,
1446 Debug,
1447 "network",
1448 "warp-sync-request-error",
1449 chain = task.network[chain_id].log_name,
1450 target,
1451 error = "NoConnection"
1452 );
1453 let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1454 }
1455 }
1456 }
1457 WakeUpReason::MessageForChain(
1458 chain_id,
1459 ToBackgroundChain::StartStorageProofRequest {
1460 target,
1461 config,
1462 timeout,
1463 result,
1464 },
1465 ) => {
1466 log!(
1467 &task.platform,
1468 Debug,
1469 "network",
1470 "storage-proof-request-started",
1471 chain = task.network[chain_id].log_name,
1472 target,
1473 block_hash = HashDisplay(&config.block_hash)
1474 );
1475
1476 match task.network.start_storage_proof_request(
1477 &target,
1478 chain_id,
1479 config.clone(),
1480 timeout,
1481 ) {
1482 Ok(substream_id) => {
1483 task.storage_proof_requests.insert(substream_id, result);
1484 }
1485 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1486 log!(
1487 &task.platform,
1488 Debug,
1489 "network",
1490 "storage-proof-request-error",
1491 chain = task.network[chain_id].log_name,
1492 target,
1493 error = "NoConnection"
1494 );
1495 let _ = result.send(Err(StorageProofRequestError::NoConnection));
1496 }
1497 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1498 log!(
1499 &task.platform,
1500 Debug,
1501 "network",
1502 "storage-proof-request-error",
1503 chain = task.network[chain_id].log_name,
1504 target,
1505 error = "RequestTooLarge"
1506 );
1507 let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1508 }
1509 };
1510 }
1511 WakeUpReason::MessageForChain(
1512 chain_id,
1513 ToBackgroundChain::StartCallProofRequest {
1514 target,
1515 config,
1516 timeout,
1517 result,
1518 },
1519 ) => {
1520 log!(
1521 &task.platform,
1522 Debug,
1523 "network",
1524 "call-proof-request-started",
1525 chain = task.network[chain_id].log_name,
1526 target,
1527 block_hash = HashDisplay(&config.block_hash),
1528 function = config.method
1529 );
1530 match task.network.start_call_proof_request(
1533 &target,
1534 chain_id,
1535 config.clone(),
1536 timeout,
1537 ) {
1538 Ok(substream_id) => {
1539 task.call_proof_requests.insert(substream_id, result);
1540 }
1541 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1542 log!(
1543 &task.platform,
1544 Debug,
1545 "network",
1546 "call-proof-request-error",
1547 chain = task.network[chain_id].log_name,
1548 target,
1549 error = "NoConnection"
1550 );
1551 let _ = result.send(Err(CallProofRequestError::NoConnection));
1552 }
1553 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1554 log!(
1555 &task.platform,
1556 Debug,
1557 "network",
1558 "call-proof-request-error",
1559 chain = task.network[chain_id].log_name,
1560 target,
1561 error = "RequestTooLarge"
1562 );
1563 let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1564 }
1565 };
1566 }
1567 WakeUpReason::MessageForChain(
1568 chain_id,
1569 ToBackgroundChain::StartChildStorageProofRequest {
1570 target,
1571 config,
1572 timeout,
1573 result,
1574 },
1575 ) => {
1576 log!(
1577 &task.platform,
1578 Debug,
1579 "network",
1580 "child-storage-proof-request-started",
1581 chain = task.network[chain_id].log_name,
1582 target,
1583 block_hash = HashDisplay(&config.block_hash)
1584 );
1585
1586 match task.network.start_child_storage_proof_request(
1587 &target,
1588 chain_id,
1589 codec::ChildStorageProofRequestConfig {
1590 block_hash: config.block_hash,
1591 child_trie: &config.child_trie,
1592 keys: config.keys.iter().map(|k| k.as_slice()),
1593 },
1594 timeout,
1595 ) {
1596 Ok(substream_id) => {
1597 task.child_storage_proof_requests
1598 .insert(substream_id, result);
1599 }
1600 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1601 log!(
1602 &task.platform,
1603 Debug,
1604 "network",
1605 "child-storage-proof-request-error",
1606 chain = task.network[chain_id].log_name,
1607 target,
1608 error = "NoConnection"
1609 );
1610 let _ = result.send(Err(ChildStorageProofRequestError::NoConnection));
1611 }
1612 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1613 log!(
1614 &task.platform,
1615 Debug,
1616 "network",
1617 "child-storage-proof-request-error",
1618 chain = task.network[chain_id].log_name,
1619 target,
1620 error = "RequestTooLarge"
1621 );
1622 let _ = result.send(Err(ChildStorageProofRequestError::RequestTooLarge));
1623 }
1624 };
1625 }
1626 WakeUpReason::MessageForChain(
1627 chain_id,
1628 ToBackgroundChain::SetLocalBestBlock {
1629 best_hash,
1630 best_number,
1631 },
1632 ) => {
1633 task.network
1634 .set_chain_local_best_block(chain_id, best_hash, best_number);
1635 }
1636 WakeUpReason::MessageForChain(
1637 chain_id,
1638 ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1639 ) => {
1640 log!(
1641 &task.platform,
1642 Debug,
1643 "network",
1644 "local-grandpa-state-announced",
1645 chain = task.network[chain_id].log_name,
1646 set_id = grandpa_state.set_id,
1647 commit_finalized_height = grandpa_state.commit_finalized_height,
1648 );
1649
1650 task.network
1653 .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
1654 }
1655 WakeUpReason::MessageForChain(
1656 chain_id,
1657 ToBackgroundChain::AnnounceTransaction {
1658 transaction,
1659 result,
1660 },
1661 ) => {
1662 let peers_to_send = task
1665 .network
1666 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1667 .cloned()
1668 .collect::<Vec<_>>();
1669
1670 let mut peers_sent = Vec::with_capacity(peers_to_send.len());
1671 let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
1672 for peer in &peers_to_send {
1673 match task
1674 .network
1675 .gossip_send_transaction(peer, chain_id, &transaction)
1676 {
1677 Ok(()) => peers_sent.push(peer.to_base58()),
1678 Err(QueueNotificationError::QueueFull) => {
1679 peers_queue_full.push(peer.to_base58())
1680 }
1681 Err(QueueNotificationError::NoConnection) => unreachable!(),
1682 }
1683 }
1684
1685 log!(
1686 &task.platform,
1687 Debug,
1688 "network",
1689 "transaction-announced",
1690 chain = task.network[chain_id].log_name,
1691 transaction =
1692 hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
1693 size = transaction.len(),
1694 peers_sent = peers_sent.join(", "),
1695 peers_queue_full = peers_queue_full.join(", "),
1696 );
1697
1698 let _ = result.send(peers_to_send);
1699 }
1700 WakeUpReason::MessageForChain(
1701 chain_id,
1702 ToBackgroundChain::SendBlockAnnounce {
1703 target,
1704 scale_encoded_header,
1705 is_best,
1706 result,
1707 },
1708 ) => {
1709 let _ = result.send(task.network.gossip_send_block_announce(
1711 &target,
1712 chain_id,
1713 &scale_encoded_header,
1714 is_best,
1715 ));
1716 }
1717 WakeUpReason::MessageForChain(
1718 chain_id,
1719 ToBackgroundChain::Discover {
1720 list,
1721 important_nodes,
1722 },
1723 ) => {
1724 for (peer_id, addrs) in list {
1725 if important_nodes {
1726 task.important_nodes.insert(peer_id.clone());
1727 }
1728
1729 task.peering_strategy
1732 .insert_chain_peer(chain_id, peer_id.clone(), 30); for addr in addrs {
1735 let _ =
1736 task.peering_strategy
1737 .insert_address(&peer_id, addr.into_bytes(), 10);
1738 }
1740 }
1741 }
1742 WakeUpReason::MessageForChain(
1743 chain_id,
1744 ToBackgroundChain::DiscoveredNodes { result },
1745 ) => {
1746 let _ = result.send(
1748 task.peering_strategy
1749 .chain_peers_unordered(&chain_id)
1750 .map(|peer_id| {
1751 let addrs = task
1752 .peering_strategy
1753 .peer_addresses(peer_id)
1754 .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
1755 .collect::<Vec<_>>();
1756 (peer_id.clone(), addrs)
1757 })
1758 .collect::<Vec<_>>(),
1759 );
1760 }
1761 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
1762 let _ = result.send(
1763 task.network
1764 .gossip_connected_peers(
1765 chain_id,
1766 service::GossipKind::ConsensusTransactions,
1767 )
1768 .cloned()
1769 .collect(),
1770 );
1771 }
1772 WakeUpReason::StartDiscovery(chain_id) => {
1773 let chain = &mut task.network[chain_id];
1775 chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
1776 chain.next_discovery_period =
1777 cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
1778 task.chains_by_next_discovery.insert(
1779 (chain.next_discovery_when.clone(), chain_id),
1780 Box::pin(
1781 task.platform
1782 .sleep(task.network[chain_id].next_discovery_period),
1783 ),
1784 );
1785
1786 let random_peer_id = {
1787 let mut pub_key = [0; 32];
1788 rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
1789 PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
1790 };
1791
1792 let target = task
1794 .network
1795 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1796 .next()
1797 .cloned();
1798
1799 if let Some(target) = target {
1800 match task.network.start_kademlia_find_node_request(
1801 &target,
1802 chain_id,
1803 &random_peer_id,
1804 Duration::from_secs(20),
1805 ) {
1806 Ok(_) => {}
1807 Err(service::StartRequestError::NoConnection) => unreachable!(),
1808 };
1809
1810 log!(
1811 &task.platform,
1812 Debug,
1813 "network",
1814 "discovery-find-node-started",
1815 chain = &task.network[chain_id].log_name,
1816 request_target = target,
1817 requested_peer_id = random_peer_id
1818 );
1819 } else {
1820 log!(
1821 &task.platform,
1822 Debug,
1823 "network",
1824 "discovery-skipped-no-peer",
1825 chain = &task.network[chain_id].log_name
1826 );
1827 }
1828 }
1829 WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
1830 peer_id,
1831 expected_peer_id,
1832 id,
1833 }) => {
1834 let remote_addr =
1835 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
1837 {
1838 log!(
1839 &task.platform,
1840 Debug,
1841 "network",
1842 "handshake-finished-peer-id-mismatch",
1843 remote_addr,
1844 expected_peer_id,
1845 actual_peer_id = peer_id
1846 );
1847
1848 let _was_in = task
1849 .peering_strategy
1850 .decrease_address_connections_and_remove_if_zero(
1851 expected_peer_id,
1852 remote_addr.as_ref(),
1853 );
1854 debug_assert!(_was_in.is_ok());
1855 let _ = task.peering_strategy.increase_address_connections(
1856 &peer_id,
1857 remote_addr.into_bytes().to_vec(),
1858 10,
1859 );
1860 } else {
1861 log!(
1862 &task.platform,
1863 Debug,
1864 "network",
1865 "handshake-finished",
1866 remote_addr,
1867 peer_id
1868 );
1869 }
1870 }
1871 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1872 expected_peer_id: Some(_),
1873 ..
1874 })
1875 | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
1876 let (address, peer_id, handshake_finished) = match wake_up_reason {
1877 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1878 address,
1879 expected_peer_id: Some(peer_id),
1880 ..
1881 }) => (address, peer_id, false),
1882 WakeUpReason::NetworkEvent(service::Event::Disconnected {
1883 address,
1884 peer_id,
1885 ..
1886 }) => (address, peer_id, true),
1887 _ => unreachable!(),
1888 };
1889
1890 task.peering_strategy
1891 .decrease_address_connections(&peer_id, &address)
1892 .unwrap();
1893 let address = Multiaddr::from_bytes(address).unwrap();
1894 log!(
1895 &task.platform,
1896 Debug,
1897 "network",
1898 "connection-shutdown",
1899 peer_id,
1900 address,
1901 ?handshake_finished
1902 );
1903
1904 let ban_duration = Duration::from_secs(5);
1912 task.network.gossip_remove_desired_all(
1913 &peer_id,
1914 service::GossipKind::ConsensusTransactions,
1915 );
1916 for (&chain_id, what_happened) in task
1917 .peering_strategy
1918 .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
1919 {
1920 if matches!(
1921 what_happened,
1922 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
1923 ) {
1924 log!(
1925 &task.platform,
1926 Debug,
1927 "network",
1928 "slot-unassigned",
1929 chain = &task.network[chain_id].log_name,
1930 peer_id,
1931 ?ban_duration,
1932 reason = "pre-handshake-disconnect"
1933 );
1934 }
1935 }
1936 }
1937 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1938 expected_peer_id: None,
1939 ..
1940 }) => {
1941 debug_assert!(false);
1944 }
1945 WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
1946 id,
1947 peer_id,
1948 ping_time,
1949 }) => {
1950 let remote_addr =
1951 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); log!(
1953 &task.platform,
1954 Debug,
1955 "network",
1956 "pong",
1957 peer_id,
1958 remote_addr,
1959 ?ping_time
1960 );
1961 }
1962 WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
1963 chain_id,
1964 peer_id,
1965 announce,
1966 }) => {
1967 log!(
1968 &task.platform,
1969 Debug,
1970 "network",
1971 "block-announce-received",
1972 chain = &task.network[chain_id].log_name,
1973 peer_id,
1974 block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
1975 announce.decode().scale_encoded_header
1976 )),
1977 is_best = announce.decode().is_best
1978 );
1979
1980 let decoded_announce = announce.decode();
1981 if decoded_announce.is_best {
1982 let link = task
1983 .open_gossip_links
1984 .get_mut(&(chain_id, peer_id.clone()))
1985 .unwrap();
1986 if let Ok(decoded) = header::decode(
1987 decoded_announce.scale_encoded_header,
1988 task.network[chain_id].block_number_bytes,
1989 ) {
1990 link.best_block_hash = header::hash_from_scale_encoded_header(
1991 decoded_announce.scale_encoded_header,
1992 );
1993 link.best_block_number = decoded.number;
1994 }
1995 }
1996
1997 debug_assert!(task.event_pending_send.is_none());
1998 task.event_pending_send =
1999 Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
2000 }
2001 WakeUpReason::NetworkEvent(service::Event::GossipConnected {
2002 peer_id,
2003 chain_id,
2004 role,
2005 best_number,
2006 best_hash,
2007 kind: service::GossipKind::ConsensusTransactions,
2008 }) => {
2009 log!(
2010 &task.platform,
2011 Debug,
2012 "network",
2013 "gossip-open-success",
2014 chain = &task.network[chain_id].log_name,
2015 peer_id,
2016 best_number,
2017 best_hash = HashDisplay(&best_hash)
2018 );
2019
2020 let _prev_value = task.open_gossip_links.insert(
2021 (chain_id, peer_id.clone()),
2022 OpenGossipLinkState {
2023 best_block_number: best_number,
2024 best_block_hash: best_hash,
2025 role,
2026 finalized_block_height: None,
2027 },
2028 );
2029 debug_assert!(_prev_value.is_none());
2030
2031 debug_assert!(task.event_pending_send.is_none());
2032 task.event_pending_send = Some((
2033 chain_id,
2034 Event::Connected {
2035 peer_id,
2036 role,
2037 best_block_number: best_number,
2038 best_block_hash: best_hash,
2039 },
2040 ));
2041 }
2042 WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
2043 peer_id,
2044 chain_id,
2045 error,
2046 kind: service::GossipKind::ConsensusTransactions,
2047 }) => {
2048 log!(
2049 &task.platform,
2050 Debug,
2051 "network",
2052 "gossip-open-error",
2053 chain = &task.network[chain_id].log_name,
2054 peer_id,
2055 ?error,
2056 );
2057 let ban_duration = Duration::from_secs(15);
2058
2059 let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
2062 matches!(
2063 task.peering_strategy
2064 .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
2065 basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
2066 )
2067 } else {
2068 matches!(
2069 task.peering_strategy.unassign_slot_and_ban(
2070 &chain_id,
2071 &peer_id,
2072 task.platform.now() + ban_duration,
2073 ),
2074 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2075 )
2076 };
2077
2078 if had_slot {
2079 log!(
2080 &task.platform,
2081 Debug,
2082 "network",
2083 "slot-unassigned",
2084 chain = &task.network[chain_id].log_name,
2085 peer_id,
2086 ?ban_duration,
2087 reason = "gossip-open-failed"
2088 );
2089 task.network.gossip_remove_desired(
2090 chain_id,
2091 &peer_id,
2092 service::GossipKind::ConsensusTransactions,
2093 );
2094 }
2095 }
2096 WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
2097 peer_id,
2098 chain_id,
2099 kind: service::GossipKind::ConsensusTransactions,
2100 }) => {
2101 log!(
2102 &task.platform,
2103 Debug,
2104 "network",
2105 "gossip-closed",
2106 chain = &task.network[chain_id].log_name,
2107 peer_id,
2108 );
2109 let ban_duration = Duration::from_secs(10);
2110
2111 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
2112 debug_assert!(_was_in.is_some());
2113
2114 if matches!(
2117 task.peering_strategy.unassign_slot_and_ban(
2118 &chain_id,
2119 &peer_id,
2120 task.platform.now() + ban_duration,
2121 ),
2122 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
2123 ) {
2124 log!(
2125 &task.platform,
2126 Debug,
2127 "network",
2128 "slot-unassigned",
2129 chain = &task.network[chain_id].log_name,
2130 peer_id,
2131 ?ban_duration,
2132 reason = "gossip-closed"
2133 );
2134 task.network.gossip_remove_desired(
2135 chain_id,
2136 &peer_id,
2137 service::GossipKind::ConsensusTransactions,
2138 );
2139 }
2140
2141 debug_assert!(task.event_pending_send.is_none());
2142 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2143 }
2144 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2145 substream_id,
2146 peer_id,
2147 chain_id,
2148 response: service::RequestResult::Blocks(response),
2149 }) => {
2150 match &response {
2151 Ok(blocks) => {
2152 log!(
2153 &task.platform,
2154 Debug,
2155 "network",
2156 "blocks-request-success",
2157 chain = task.network[chain_id].log_name,
2158 target = peer_id,
2159 num_blocks = blocks.len(),
2160 block_data_total_size =
2161 BytesDisplay(blocks.iter().fold(0, |sum, block| {
2162 let block_size = block.header.as_ref().map_or(0, |h| h.len())
2163 + block
2164 .body
2165 .as_ref()
2166 .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2167 + block
2168 .justifications
2169 .as_ref()
2170 .into_iter()
2171 .flat_map(|l| l.iter())
2172 .fold(0, |s, j| s + j.justification.len());
2173 sum + u64::try_from(block_size).unwrap()
2174 }))
2175 );
2176 }
2177 Err(error) => {
2178 log!(
2179 &task.platform,
2180 Debug,
2181 "network",
2182 "blocks-request-error",
2183 chain = task.network[chain_id].log_name,
2184 target = peer_id,
2185 ?error
2186 );
2187 }
2188 }
2189
2190 match &response {
2191 Ok(_) => {}
2192 Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2193 Err(err) => {
2194 log!(
2195 &task.platform,
2196 Debug,
2197 "network",
2198 format!(
2199 "Error in block request with {}. This might indicate an \
2200 incompatibility. Error: {}",
2201 peer_id, err
2202 )
2203 );
2204 }
2205 }
2206
2207 let _ = task
2208 .blocks_requests
2209 .remove(&substream_id)
2210 .unwrap()
2211 .send(response.map_err(BlocksRequestError::Request));
2212 }
2213 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2214 substream_id,
2215 peer_id,
2216 chain_id,
2217 response: service::RequestResult::GrandpaWarpSync(response),
2218 }) => {
2219 match &response {
2220 Ok(response) => {
2221 let decoded = response.decode();
2223 log!(
2224 &task.platform,
2225 Debug,
2226 "network",
2227 "warp-sync-request-success",
2228 chain = task.network[chain_id].log_name,
2229 target = peer_id,
2230 num_fragments = decoded.fragments.len(),
2231 is_finished = ?decoded.is_finished,
2232 );
2233 }
2234 Err(error) => {
2235 log!(
2236 &task.platform,
2237 Debug,
2238 "network",
2239 "warp-sync-request-error",
2240 chain = task.network[chain_id].log_name,
2241 target = peer_id,
2242 ?error,
2243 );
2244 }
2245 }
2246
2247 let _ = task
2248 .grandpa_warp_sync_requests
2249 .remove(&substream_id)
2250 .unwrap()
2251 .send(response.map_err(WarpSyncRequestError::Request));
2252 }
2253 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2254 substream_id,
2255 peer_id,
2256 chain_id,
2257 response: service::RequestResult::StorageProof(response),
2258 }) => {
2259 match &response {
2260 Ok(items) => {
2261 let decoded = items.decode();
2262 log!(
2263 &task.platform,
2264 Debug,
2265 "network",
2266 "storage-proof-request-success",
2267 chain = task.network[chain_id].log_name,
2268 target = peer_id,
2269 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
2270 );
2271 }
2272 Err(error) => {
2273 log!(
2274 &task.platform,
2275 Debug,
2276 "network",
2277 "storage-proof-request-error",
2278 chain = task.network[chain_id].log_name,
2279 target = peer_id,
2280 ?error
2281 );
2282 }
2283 }
2284
2285 if let Some(sender) = task.storage_proof_requests.remove(&substream_id) {
2288 let _ = sender.send(response.map_err(StorageProofRequestError::Request));
2289 } else if let Some(sender) = task.child_storage_proof_requests.remove(&substream_id)
2290 {
2291 let _ = sender.send(response.map_err(ChildStorageProofRequestError::Request));
2292 } else {
2293 unreachable!()
2294 }
2295 }
2296 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2297 substream_id,
2298 peer_id,
2299 chain_id,
2300 response: service::RequestResult::CallProof(response),
2301 }) => {
2302 match &response {
2303 Ok(items) => {
2304 let decoded = items.decode();
2305 log!(
2306 &task.platform,
2307 Debug,
2308 "network",
2309 "call-proof-request-success",
2310 chain = task.network[chain_id].log_name,
2311 target = peer_id,
2312 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2313 );
2314 }
2315 Err(error) => {
2316 log!(
2317 &task.platform,
2318 Debug,
2319 "network",
2320 "call-proof-request-error",
2321 chain = task.network[chain_id].log_name,
2322 target = peer_id,
2323 ?error
2324 );
2325 }
2326 }
2327
2328 let _ = task
2329 .call_proof_requests
2330 .remove(&substream_id)
2331 .unwrap()
2332 .send(response.map_err(CallProofRequestError::Request));
2333 }
2334 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2335 peer_id: requestee_peer_id,
2336 chain_id,
2337 response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2338 ..
2339 }) => {
2340 for (peer_id, mut addrs) in nodes {
2341 if addrs.len() >= 10 {
2344 addrs.truncate(10);
2345 }
2346
2347 let mut valid_addrs = Vec::with_capacity(addrs.len());
2348 for addr in addrs {
2349 match Multiaddr::from_bytes(addr) {
2350 Ok(a) => {
2351 if platform::address_parse::multiaddr_to_address(&a)
2352 .ok()
2353 .map_or(false, |addr| {
2354 task.platform.supports_connection_type((&addr).into())
2355 })
2356 {
2357 valid_addrs.push(a)
2358 } else {
2359 log!(
2360 &task.platform,
2361 Debug,
2362 "network",
2363 "discovered-address-not-supported",
2364 chain = &task.network[chain_id].log_name,
2365 peer_id,
2366 addr = &a,
2367 obtained_from = requestee_peer_id
2368 );
2369 }
2370 }
2371 Err((error, addr)) => {
2372 log!(
2373 &task.platform,
2374 Debug,
2375 "network",
2376 "discovered-address-invalid",
2377 chain = &task.network[chain_id].log_name,
2378 peer_id,
2379 error,
2380 addr = hex::encode(&addr),
2381 obtained_from = requestee_peer_id
2382 );
2383 }
2384 }
2385 }
2386
2387 if !valid_addrs.is_empty() {
2388 let insert_outcome =
2391 task.peering_strategy
2392 .insert_chain_peer(chain_id, peer_id.clone(), 30); if let basic_peering_strategy::InsertChainPeerResult::Inserted {
2395 peer_removed,
2396 } = insert_outcome
2397 {
2398 if let Some(peer_removed) = peer_removed {
2399 log!(
2400 &task.platform,
2401 Debug,
2402 "network",
2403 "peer-purged-from-address-book",
2404 chain = &task.network[chain_id].log_name,
2405 peer_id = peer_removed,
2406 );
2407 }
2408
2409 log!(
2410 &task.platform,
2411 Debug,
2412 "network",
2413 "peer-discovered",
2414 chain = &task.network[chain_id].log_name,
2415 peer_id,
2416 addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), obtained_from = requestee_peer_id
2418 );
2419 }
2420 }
2421
2422 for addr in valid_addrs {
2423 let _insert_result =
2424 task.peering_strategy
2425 .insert_address(&peer_id, addr.into_bytes(), 10); debug_assert!(!matches!(
2427 _insert_result,
2428 basic_peering_strategy::InsertAddressResult::UnknownPeer
2429 ));
2430 }
2431 }
2432 }
2433 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2434 peer_id,
2435 chain_id,
2436 response: service::RequestResult::KademliaFindNode(Err(error)),
2437 ..
2438 }) => {
2439 log!(
2440 &task.platform,
2441 Debug,
2442 "network",
2443 "discovery-find-node-error",
2444 chain = &task.network[chain_id].log_name,
2445 ?error,
2446 find_node_target = peer_id,
2447 );
2448
2449 match error {
2452 service::KademliaFindNodeError::RequestFailed(err)
2453 if !err.is_protocol_error() => {}
2454
2455 service::KademliaFindNodeError::RequestFailed(
2456 service::RequestError::Substream(
2457 connection::established::RequestError::ProtocolNotAvailable,
2458 ),
2459 ) => {
2460 log!(
2462 &task.platform,
2463 Warn,
2464 "network",
2465 format!(
2466 "Problem during discovery on {}: protocol not available. \
2467 This might indicate that the version of Substrate used by \
2468 the chain doesn't include \
2469 <https://github.com/paritytech/substrate/pull/12545>.",
2470 &task.network[chain_id].log_name
2471 )
2472 );
2473 }
2474 _ => {
2475 log!(
2476 &task.platform,
2477 Debug,
2478 "network",
2479 format!(
2480 "Problem during discovery on {}: {}",
2481 &task.network[chain_id].log_name, error
2482 )
2483 );
2484 }
2485 }
2486 }
2487 WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
2488 unreachable!()
2490 }
2491 WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
2492 peer_id,
2493 chain_id,
2494 kind: service::GossipKind::ConsensusTransactions,
2495 }) => {
2496 if task
2501 .network
2502 .opened_gossip_undesired_by_chain(chain_id)
2503 .count()
2504 < 4
2505 {
2506 log!(
2507 &task.platform,
2508 Debug,
2509 "network",
2510 "gossip-in-request",
2511 chain = &task.network[chain_id].log_name,
2512 peer_id,
2513 outcome = "accepted"
2514 );
2515 task.network
2516 .gossip_open(
2517 chain_id,
2518 &peer_id,
2519 service::GossipKind::ConsensusTransactions,
2520 )
2521 .unwrap();
2522 } else {
2523 log!(
2524 &task.platform,
2525 Debug,
2526 "network",
2527 "gossip-in-request",
2528 chain = &task.network[chain_id].log_name,
2529 peer_id,
2530 outcome = "rejected",
2531 );
2532 task.network
2533 .gossip_close(
2534 chain_id,
2535 &peer_id,
2536 service::GossipKind::ConsensusTransactions,
2537 )
2538 .unwrap();
2539 }
2540 }
2541 WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
2542 unreachable!()
2544 }
2545 WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
2546 peer_id,
2547 substream_id,
2548 }) => {
2549 log!(
2550 &task.platform,
2551 Debug,
2552 "network",
2553 "identify-request-received",
2554 peer_id,
2555 );
2556 task.network
2557 .respond_identify(substream_id, &task.identify_agent_version);
2558 }
2559 WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
2560 WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
2561 unreachable!()
2563 }
2564 WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
2565 chain_id,
2566 peer_id,
2567 state,
2568 }) => {
2569 log!(
2570 &task.platform,
2571 Debug,
2572 "network",
2573 "grandpa-neighbor-packet-received",
2574 chain = &task.network[chain_id].log_name,
2575 peer_id,
2576 round_number = state.round_number,
2577 set_id = state.set_id,
2578 commit_finalized_height = state.commit_finalized_height,
2579 );
2580
2581 task.open_gossip_links
2582 .get_mut(&(chain_id, peer_id.clone()))
2583 .unwrap()
2584 .finalized_block_height = Some(state.commit_finalized_height);
2585
2586 debug_assert!(task.event_pending_send.is_none());
2587 task.event_pending_send = Some((
2588 chain_id,
2589 Event::GrandpaNeighborPacket {
2590 peer_id,
2591 finalized_block_height: state.commit_finalized_height,
2592 },
2593 ));
2594 }
2595 WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
2596 chain_id,
2597 peer_id,
2598 message,
2599 }) => {
2600 log!(
2601 &task.platform,
2602 Debug,
2603 "network",
2604 "grandpa-commit-message-received",
2605 chain = &task.network[chain_id].log_name,
2606 peer_id,
2607 target_block_hash = HashDisplay(message.decode().target_hash),
2608 );
2609
2610 debug_assert!(task.event_pending_send.is_none());
2611 task.event_pending_send =
2612 Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
2613 }
2614 WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
2615 log!(
2617 &task.platform,
2618 Warn,
2619 "network",
2620 "protocol-error",
2621 peer_id,
2622 ?error
2623 );
2624
2625 }
2627 WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
2628 task.peering_strategy.assign_slot(&chain_id, &peer_id);
2629
2630 log!(
2631 &task.platform,
2632 Debug,
2633 "network",
2634 "slot-assigned",
2635 chain = &task.network[chain_id].log_name,
2636 peer_id
2637 );
2638
2639 task.network.gossip_insert_desired(
2640 chain_id,
2641 peer_id,
2642 service::GossipKind::ConsensusTransactions,
2643 );
2644 }
2645 WakeUpReason::NextRecentConnectionRestore => {
2646 task.num_recent_connection_opening =
2647 task.num_recent_connection_opening.saturating_sub(1);
2648 }
2649 WakeUpReason::CanStartConnect(expected_peer_id) => {
2650 let Some(multiaddr) = task
2651 .peering_strategy
2652 .pick_address_and_add_connection(&expected_peer_id)
2653 else {
2654 task.network.gossip_remove_desired_all(
2656 &expected_peer_id,
2657 service::GossipKind::ConsensusTransactions,
2658 );
2659 let ban_duration = Duration::from_secs(10);
2660 for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
2661 &expected_peer_id,
2662 task.platform.now() + ban_duration,
2663 ) {
2664 if matches!(
2665 what_happened,
2666 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2667 ) {
2668 log!(
2669 &task.platform,
2670 Debug,
2671 "network",
2672 "slot-unassigned",
2673 chain = &task.network[chain_id].log_name,
2674 peer_id = expected_peer_id,
2675 ?ban_duration,
2676 reason = "no-address"
2677 );
2678 }
2679 }
2680 continue;
2681 };
2682
2683 let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
2684 Ok(a) => a,
2685 Err((multiaddr::FromBytesError, addr)) => {
2686 let _was_in = task
2688 .peering_strategy
2689 .decrease_address_connections_and_remove_if_zero(
2690 &expected_peer_id,
2691 &addr,
2692 );
2693 debug_assert!(_was_in.is_ok());
2694 continue;
2695 }
2696 };
2697
2698 let address = address_parse::multiaddr_to_address(&multiaddr)
2699 .ok()
2700 .filter(|addr| {
2701 task.platform.supports_connection_type(match &addr {
2702 address_parse::AddressOrMultiStreamAddress::Address(addr) => {
2703 From::from(addr)
2704 }
2705 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
2706 addr,
2707 ) => From::from(addr),
2708 })
2709 });
2710
2711 let Some(address) = address else {
2712 let _was_in = task
2714 .peering_strategy
2715 .decrease_address_connections_and_remove_if_zero(
2716 &expected_peer_id,
2717 multiaddr.as_ref(),
2718 );
2719 debug_assert!(_was_in.is_ok());
2720 continue;
2721 };
2722
2723 let noise_key = {
2725 let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
2726 task.platform.fill_random_bytes(&mut *noise_static_key);
2727 let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
2728 task.platform.fill_random_bytes(&mut *libp2p_key);
2729 connection::NoiseKey::new(&libp2p_key, &noise_static_key)
2730 };
2731
2732 log!(
2733 &task.platform,
2734 Debug,
2735 "network",
2736 "connection-started",
2737 expected_peer_id,
2738 remote_addr = multiaddr,
2739 local_peer_id =
2740 peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
2741 .into_peer_id(),
2742 );
2743
2744 task.num_recent_connection_opening += 1;
2745
2746 let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
2747 async_channel::bounded(8);
2748 let task_name = format!("connection-{}", multiaddr);
2749
2750 match address {
2751 address_parse::AddressOrMultiStreamAddress::Address(address) => {
2752 let connection = task.platform.connect_stream(address).await;
2755
2756 let (connection_id, connection_task) =
2757 task.network.add_single_stream_connection(
2758 task.platform.now(),
2759 service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
2760 is_initiator: true,
2761 noise_key: &noise_key,
2762 },
2763 multiaddr.clone().into_bytes(),
2764 Some(expected_peer_id.clone()),
2765 coordinator_to_connection_tx,
2766 );
2767
2768 task.platform.spawn_task(
2769 task_name.into(),
2770 tasks::single_stream_connection_task::<TPlat>(
2771 connection,
2772 multiaddr.to_string(),
2773 task.platform.clone(),
2774 connection_id,
2775 connection_task,
2776 coordinator_to_connection_rx,
2777 task.tasks_messages_tx.clone(),
2778 ),
2779 );
2780 }
2781 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
2782 platform::MultiStreamAddress::WebRtc {
2783 ip,
2784 port,
2785 remote_certificate_sha256,
2786 },
2787 ) => {
2788 let connection = task
2793 .platform
2794 .connect_multistream(platform::MultiStreamAddress::WebRtc {
2795 ip,
2796 port,
2797 remote_certificate_sha256,
2798 })
2799 .await;
2800
2801 let local_tls_certificate_multihash = [18u8, 32]
2803 .into_iter()
2804 .chain(connection.local_tls_certificate_sha256.into_iter())
2805 .collect();
2806 let remote_tls_certificate_multihash = [18u8, 32]
2807 .into_iter()
2808 .chain(remote_certificate_sha256.iter().copied())
2809 .collect();
2810
2811 let (connection_id, connection_task) =
2812 task.network.add_multi_stream_connection(
2813 task.platform.now(),
2814 service::MultiStreamHandshakeKind::WebRtc {
2815 is_initiator: true,
2816 local_tls_certificate_multihash,
2817 remote_tls_certificate_multihash,
2818 noise_key: &noise_key,
2819 },
2820 multiaddr.clone().into_bytes(),
2821 Some(expected_peer_id.clone()),
2822 coordinator_to_connection_tx,
2823 );
2824
2825 task.platform.spawn_task(
2826 task_name.into(),
2827 tasks::webrtc_multi_stream_connection_task::<TPlat>(
2828 connection.connection,
2829 multiaddr.to_string(),
2830 task.platform.clone(),
2831 connection_id,
2832 connection_task,
2833 coordinator_to_connection_rx,
2834 task.tasks_messages_tx.clone(),
2835 ),
2836 );
2837 }
2838 }
2839 }
2840 WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
2841 task.network
2842 .gossip_open(
2843 chain_id,
2844 &peer_id,
2845 service::GossipKind::ConsensusTransactions,
2846 )
2847 .unwrap();
2848
2849 log!(
2850 &task.platform,
2851 Debug,
2852 "network",
2853 "gossip-open-start",
2854 chain = &task.network[chain_id].log_name,
2855 peer_id,
2856 );
2857 }
2858 WakeUpReason::MessageToConnection {
2859 connection_id,
2860 message,
2861 } => {
2862 let _send_result = task.network[connection_id].send(message).await;
2870 debug_assert!(_send_result.is_ok());
2871 }
2872 }
2873 }
2874}