1use crate::{
44 log,
45 platform::{self, PlatformRef, address_parse},
46};
47
48use alloc::{
49 borrow::ToOwned as _,
50 boxed::Box,
51 collections::BTreeMap,
52 format,
53 string::{String, ToString as _},
54 sync::Arc,
55 vec::{self, Vec},
56};
57use core::{cmp, mem, num::NonZero, pin::Pin, time::Duration};
58use futures_channel::oneshot;
59use futures_lite::FutureExt as _;
60use futures_util::{StreamExt as _, future, stream};
61use hashbrown::{HashMap, HashSet};
62use rand::seq::IteratorRandom as _;
63use rand_chacha::rand_core::SeedableRng as _;
64use smoldot::{
65 header,
66 informant::{BytesDisplay, HashDisplay},
67 libp2p::{
68 connection,
69 multiaddr::{self, Multiaddr},
70 peer_id,
71 },
72 network::{basic_peering_strategy, codec, service},
73};
74
75pub use codec::{CallProofRequestConfig, Role};
76pub use service::{ChainId, EncodedMerkleProof, PeerId, QueueNotificationError};
77
78mod tasks;
79
80pub struct Config<TPlat> {
82 pub platform: TPlat,
84
85 pub identify_agent_version: String,
87
88 pub chains_capacity: usize,
90
91 pub connections_open_pool_size: u32,
95
96 pub connections_open_pool_restore_delay: Duration,
101}
102
103pub struct ConfigChain {
109 pub log_name: String,
111
112 pub num_out_slots: usize,
115
116 pub genesis_block_hash: [u8; 32],
122
123 pub best_block: (u64, [u8; 32]),
126
127 pub fork_id: Option<String>,
130
131 pub block_number_bytes: usize,
133
134 pub grandpa_protocol_finalized_block_height: Option<u64>,
137}
138
139pub struct NetworkService<TPlat: PlatformRef> {
140 messages_tx: async_channel::Sender<ToBackground<TPlat>>,
142
143 platform: TPlat,
145}
146
147impl<TPlat: PlatformRef> NetworkService<TPlat> {
148 pub fn new(config: Config<TPlat>) -> Arc<Self> {
150 let (main_messages_tx, main_messages_rx) = async_channel::bounded(4);
151
152 let network = service::ChainNetwork::new(service::Config {
153 chains_capacity: config.chains_capacity,
154 connections_capacity: 32,
155 handshake_timeout: Duration::from_secs(8),
156 randomness_seed: {
157 let mut seed = [0; 32];
158 config.platform.fill_random_bytes(&mut seed);
159 seed
160 },
161 });
162
163 let (tasks_messages_tx, tasks_messages_rx) = async_channel::bounded(32);
165 let task = Box::pin(background_task(BackgroundTask {
166 randomness: rand_chacha::ChaCha20Rng::from_seed({
167 let mut seed = [0; 32];
168 config.platform.fill_random_bytes(&mut seed);
169 seed
170 }),
171 identify_agent_version: config.identify_agent_version,
172 tasks_messages_tx,
173 tasks_messages_rx: Box::pin(tasks_messages_rx),
174 peering_strategy: basic_peering_strategy::BasicPeeringStrategy::new(
175 basic_peering_strategy::Config {
176 randomness_seed: {
177 let mut seed = [0; 32];
178 config.platform.fill_random_bytes(&mut seed);
179 seed
180 },
181 peers_capacity: 50, chains_capacity: config.chains_capacity,
183 },
184 ),
185 network,
186 connections_open_pool_size: config.connections_open_pool_size,
187 connections_open_pool_restore_delay: config.connections_open_pool_restore_delay,
188 num_recent_connection_opening: 0,
189 next_recent_connection_restore: None,
190 platform: config.platform.clone(),
191 open_gossip_links: BTreeMap::new(),
192 event_pending_send: None,
193 event_senders: either::Left(Vec::new()),
194 pending_new_subscriptions: Vec::new(),
195 important_nodes: HashSet::with_capacity_and_hasher(16, Default::default()),
196 main_messages_rx: Box::pin(main_messages_rx),
197 messages_rx: stream::SelectAll::new(),
198 blocks_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
199 grandpa_warp_sync_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
200 storage_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
201 call_proof_requests: HashMap::with_capacity_and_hasher(8, Default::default()),
202 chains_by_next_discovery: BTreeMap::new(),
203 }));
204
205 config.platform.spawn_task("network-service".into(), {
206 let platform = config.platform.clone();
207 async move {
208 task.await;
209 log!(&platform, Debug, "network", "shutdown");
210 }
211 });
212
213 Arc::new(NetworkService {
214 messages_tx: main_messages_tx,
215 platform: config.platform,
216 })
217 }
218
219 pub fn add_chain(&self, config: ConfigChain) -> Arc<NetworkServiceChain<TPlat>> {
225 let (messages_tx, messages_rx) = async_channel::bounded(32);
226
227 self.platform.spawn_task("add-chain-message-send".into(), {
229 let config = service::ChainConfig {
230 grandpa_protocol_config: config.grandpa_protocol_finalized_block_height.map(
231 |commit_finalized_height| service::GrandpaState {
232 commit_finalized_height,
233 round_number: 1,
234 set_id: 0,
235 },
236 ),
237 fork_id: config.fork_id.clone(),
238 block_number_bytes: config.block_number_bytes,
239 best_hash: config.best_block.1,
240 best_number: config.best_block.0,
241 genesis_hash: config.genesis_block_hash,
242 role: Role::Light,
243 allow_inbound_block_requests: false,
244 user_data: Chain {
245 log_name: config.log_name,
246 block_number_bytes: config.block_number_bytes,
247 num_out_slots: config.num_out_slots,
248 num_references: NonZero::<usize>::new(1).unwrap(),
249 next_discovery_period: Duration::from_secs(2),
250 next_discovery_when: self.platform.now(),
251 },
252 };
253
254 let messages_tx = self.messages_tx.clone();
255 async move {
256 let _ = messages_tx
257 .send(ToBackground::AddChain {
258 messages_rx,
259 config,
260 })
261 .await;
262 }
263 });
264
265 Arc::new(NetworkServiceChain {
266 _keep_alive_messages_tx: self.messages_tx.clone(),
267 messages_tx,
268 marker: core::marker::PhantomData,
269 })
270 }
271}
272
273pub struct NetworkServiceChain<TPlat: PlatformRef> {
274 _keep_alive_messages_tx: async_channel::Sender<ToBackground<TPlat>>,
277
278 messages_tx: async_channel::Sender<ToBackgroundChain>,
280
281 marker: core::marker::PhantomData<TPlat>,
283}
284
285#[derive(Debug, Copy, Clone, PartialEq, Eq)]
287pub enum BanSeverity {
288 Low,
289 High,
290}
291
292impl<TPlat: PlatformRef> NetworkServiceChain<TPlat> {
293 pub async fn subscribe(&self) -> async_channel::Receiver<Event> {
314 let (tx, rx) = async_channel::bounded(128);
315
316 self.messages_tx
317 .send(ToBackgroundChain::Subscribe { sender: tx })
318 .await
319 .unwrap();
320
321 rx
322 }
323
324 pub async fn ban_and_disconnect(
336 &self,
337 peer_id: PeerId,
338 severity: BanSeverity,
339 reason: &'static str,
340 ) {
341 let _ = self
342 .messages_tx
343 .send(ToBackgroundChain::DisconnectAndBan {
344 peer_id,
345 severity,
346 reason,
347 })
348 .await;
349 }
350
351 pub async fn blocks_request(
354 self: Arc<Self>,
355 target: PeerId,
356 config: codec::BlocksRequestConfig,
357 timeout: Duration,
358 ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
359 let (tx, rx) = oneshot::channel();
360
361 self.messages_tx
362 .send(ToBackgroundChain::StartBlocksRequest {
363 target: target.clone(),
364 config,
365 timeout,
366 result: tx,
367 })
368 .await
369 .unwrap();
370
371 rx.await.unwrap()
372 }
373
374 pub async fn grandpa_warp_sync_request(
377 self: Arc<Self>,
378 target: PeerId,
379 begin_hash: [u8; 32],
380 timeout: Duration,
381 ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> {
382 let (tx, rx) = oneshot::channel();
383
384 self.messages_tx
385 .send(ToBackgroundChain::StartWarpSyncRequest {
386 target: target.clone(),
387 begin_hash,
388 timeout,
389 result: tx,
390 })
391 .await
392 .unwrap();
393
394 rx.await.unwrap()
395 }
396
397 pub async fn set_local_best_block(&self, best_hash: [u8; 32], best_number: u64) {
398 self.messages_tx
399 .send(ToBackgroundChain::SetLocalBestBlock {
400 best_hash,
401 best_number,
402 })
403 .await
404 .unwrap();
405 }
406
407 pub async fn set_local_grandpa_state(&self, grandpa_state: service::GrandpaState) {
408 self.messages_tx
409 .send(ToBackgroundChain::SetLocalGrandpaState { grandpa_state })
410 .await
411 .unwrap();
412 }
413
414 pub async fn storage_proof_request(
417 self: Arc<Self>,
418 target: PeerId, config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>,
420 timeout: Duration,
421 ) -> Result<service::EncodedMerkleProof, StorageProofRequestError> {
422 let (tx, rx) = oneshot::channel();
423
424 self.messages_tx
425 .send(ToBackgroundChain::StartStorageProofRequest {
426 target: target.clone(),
427 config: codec::StorageProofRequestConfig {
428 block_hash: config.block_hash,
429 keys: config
430 .keys
431 .map(|key| key.as_ref().to_vec()) .collect::<Vec<_>>()
433 .into_iter(),
434 },
435 timeout,
436 result: tx,
437 })
438 .await
439 .unwrap();
440
441 rx.await.unwrap()
442 }
443
444 pub async fn call_proof_request(
449 self: Arc<Self>,
450 target: PeerId, config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>,
452 timeout: Duration,
453 ) -> Result<EncodedMerkleProof, CallProofRequestError> {
454 let (tx, rx) = oneshot::channel();
455
456 self.messages_tx
457 .send(ToBackgroundChain::StartCallProofRequest {
458 target: target.clone(),
459 config: codec::CallProofRequestConfig {
460 block_hash: config.block_hash,
461 method: config.method.into_owned().into(),
462 parameter_vectored: config
463 .parameter_vectored
464 .map(|v| v.as_ref().to_vec()) .collect::<Vec<_>>()
466 .into_iter(),
467 },
468 timeout,
469 result: tx,
470 })
471 .await
472 .unwrap();
473
474 rx.await.unwrap()
475 }
476
477 pub async fn announce_transaction(self: Arc<Self>, transaction: &[u8]) -> Vec<PeerId> {
487 let (tx, rx) = oneshot::channel();
488
489 self.messages_tx
490 .send(ToBackgroundChain::AnnounceTransaction {
491 transaction: transaction.to_vec(), result: tx,
493 })
494 .await
495 .unwrap();
496
497 rx.await.unwrap()
498 }
499
500 pub async fn send_block_announce(
502 self: Arc<Self>,
503 target: &PeerId,
504 scale_encoded_header: &[u8],
505 is_best: bool,
506 ) -> Result<(), QueueNotificationError> {
507 let (tx, rx) = oneshot::channel();
508
509 self.messages_tx
510 .send(ToBackgroundChain::SendBlockAnnounce {
511 target: target.clone(), scale_encoded_header: scale_encoded_header.to_vec(), is_best,
514 result: tx,
515 })
516 .await
517 .unwrap();
518
519 rx.await.unwrap()
520 }
521
522 pub async fn discover(
528 &self,
529 list: impl IntoIterator<Item = (PeerId, impl IntoIterator<Item = Multiaddr>)>,
530 important_nodes: bool,
531 ) {
532 self.messages_tx
533 .send(ToBackgroundChain::Discover {
534 list: list
536 .into_iter()
537 .map(|(peer_id, addrs)| {
538 (peer_id, addrs.into_iter().collect::<Vec<_>>().into_iter())
539 })
540 .collect::<Vec<_>>()
541 .into_iter(),
542 important_nodes,
543 })
544 .await
545 .unwrap();
546 }
547
548 pub async fn discovered_nodes(
555 &self,
556 ) -> impl Iterator<Item = (PeerId, impl Iterator<Item = Multiaddr>)> {
557 let (tx, rx) = oneshot::channel();
558
559 self.messages_tx
560 .send(ToBackgroundChain::DiscoveredNodes { result: tx })
561 .await
562 .unwrap();
563
564 rx.await
565 .unwrap()
566 .into_iter()
567 .map(|(peer_id, addrs)| (peer_id, addrs.into_iter()))
568 }
569
570 pub async fn peers_list(&self) -> impl Iterator<Item = PeerId> {
573 let (tx, rx) = oneshot::channel();
574 self.messages_tx
575 .send(ToBackgroundChain::PeersList { result: tx })
576 .await
577 .unwrap();
578 rx.await.unwrap().into_iter()
579 }
580}
581
582#[derive(Debug, Clone)]
584pub enum Event {
585 Connected {
586 peer_id: PeerId,
587 role: Role,
588 best_block_number: u64,
589 best_block_hash: [u8; 32],
590 },
591 Disconnected {
592 peer_id: PeerId,
593 },
594 BlockAnnounce {
595 peer_id: PeerId,
596 announce: service::EncodedBlockAnnounce,
597 },
598 GrandpaNeighborPacket {
599 peer_id: PeerId,
600 finalized_block_height: u64,
601 },
602 GrandpaCommitMessage {
604 peer_id: PeerId,
605 message: service::EncodedGrandpaCommitMessage,
606 },
607}
608
609#[derive(Debug, derive_more::Display, derive_more::Error)]
611pub enum BlocksRequestError {
612 NoConnection,
614 #[display("{_0}")]
616 Request(service::BlocksRequestError),
617}
618
619#[derive(Debug, derive_more::Display, derive_more::Error)]
621pub enum WarpSyncRequestError {
622 NoConnection,
624 #[display("{_0}")]
626 Request(service::GrandpaWarpSyncRequestError),
627}
628
629#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
631pub enum StorageProofRequestError {
632 NoConnection,
634 RequestTooLarge,
636 #[display("{_0}")]
638 Request(service::StorageProofRequestError),
639}
640
641#[derive(Debug, derive_more::Display, derive_more::Error, Clone)]
643pub enum CallProofRequestError {
644 NoConnection,
646 RequestTooLarge,
648 #[display("{_0}")]
650 Request(service::CallProofRequestError),
651}
652
653impl CallProofRequestError {
654 pub fn is_network_problem(&self) -> bool {
657 match self {
658 CallProofRequestError::Request(err) => err.is_network_problem(),
659 CallProofRequestError::RequestTooLarge => false,
660 CallProofRequestError::NoConnection => true,
661 }
662 }
663}
664
665enum ToBackground<TPlat: PlatformRef> {
666 AddChain {
667 messages_rx: async_channel::Receiver<ToBackgroundChain>,
668 config: service::ChainConfig<Chain<TPlat>>,
669 },
670}
671
672enum ToBackgroundChain {
673 RemoveChain,
674 Subscribe {
675 sender: async_channel::Sender<Event>,
676 },
677 DisconnectAndBan {
678 peer_id: PeerId,
679 severity: BanSeverity,
680 reason: &'static str,
681 },
682 StartBlocksRequest {
684 target: PeerId, config: codec::BlocksRequestConfig,
686 timeout: Duration,
687 result: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
688 },
689 StartWarpSyncRequest {
691 target: PeerId,
692 begin_hash: [u8; 32],
693 timeout: Duration,
694 result:
695 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
696 },
697 StartStorageProofRequest {
699 target: PeerId,
700 config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
701 timeout: Duration,
702 result: oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
703 },
704 StartCallProofRequest {
706 target: PeerId, config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
708 timeout: Duration,
709 result: oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
710 },
711 SetLocalBestBlock {
712 best_hash: [u8; 32],
713 best_number: u64,
714 },
715 SetLocalGrandpaState {
716 grandpa_state: service::GrandpaState,
717 },
718 AnnounceTransaction {
719 transaction: Vec<u8>,
720 result: oneshot::Sender<Vec<PeerId>>,
721 },
722 SendBlockAnnounce {
723 target: PeerId,
724 scale_encoded_header: Vec<u8>,
725 is_best: bool,
726 result: oneshot::Sender<Result<(), QueueNotificationError>>,
727 },
728 Discover {
729 list: vec::IntoIter<(PeerId, vec::IntoIter<Multiaddr>)>,
730 important_nodes: bool,
731 },
732 DiscoveredNodes {
733 result: oneshot::Sender<Vec<(PeerId, Vec<Multiaddr>)>>,
734 },
735 PeersList {
736 result: oneshot::Sender<Vec<PeerId>>,
737 },
738}
739
740struct BackgroundTask<TPlat: PlatformRef> {
741 platform: TPlat,
743
744 randomness: rand_chacha::ChaCha20Rng,
746
747 identify_agent_version: String,
749
750 tasks_messages_tx:
752 async_channel::Sender<(service::ConnectionId, service::ConnectionToCoordinator)>,
753
754 tasks_messages_rx: Pin<
756 Box<async_channel::Receiver<(service::ConnectionId, service::ConnectionToCoordinator)>>,
757 >,
758
759 network: service::ChainNetwork<
761 Chain<TPlat>,
762 async_channel::Sender<service::CoordinatorToConnection>,
763 TPlat::Instant,
764 >,
765
766 peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, TPlat::Instant>,
768
769 connections_open_pool_size: u32,
771
772 connections_open_pool_restore_delay: Duration,
774
775 num_recent_connection_opening: u32,
779
780 next_recent_connection_restore: Option<Pin<Box<TPlat::Delay>>>,
782
783 open_gossip_links: BTreeMap<(ChainId, PeerId), OpenGossipLinkState>,
786
787 important_nodes: HashSet<PeerId, fnv::FnvBuildHasher>,
790
791 event_pending_send: Option<(ChainId, Event)>,
793
794 event_senders: either::Either<
800 Vec<(ChainId, async_channel::Sender<Event>)>,
801 Pin<Box<dyn Future<Output = Vec<(ChainId, async_channel::Sender<Event>)>> + Send>>,
802 >,
803
804 pending_new_subscriptions: Vec<(ChainId, async_channel::Sender<Event>)>,
807
808 main_messages_rx: Pin<Box<async_channel::Receiver<ToBackground<TPlat>>>>,
809
810 messages_rx:
811 stream::SelectAll<Pin<Box<dyn stream::Stream<Item = (ChainId, ToBackgroundChain)> + Send>>>,
812
813 blocks_requests: HashMap<
814 service::SubstreamId,
815 oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
816 fnv::FnvBuildHasher,
817 >,
818
819 grandpa_warp_sync_requests: HashMap<
820 service::SubstreamId,
821 oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
822 fnv::FnvBuildHasher,
823 >,
824
825 storage_proof_requests: HashMap<
826 service::SubstreamId,
827 oneshot::Sender<Result<service::EncodedMerkleProof, StorageProofRequestError>>,
828 fnv::FnvBuildHasher,
829 >,
830
831 call_proof_requests: HashMap<
832 service::SubstreamId,
833 oneshot::Sender<Result<service::EncodedMerkleProof, CallProofRequestError>>,
834 fnv::FnvBuildHasher,
835 >,
836
837 chains_by_next_discovery: BTreeMap<(TPlat::Instant, ChainId), Pin<Box<TPlat::Delay>>>,
839}
840
841struct Chain<TPlat: PlatformRef> {
842 log_name: String,
843
844 num_references: NonZero<usize>,
846
847 block_number_bytes: usize,
850
851 num_out_slots: usize,
853
854 next_discovery_when: TPlat::Instant,
856
857 next_discovery_period: Duration,
860}
861
862#[derive(Clone)]
863struct OpenGossipLinkState {
864 role: Role,
865 best_block_number: u64,
866 best_block_hash: [u8; 32],
867 finalized_block_height: Option<u64>,
869}
870
871async fn background_task<TPlat: PlatformRef>(mut task: BackgroundTask<TPlat>) {
872 loop {
873 futures_lite::future::yield_now().await;
875
876 enum WakeUpReason<TPlat: PlatformRef> {
877 ForegroundClosed,
878 Message(ToBackground<TPlat>),
879 MessageForChain(ChainId, ToBackgroundChain),
880 NetworkEvent(service::Event<async_channel::Sender<service::CoordinatorToConnection>>),
881 CanAssignSlot(PeerId, ChainId),
882 NextRecentConnectionRestore,
883 CanStartConnect(PeerId),
884 CanOpenGossip(PeerId, ChainId),
885 MessageFromConnection {
886 connection_id: service::ConnectionId,
887 message: service::ConnectionToCoordinator,
888 },
889 MessageToConnection {
890 connection_id: service::ConnectionId,
891 message: service::CoordinatorToConnection,
892 },
893 EventSendersReady,
894 StartDiscovery(ChainId),
895 }
896
897 let wake_up_reason = {
898 let message_received = async {
899 task.main_messages_rx
900 .next()
901 .await
902 .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message)
903 };
904 let message_for_chain_received = async {
905 let Some((chain_id, message)) = task.messages_rx.next().await else {
910 future::pending().await
911 };
912 WakeUpReason::MessageForChain(chain_id, message)
913 };
914 let message_from_task_received = async {
915 let (connection_id, message) = task.tasks_messages_rx.next().await.unwrap();
916 WakeUpReason::MessageFromConnection {
917 connection_id,
918 message,
919 }
920 };
921 let service_event = async {
922 if let Some(event) = (task.event_pending_send.is_none()
923 && task.pending_new_subscriptions.is_empty())
924 .then(|| task.network.next_event())
925 .flatten()
926 {
927 WakeUpReason::NetworkEvent(event)
928 } else if let Some(start_connect) = {
929 let x = (task.num_recent_connection_opening < task.connections_open_pool_size)
930 .then(|| {
931 task.network
932 .unconnected_desired()
933 .choose(&mut task.randomness)
934 .cloned()
935 })
936 .flatten();
937 x
938 } {
939 WakeUpReason::CanStartConnect(start_connect)
940 } else if let Some((peer_id, chain_id)) = {
941 let x = task
942 .network
943 .connected_unopened_gossip_desired()
944 .choose(&mut task.randomness)
945 .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id));
946 x
947 } {
948 WakeUpReason::CanOpenGossip(peer_id, chain_id)
949 } else if let Some((connection_id, message)) =
950 task.network.pull_message_to_connection()
951 {
952 WakeUpReason::MessageToConnection {
953 connection_id,
954 message,
955 }
956 } else {
957 'search: loop {
958 let mut earlier_unban = None;
959
960 for chain_id in task.network.chains().collect::<Vec<_>>() {
961 if task.network.gossip_desired_num(
962 chain_id,
963 service::GossipKind::ConsensusTransactions,
964 ) >= task.network[chain_id].num_out_slots
965 {
966 continue;
967 }
968
969 match task
970 .peering_strategy
971 .pick_assignable_peer(&chain_id, &task.platform.now())
972 {
973 basic_peering_strategy::AssignablePeer::Assignable(peer_id) => {
974 break 'search WakeUpReason::CanAssignSlot(
975 peer_id.clone(),
976 chain_id,
977 );
978 }
979 basic_peering_strategy::AssignablePeer::AllPeersBanned {
980 next_unban,
981 } => {
982 if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
983 earlier_unban = Some(next_unban.clone());
984 }
985 }
986 basic_peering_strategy::AssignablePeer::NoPeer => continue,
987 }
988 }
989
990 if let Some(earlier_unban) = earlier_unban {
991 task.platform.sleep_until(earlier_unban).await;
992 } else {
993 future::pending::<()>().await;
994 }
995 }
996 }
997 };
998 let next_recent_connection_restore = async {
999 if task.num_recent_connection_opening != 0
1000 && task.next_recent_connection_restore.is_none()
1001 {
1002 task.next_recent_connection_restore = Some(Box::pin(
1003 task.platform
1004 .sleep(task.connections_open_pool_restore_delay),
1005 ));
1006 }
1007 if let Some(delay) = task.next_recent_connection_restore.as_mut() {
1008 delay.await;
1009 task.next_recent_connection_restore = None;
1010 WakeUpReason::NextRecentConnectionRestore
1011 } else {
1012 future::pending().await
1013 }
1014 };
1015 let finished_sending_event = async {
1016 if let either::Right(event_sending_future) = &mut task.event_senders {
1017 let event_senders = event_sending_future.await;
1018 task.event_senders = either::Left(event_senders);
1019 WakeUpReason::EventSendersReady
1020 } else if task.event_pending_send.is_some()
1021 || !task.pending_new_subscriptions.is_empty()
1022 {
1023 WakeUpReason::EventSendersReady
1024 } else {
1025 future::pending().await
1026 }
1027 };
1028 let start_discovery = async {
1029 let Some(mut next_discovery) = task.chains_by_next_discovery.first_entry() else {
1030 future::pending().await
1031 };
1032 next_discovery.get_mut().await;
1033 let ((_, chain_id), _) = next_discovery.remove_entry();
1034 WakeUpReason::StartDiscovery(chain_id)
1035 };
1036
1037 message_for_chain_received
1038 .or(message_received)
1039 .or(message_from_task_received)
1040 .or(service_event)
1041 .or(next_recent_connection_restore)
1042 .or(finished_sending_event)
1043 .or(start_discovery)
1044 .await
1045 };
1046
1047 match wake_up_reason {
1048 WakeUpReason::ForegroundClosed => {
1049 return;
1051 }
1052 WakeUpReason::Message(ToBackground::AddChain {
1053 messages_rx,
1054 config,
1055 }) => {
1056 let chain_id = match task.network.add_chain(config) {
1058 Ok(id) => id,
1059 Err(service::AddChainError::Duplicate { existing_identical }) => {
1060 task.network[existing_identical].num_references = task.network
1061 [existing_identical]
1062 .num_references
1063 .checked_add(1)
1064 .unwrap();
1065 existing_identical
1066 }
1067 };
1068
1069 task.chains_by_next_discovery.insert(
1070 (task.network[chain_id].next_discovery_when.clone(), chain_id),
1071 Box::pin(
1072 task.platform
1073 .sleep_until(task.network[chain_id].next_discovery_when.clone()),
1074 ),
1075 );
1076
1077 task.messages_rx
1078 .push(Box::pin(
1079 messages_rx
1080 .map(move |msg| (chain_id, msg))
1081 .chain(stream::once(future::ready((
1082 chain_id,
1083 ToBackgroundChain::RemoveChain,
1084 )))),
1085 ) as Pin<Box<_>>);
1086
1087 log!(
1088 &task.platform,
1089 Debug,
1090 "network",
1091 "chain-added",
1092 id = task.network[chain_id].log_name
1093 );
1094 }
1095 WakeUpReason::EventSendersReady => {
1096 let either::Left(event_senders) = &mut task.event_senders else {
1100 unreachable!()
1101 };
1102
1103 if let Some((event_to_dispatch_chain_id, event_to_dispatch)) =
1104 task.event_pending_send.take()
1105 {
1106 let mut event_senders = mem::take(event_senders);
1107 task.event_senders = either::Right(Box::pin(async move {
1108 for index in (0..event_senders.len()).rev() {
1111 let (event_sender_chain_id, event_sender) =
1112 event_senders.swap_remove(index);
1113 if event_sender_chain_id == event_to_dispatch_chain_id {
1114 if event_sender.send(event_to_dispatch.clone()).await.is_err() {
1115 continue;
1116 }
1117 }
1118 event_senders.push((event_sender_chain_id, event_sender));
1119 }
1120 event_senders
1121 }));
1122 } else if !task.pending_new_subscriptions.is_empty() {
1123 let pending_new_subscriptions = mem::take(&mut task.pending_new_subscriptions);
1124 let mut event_senders = mem::take(event_senders);
1125 let open_gossip_links = task.open_gossip_links.clone();
1127 task.event_senders = either::Right(Box::pin(async move {
1128 for (chain_id, new_subscription) in pending_new_subscriptions {
1129 for ((link_chain_id, peer_id), state) in &open_gossip_links {
1130 if *link_chain_id != chain_id {
1132 continue;
1133 }
1134
1135 let _ = new_subscription
1136 .send(Event::Connected {
1137 peer_id: peer_id.clone(),
1138 role: state.role,
1139 best_block_number: state.best_block_number,
1140 best_block_hash: state.best_block_hash,
1141 })
1142 .await;
1143
1144 if let Some(finalized_block_height) = state.finalized_block_height {
1145 let _ = new_subscription
1146 .send(Event::GrandpaNeighborPacket {
1147 peer_id: peer_id.clone(),
1148 finalized_block_height,
1149 })
1150 .await;
1151 }
1152 }
1153
1154 event_senders.push((chain_id, new_subscription));
1155 }
1156
1157 event_senders
1158 }));
1159 }
1160 }
1161 WakeUpReason::MessageFromConnection {
1162 connection_id,
1163 message,
1164 } => {
1165 task.network
1166 .inject_connection_message(connection_id, message);
1167 }
1168 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::RemoveChain) => {
1169 if let Some(new_ref) =
1170 NonZero::<usize>::new(task.network[chain_id].num_references.get() - 1)
1171 {
1172 task.network[chain_id].num_references = new_ref;
1173 continue;
1174 }
1175
1176 for peer_id in task
1177 .network
1178 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1179 .cloned()
1180 .collect::<Vec<_>>()
1181 {
1182 task.network
1183 .gossip_close(
1184 chain_id,
1185 &peer_id,
1186 service::GossipKind::ConsensusTransactions,
1187 )
1188 .unwrap();
1189
1190 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id));
1191 debug_assert!(_was_in.is_some());
1192 }
1193
1194 let _was_in = task
1195 .chains_by_next_discovery
1196 .remove(&(task.network[chain_id].next_discovery_when.clone(), chain_id));
1197 debug_assert!(_was_in.is_some());
1198
1199 log!(
1200 &task.platform,
1201 Debug,
1202 "network",
1203 "chain-removed",
1204 id = task.network[chain_id].log_name
1205 );
1206 task.network.remove_chain(chain_id).unwrap();
1207 task.peering_strategy.remove_chain_peers(&chain_id);
1208 }
1209 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::Subscribe { sender }) => {
1210 task.pending_new_subscriptions.push((chain_id, sender));
1211 }
1212 WakeUpReason::MessageForChain(
1213 chain_id,
1214 ToBackgroundChain::DisconnectAndBan {
1215 peer_id,
1216 severity,
1217 reason,
1218 },
1219 ) => {
1220 let ban_duration = Duration::from_secs(match severity {
1221 BanSeverity::Low => 10,
1222 BanSeverity::High => 40,
1223 });
1224
1225 let had_slot = matches!(
1226 task.peering_strategy.unassign_slot_and_ban(
1227 &chain_id,
1228 &peer_id,
1229 task.platform.now() + ban_duration,
1230 ),
1231 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1232 );
1233
1234 if had_slot {
1235 log!(
1236 &task.platform,
1237 Debug,
1238 "network",
1239 "slot-unassigned",
1240 chain = &task.network[chain_id].log_name,
1241 peer_id,
1242 ?ban_duration,
1243 reason = "user-ban",
1244 user_reason = reason
1245 );
1246 task.network.gossip_remove_desired(
1247 chain_id,
1248 &peer_id,
1249 service::GossipKind::ConsensusTransactions,
1250 );
1251 }
1252
1253 if task.network.gossip_is_connected(
1254 chain_id,
1255 &peer_id,
1256 service::GossipKind::ConsensusTransactions,
1257 ) {
1258 let _closed_result = task.network.gossip_close(
1259 chain_id,
1260 &peer_id,
1261 service::GossipKind::ConsensusTransactions,
1262 );
1263 debug_assert!(_closed_result.is_ok());
1264
1265 log!(
1266 &task.platform,
1267 Debug,
1268 "network",
1269 "gossip-closed",
1270 chain = &task.network[chain_id].log_name,
1271 peer_id,
1272 );
1273
1274 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1275 debug_assert!(_was_in.is_some());
1276
1277 debug_assert!(task.event_pending_send.is_none());
1278 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
1279 }
1280 }
1281 WakeUpReason::MessageForChain(
1282 chain_id,
1283 ToBackgroundChain::StartBlocksRequest {
1284 target,
1285 config,
1286 timeout,
1287 result,
1288 },
1289 ) => {
1290 match &config.start {
1291 codec::BlocksRequestConfigStart::Hash(hash) => {
1292 log!(
1293 &task.platform,
1294 Debug,
1295 "network",
1296 "blocks-request-started",
1297 chain = task.network[chain_id].log_name, target,
1298 start = HashDisplay(hash),
1299 num = config.desired_count.get(),
1300 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1301 header = ?config.fields.header, body = ?config.fields.body,
1302 justifications = ?config.fields.justifications
1303 );
1304 }
1305 codec::BlocksRequestConfigStart::Number(number) => {
1306 log!(
1307 &task.platform,
1308 Debug,
1309 "network",
1310 "blocks-request-started",
1311 chain = task.network[chain_id].log_name, target, start = number,
1312 num = config.desired_count.get(),
1313 descending = ?matches!(config.direction, codec::BlocksRequestDirection::Descending),
1314 header = ?config.fields.header, body = ?config.fields.body, justifications = ?config.fields.justifications
1315 );
1316 }
1317 }
1318
1319 match task
1320 .network
1321 .start_blocks_request(&target, chain_id, config.clone(), timeout)
1322 {
1323 Ok(substream_id) => {
1324 task.blocks_requests.insert(substream_id, result);
1325 }
1326 Err(service::StartRequestError::NoConnection) => {
1327 log!(
1328 &task.platform,
1329 Debug,
1330 "network",
1331 "blocks-request-error",
1332 chain = task.network[chain_id].log_name,
1333 target,
1334 error = "NoConnection"
1335 );
1336 let _ = result.send(Err(BlocksRequestError::NoConnection));
1337 }
1338 }
1339 }
1340 WakeUpReason::MessageForChain(
1341 chain_id,
1342 ToBackgroundChain::StartWarpSyncRequest {
1343 target,
1344 begin_hash,
1345 timeout,
1346 result,
1347 },
1348 ) => {
1349 log!(
1350 &task.platform,
1351 Debug,
1352 "network",
1353 "warp-sync-request-started",
1354 chain = task.network[chain_id].log_name,
1355 target,
1356 start = HashDisplay(&begin_hash)
1357 );
1358
1359 match task
1360 .network
1361 .start_grandpa_warp_sync_request(&target, chain_id, begin_hash, timeout)
1362 {
1363 Ok(substream_id) => {
1364 task.grandpa_warp_sync_requests.insert(substream_id, result);
1365 }
1366 Err(service::StartRequestError::NoConnection) => {
1367 log!(
1368 &task.platform,
1369 Debug,
1370 "network",
1371 "warp-sync-request-error",
1372 chain = task.network[chain_id].log_name,
1373 target,
1374 error = "NoConnection"
1375 );
1376 let _ = result.send(Err(WarpSyncRequestError::NoConnection));
1377 }
1378 }
1379 }
1380 WakeUpReason::MessageForChain(
1381 chain_id,
1382 ToBackgroundChain::StartStorageProofRequest {
1383 target,
1384 config,
1385 timeout,
1386 result,
1387 },
1388 ) => {
1389 log!(
1390 &task.platform,
1391 Debug,
1392 "network",
1393 "storage-proof-request-started",
1394 chain = task.network[chain_id].log_name,
1395 target,
1396 block_hash = HashDisplay(&config.block_hash)
1397 );
1398
1399 match task.network.start_storage_proof_request(
1400 &target,
1401 chain_id,
1402 config.clone(),
1403 timeout,
1404 ) {
1405 Ok(substream_id) => {
1406 task.storage_proof_requests.insert(substream_id, result);
1407 }
1408 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1409 log!(
1410 &task.platform,
1411 Debug,
1412 "network",
1413 "storage-proof-request-error",
1414 chain = task.network[chain_id].log_name,
1415 target,
1416 error = "NoConnection"
1417 );
1418 let _ = result.send(Err(StorageProofRequestError::NoConnection));
1419 }
1420 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1421 log!(
1422 &task.platform,
1423 Debug,
1424 "network",
1425 "storage-proof-request-error",
1426 chain = task.network[chain_id].log_name,
1427 target,
1428 error = "RequestTooLarge"
1429 );
1430 let _ = result.send(Err(StorageProofRequestError::RequestTooLarge));
1431 }
1432 };
1433 }
1434 WakeUpReason::MessageForChain(
1435 chain_id,
1436 ToBackgroundChain::StartCallProofRequest {
1437 target,
1438 config,
1439 timeout,
1440 result,
1441 },
1442 ) => {
1443 log!(
1444 &task.platform,
1445 Debug,
1446 "network",
1447 "call-proof-request-started",
1448 chain = task.network[chain_id].log_name,
1449 target,
1450 block_hash = HashDisplay(&config.block_hash),
1451 function = config.method
1452 );
1453 match task.network.start_call_proof_request(
1456 &target,
1457 chain_id,
1458 config.clone(),
1459 timeout,
1460 ) {
1461 Ok(substream_id) => {
1462 task.call_proof_requests.insert(substream_id, result);
1463 }
1464 Err(service::StartRequestMaybeTooLargeError::NoConnection) => {
1465 log!(
1466 &task.platform,
1467 Debug,
1468 "network",
1469 "call-proof-request-error",
1470 chain = task.network[chain_id].log_name,
1471 target,
1472 error = "NoConnection"
1473 );
1474 let _ = result.send(Err(CallProofRequestError::NoConnection));
1475 }
1476 Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => {
1477 log!(
1478 &task.platform,
1479 Debug,
1480 "network",
1481 "call-proof-request-error",
1482 chain = task.network[chain_id].log_name,
1483 target,
1484 error = "RequestTooLarge"
1485 );
1486 let _ = result.send(Err(CallProofRequestError::RequestTooLarge));
1487 }
1488 };
1489 }
1490 WakeUpReason::MessageForChain(
1491 chain_id,
1492 ToBackgroundChain::SetLocalBestBlock {
1493 best_hash,
1494 best_number,
1495 },
1496 ) => {
1497 task.network
1498 .set_chain_local_best_block(chain_id, best_hash, best_number);
1499 }
1500 WakeUpReason::MessageForChain(
1501 chain_id,
1502 ToBackgroundChain::SetLocalGrandpaState { grandpa_state },
1503 ) => {
1504 log!(
1505 &task.platform,
1506 Debug,
1507 "network",
1508 "local-grandpa-state-announced",
1509 chain = task.network[chain_id].log_name,
1510 set_id = grandpa_state.set_id,
1511 commit_finalized_height = grandpa_state.commit_finalized_height,
1512 );
1513
1514 task.network
1517 .gossip_broadcast_grandpa_state_and_update(chain_id, grandpa_state);
1518 }
1519 WakeUpReason::MessageForChain(
1520 chain_id,
1521 ToBackgroundChain::AnnounceTransaction {
1522 transaction,
1523 result,
1524 },
1525 ) => {
1526 let peers_to_send = task
1529 .network
1530 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1531 .cloned()
1532 .collect::<Vec<_>>();
1533
1534 let mut peers_sent = Vec::with_capacity(peers_to_send.len());
1535 let mut peers_queue_full = Vec::with_capacity(peers_to_send.len());
1536 for peer in &peers_to_send {
1537 match task
1538 .network
1539 .gossip_send_transaction(peer, chain_id, &transaction)
1540 {
1541 Ok(()) => peers_sent.push(peer.to_base58()),
1542 Err(QueueNotificationError::QueueFull) => {
1543 peers_queue_full.push(peer.to_base58())
1544 }
1545 Err(QueueNotificationError::NoConnection) => unreachable!(),
1546 }
1547 }
1548
1549 log!(
1550 &task.platform,
1551 Debug,
1552 "network",
1553 "transaction-announced",
1554 chain = task.network[chain_id].log_name,
1555 transaction =
1556 hex::encode(blake2_rfc::blake2b::blake2b(32, &[], &transaction).as_bytes()),
1557 size = transaction.len(),
1558 peers_sent = peers_sent.join(", "),
1559 peers_queue_full = peers_queue_full.join(", "),
1560 );
1561
1562 let _ = result.send(peers_to_send);
1563 }
1564 WakeUpReason::MessageForChain(
1565 chain_id,
1566 ToBackgroundChain::SendBlockAnnounce {
1567 target,
1568 scale_encoded_header,
1569 is_best,
1570 result,
1571 },
1572 ) => {
1573 let _ = result.send(task.network.gossip_send_block_announce(
1575 &target,
1576 chain_id,
1577 &scale_encoded_header,
1578 is_best,
1579 ));
1580 }
1581 WakeUpReason::MessageForChain(
1582 chain_id,
1583 ToBackgroundChain::Discover {
1584 list,
1585 important_nodes,
1586 },
1587 ) => {
1588 for (peer_id, addrs) in list {
1589 if important_nodes {
1590 task.important_nodes.insert(peer_id.clone());
1591 }
1592
1593 task.peering_strategy
1596 .insert_chain_peer(chain_id, peer_id.clone(), 30); for addr in addrs {
1599 let _ =
1600 task.peering_strategy
1601 .insert_address(&peer_id, addr.into_bytes(), 10);
1602 }
1604 }
1605 }
1606 WakeUpReason::MessageForChain(
1607 chain_id,
1608 ToBackgroundChain::DiscoveredNodes { result },
1609 ) => {
1610 let _ = result.send(
1612 task.peering_strategy
1613 .chain_peers_unordered(&chain_id)
1614 .map(|peer_id| {
1615 let addrs = task
1616 .peering_strategy
1617 .peer_addresses(peer_id)
1618 .map(|a| Multiaddr::from_bytes(a.to_owned()).unwrap())
1619 .collect::<Vec<_>>();
1620 (peer_id.clone(), addrs)
1621 })
1622 .collect::<Vec<_>>(),
1623 );
1624 }
1625 WakeUpReason::MessageForChain(chain_id, ToBackgroundChain::PeersList { result }) => {
1626 let _ = result.send(
1627 task.network
1628 .gossip_connected_peers(
1629 chain_id,
1630 service::GossipKind::ConsensusTransactions,
1631 )
1632 .cloned()
1633 .collect(),
1634 );
1635 }
1636 WakeUpReason::StartDiscovery(chain_id) => {
1637 let chain = &mut task.network[chain_id];
1639 chain.next_discovery_when = task.platform.now() + chain.next_discovery_period;
1640 chain.next_discovery_period =
1641 cmp::min(chain.next_discovery_period * 2, Duration::from_secs(120));
1642 task.chains_by_next_discovery.insert(
1643 (chain.next_discovery_when.clone(), chain_id),
1644 Box::pin(
1645 task.platform
1646 .sleep(task.network[chain_id].next_discovery_period),
1647 ),
1648 );
1649
1650 let random_peer_id = {
1651 let mut pub_key = [0; 32];
1652 rand_chacha::rand_core::RngCore::fill_bytes(&mut task.randomness, &mut pub_key);
1653 PeerId::from_public_key(&peer_id::PublicKey::Ed25519(pub_key))
1654 };
1655
1656 let target = task
1658 .network
1659 .gossip_connected_peers(chain_id, service::GossipKind::ConsensusTransactions)
1660 .next()
1661 .cloned();
1662
1663 if let Some(target) = target {
1664 match task.network.start_kademlia_find_node_request(
1665 &target,
1666 chain_id,
1667 &random_peer_id,
1668 Duration::from_secs(20),
1669 ) {
1670 Ok(_) => {}
1671 Err(service::StartRequestError::NoConnection) => unreachable!(),
1672 };
1673
1674 log!(
1675 &task.platform,
1676 Debug,
1677 "network",
1678 "discovery-find-node-started",
1679 chain = &task.network[chain_id].log_name,
1680 request_target = target,
1681 requested_peer_id = random_peer_id
1682 );
1683 } else {
1684 log!(
1685 &task.platform,
1686 Debug,
1687 "network",
1688 "discovery-skipped-no-peer",
1689 chain = &task.network[chain_id].log_name
1690 );
1691 }
1692 }
1693 WakeUpReason::NetworkEvent(service::Event::HandshakeFinished {
1694 peer_id,
1695 expected_peer_id,
1696 id,
1697 }) => {
1698 let remote_addr =
1699 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
1701 {
1702 log!(
1703 &task.platform,
1704 Debug,
1705 "network",
1706 "handshake-finished-peer-id-mismatch",
1707 remote_addr,
1708 expected_peer_id,
1709 actual_peer_id = peer_id
1710 );
1711
1712 let _was_in = task
1713 .peering_strategy
1714 .decrease_address_connections_and_remove_if_zero(
1715 expected_peer_id,
1716 remote_addr.as_ref(),
1717 );
1718 debug_assert!(_was_in.is_ok());
1719 let _ = task.peering_strategy.increase_address_connections(
1720 &peer_id,
1721 remote_addr.into_bytes().to_vec(),
1722 10,
1723 );
1724 } else {
1725 log!(
1726 &task.platform,
1727 Debug,
1728 "network",
1729 "handshake-finished",
1730 remote_addr,
1731 peer_id
1732 );
1733 }
1734 }
1735 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1736 expected_peer_id: Some(_),
1737 ..
1738 })
1739 | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => {
1740 let (address, peer_id, handshake_finished) = match wake_up_reason {
1741 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1742 address,
1743 expected_peer_id: Some(peer_id),
1744 ..
1745 }) => (address, peer_id, false),
1746 WakeUpReason::NetworkEvent(service::Event::Disconnected {
1747 address,
1748 peer_id,
1749 ..
1750 }) => (address, peer_id, true),
1751 _ => unreachable!(),
1752 };
1753
1754 task.peering_strategy
1755 .decrease_address_connections(&peer_id, &address)
1756 .unwrap();
1757 let address = Multiaddr::from_bytes(address).unwrap();
1758 log!(
1759 &task.platform,
1760 Debug,
1761 "network",
1762 "connection-shutdown",
1763 peer_id,
1764 address,
1765 ?handshake_finished
1766 );
1767
1768 let ban_duration = Duration::from_secs(5);
1776 task.network.gossip_remove_desired_all(
1777 &peer_id,
1778 service::GossipKind::ConsensusTransactions,
1779 );
1780 for (&chain_id, what_happened) in task
1781 .peering_strategy
1782 .unassign_slots_and_ban(&peer_id, task.platform.now() + ban_duration)
1783 {
1784 if matches!(
1785 what_happened,
1786 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
1787 ) {
1788 log!(
1789 &task.platform,
1790 Debug,
1791 "network",
1792 "slot-unassigned",
1793 chain = &task.network[chain_id].log_name,
1794 peer_id,
1795 ?ban_duration,
1796 reason = "pre-handshake-disconnect"
1797 );
1798 }
1799 }
1800 }
1801 WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected {
1802 expected_peer_id: None,
1803 ..
1804 }) => {
1805 debug_assert!(false);
1808 }
1809 WakeUpReason::NetworkEvent(service::Event::PingOutSuccess {
1810 id,
1811 peer_id,
1812 ping_time,
1813 }) => {
1814 let remote_addr =
1815 Multiaddr::from_bytes(task.network.connection_remote_addr(id)).unwrap(); log!(
1817 &task.platform,
1818 Debug,
1819 "network",
1820 "pong",
1821 peer_id,
1822 remote_addr,
1823 ?ping_time
1824 );
1825 }
1826 WakeUpReason::NetworkEvent(service::Event::BlockAnnounce {
1827 chain_id,
1828 peer_id,
1829 announce,
1830 }) => {
1831 log!(
1832 &task.platform,
1833 Debug,
1834 "network",
1835 "block-announce-received",
1836 chain = &task.network[chain_id].log_name,
1837 peer_id,
1838 block_hash = HashDisplay(&header::hash_from_scale_encoded_header(
1839 announce.decode().scale_encoded_header
1840 )),
1841 is_best = announce.decode().is_best
1842 );
1843
1844 let decoded_announce = announce.decode();
1845 if decoded_announce.is_best {
1846 let link = task
1847 .open_gossip_links
1848 .get_mut(&(chain_id, peer_id.clone()))
1849 .unwrap();
1850 if let Ok(decoded) = header::decode(
1851 decoded_announce.scale_encoded_header,
1852 task.network[chain_id].block_number_bytes,
1853 ) {
1854 link.best_block_hash = header::hash_from_scale_encoded_header(
1855 decoded_announce.scale_encoded_header,
1856 );
1857 link.best_block_number = decoded.number;
1858 }
1859 }
1860
1861 debug_assert!(task.event_pending_send.is_none());
1862 task.event_pending_send =
1863 Some((chain_id, Event::BlockAnnounce { peer_id, announce }));
1864 }
1865 WakeUpReason::NetworkEvent(service::Event::GossipConnected {
1866 peer_id,
1867 chain_id,
1868 role,
1869 best_number,
1870 best_hash,
1871 kind: service::GossipKind::ConsensusTransactions,
1872 }) => {
1873 log!(
1874 &task.platform,
1875 Debug,
1876 "network",
1877 "gossip-open-success",
1878 chain = &task.network[chain_id].log_name,
1879 peer_id,
1880 best_number,
1881 best_hash = HashDisplay(&best_hash)
1882 );
1883
1884 let _prev_value = task.open_gossip_links.insert(
1885 (chain_id, peer_id.clone()),
1886 OpenGossipLinkState {
1887 best_block_number: best_number,
1888 best_block_hash: best_hash,
1889 role,
1890 finalized_block_height: None,
1891 },
1892 );
1893 debug_assert!(_prev_value.is_none());
1894
1895 debug_assert!(task.event_pending_send.is_none());
1896 task.event_pending_send = Some((
1897 chain_id,
1898 Event::Connected {
1899 peer_id,
1900 role,
1901 best_block_number: best_number,
1902 best_block_hash: best_hash,
1903 },
1904 ));
1905 }
1906 WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed {
1907 peer_id,
1908 chain_id,
1909 error,
1910 kind: service::GossipKind::ConsensusTransactions,
1911 }) => {
1912 log!(
1913 &task.platform,
1914 Debug,
1915 "network",
1916 "gossip-open-error",
1917 chain = &task.network[chain_id].log_name,
1918 peer_id,
1919 ?error,
1920 );
1921 let ban_duration = Duration::from_secs(15);
1922
1923 let had_slot = if let service::GossipConnectError::GenesisMismatch { .. } = error {
1926 matches!(
1927 task.peering_strategy
1928 .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id),
1929 basic_peering_strategy::UnassignSlotAndRemoveChainPeer::HadSlot
1930 )
1931 } else {
1932 matches!(
1933 task.peering_strategy.unassign_slot_and_ban(
1934 &chain_id,
1935 &peer_id,
1936 task.platform.now() + ban_duration,
1937 ),
1938 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1939 )
1940 };
1941
1942 if had_slot {
1943 log!(
1944 &task.platform,
1945 Debug,
1946 "network",
1947 "slot-unassigned",
1948 chain = &task.network[chain_id].log_name,
1949 peer_id,
1950 ?ban_duration,
1951 reason = "gossip-open-failed"
1952 );
1953 task.network.gossip_remove_desired(
1954 chain_id,
1955 &peer_id,
1956 service::GossipKind::ConsensusTransactions,
1957 );
1958 }
1959 }
1960 WakeUpReason::NetworkEvent(service::Event::GossipDisconnected {
1961 peer_id,
1962 chain_id,
1963 kind: service::GossipKind::ConsensusTransactions,
1964 }) => {
1965 log!(
1966 &task.platform,
1967 Debug,
1968 "network",
1969 "gossip-closed",
1970 chain = &task.network[chain_id].log_name,
1971 peer_id,
1972 );
1973 let ban_duration = Duration::from_secs(10);
1974
1975 let _was_in = task.open_gossip_links.remove(&(chain_id, peer_id.clone()));
1976 debug_assert!(_was_in.is_some());
1977
1978 if matches!(
1981 task.peering_strategy.unassign_slot_and_ban(
1982 &chain_id,
1983 &peer_id,
1984 task.platform.now() + ban_duration,
1985 ),
1986 basic_peering_strategy::UnassignSlotAndBan::Banned { had_slot: true }
1987 ) {
1988 log!(
1989 &task.platform,
1990 Debug,
1991 "network",
1992 "slot-unassigned",
1993 chain = &task.network[chain_id].log_name,
1994 peer_id,
1995 ?ban_duration,
1996 reason = "gossip-closed"
1997 );
1998 task.network.gossip_remove_desired(
1999 chain_id,
2000 &peer_id,
2001 service::GossipKind::ConsensusTransactions,
2002 );
2003 }
2004
2005 debug_assert!(task.event_pending_send.is_none());
2006 task.event_pending_send = Some((chain_id, Event::Disconnected { peer_id }));
2007 }
2008 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2009 substream_id,
2010 peer_id,
2011 chain_id,
2012 response: service::RequestResult::Blocks(response),
2013 }) => {
2014 match &response {
2015 Ok(blocks) => {
2016 log!(
2017 &task.platform,
2018 Debug,
2019 "network",
2020 "blocks-request-success",
2021 chain = task.network[chain_id].log_name,
2022 target = peer_id,
2023 num_blocks = blocks.len(),
2024 block_data_total_size =
2025 BytesDisplay(blocks.iter().fold(0, |sum, block| {
2026 let block_size = block.header.as_ref().map_or(0, |h| h.len())
2027 + block
2028 .body
2029 .as_ref()
2030 .map_or(0, |b| b.iter().fold(0, |s, e| s + e.len()))
2031 + block
2032 .justifications
2033 .as_ref()
2034 .into_iter()
2035 .flat_map(|l| l.iter())
2036 .fold(0, |s, j| s + j.justification.len());
2037 sum + u64::try_from(block_size).unwrap()
2038 }))
2039 );
2040 }
2041 Err(error) => {
2042 log!(
2043 &task.platform,
2044 Debug,
2045 "network",
2046 "blocks-request-error",
2047 chain = task.network[chain_id].log_name,
2048 target = peer_id,
2049 ?error
2050 );
2051 }
2052 }
2053
2054 match &response {
2055 Ok(_) => {}
2056 Err(service::BlocksRequestError::Request(err)) if !err.is_protocol_error() => {}
2057 Err(err) => {
2058 log!(
2059 &task.platform,
2060 Debug,
2061 "network",
2062 format!(
2063 "Error in block request with {}. This might indicate an \
2064 incompatibility. Error: {}",
2065 peer_id, err
2066 )
2067 );
2068 }
2069 }
2070
2071 let _ = task
2072 .blocks_requests
2073 .remove(&substream_id)
2074 .unwrap()
2075 .send(response.map_err(BlocksRequestError::Request));
2076 }
2077 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2078 substream_id,
2079 peer_id,
2080 chain_id,
2081 response: service::RequestResult::GrandpaWarpSync(response),
2082 }) => {
2083 match &response {
2084 Ok(response) => {
2085 let decoded = response.decode();
2087 log!(
2088 &task.platform,
2089 Debug,
2090 "network",
2091 "warp-sync-request-success",
2092 chain = task.network[chain_id].log_name,
2093 target = peer_id,
2094 num_fragments = decoded.fragments.len(),
2095 is_finished = ?decoded.is_finished,
2096 );
2097 }
2098 Err(error) => {
2099 log!(
2100 &task.platform,
2101 Debug,
2102 "network",
2103 "warp-sync-request-error",
2104 chain = task.network[chain_id].log_name,
2105 target = peer_id,
2106 ?error,
2107 );
2108 }
2109 }
2110
2111 let _ = task
2112 .grandpa_warp_sync_requests
2113 .remove(&substream_id)
2114 .unwrap()
2115 .send(response.map_err(WarpSyncRequestError::Request));
2116 }
2117 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2118 substream_id,
2119 peer_id,
2120 chain_id,
2121 response: service::RequestResult::StorageProof(response),
2122 }) => {
2123 match &response {
2124 Ok(items) => {
2125 let decoded = items.decode();
2126 log!(
2127 &task.platform,
2128 Debug,
2129 "network",
2130 "storage-proof-request-success",
2131 chain = task.network[chain_id].log_name,
2132 target = peer_id,
2133 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap()),
2134 );
2135 }
2136 Err(error) => {
2137 log!(
2138 &task.platform,
2139 Debug,
2140 "network",
2141 "storage-proof-request-error",
2142 chain = task.network[chain_id].log_name,
2143 target = peer_id,
2144 ?error
2145 );
2146 }
2147 }
2148
2149 let _ = task
2150 .storage_proof_requests
2151 .remove(&substream_id)
2152 .unwrap()
2153 .send(response.map_err(StorageProofRequestError::Request));
2154 }
2155 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2156 substream_id,
2157 peer_id,
2158 chain_id,
2159 response: service::RequestResult::CallProof(response),
2160 }) => {
2161 match &response {
2162 Ok(items) => {
2163 let decoded = items.decode();
2164 log!(
2165 &task.platform,
2166 Debug,
2167 "network",
2168 "call-proof-request-success",
2169 chain = task.network[chain_id].log_name,
2170 target = peer_id,
2171 total_size = BytesDisplay(u64::try_from(decoded.len()).unwrap())
2172 );
2173 }
2174 Err(error) => {
2175 log!(
2176 &task.platform,
2177 Debug,
2178 "network",
2179 "call-proof-request-error",
2180 chain = task.network[chain_id].log_name,
2181 target = peer_id,
2182 ?error
2183 );
2184 }
2185 }
2186
2187 let _ = task
2188 .call_proof_requests
2189 .remove(&substream_id)
2190 .unwrap()
2191 .send(response.map_err(CallProofRequestError::Request));
2192 }
2193 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2194 peer_id: requestee_peer_id,
2195 chain_id,
2196 response: service::RequestResult::KademliaFindNode(Ok(nodes)),
2197 ..
2198 }) => {
2199 for (peer_id, mut addrs) in nodes {
2200 if addrs.len() >= 10 {
2203 addrs.truncate(10);
2204 }
2205
2206 let mut valid_addrs = Vec::with_capacity(addrs.len());
2207 for addr in addrs {
2208 match Multiaddr::from_bytes(addr) {
2209 Ok(a) => {
2210 if platform::address_parse::multiaddr_to_address(&a)
2211 .ok()
2212 .map_or(false, |addr| {
2213 task.platform.supports_connection_type((&addr).into())
2214 })
2215 {
2216 valid_addrs.push(a)
2217 } else {
2218 log!(
2219 &task.platform,
2220 Debug,
2221 "network",
2222 "discovered-address-not-supported",
2223 chain = &task.network[chain_id].log_name,
2224 peer_id,
2225 addr = &a,
2226 obtained_from = requestee_peer_id
2227 );
2228 }
2229 }
2230 Err((error, addr)) => {
2231 log!(
2232 &task.platform,
2233 Debug,
2234 "network",
2235 "discovered-address-invalid",
2236 chain = &task.network[chain_id].log_name,
2237 peer_id,
2238 error,
2239 addr = hex::encode(&addr),
2240 obtained_from = requestee_peer_id
2241 );
2242 }
2243 }
2244 }
2245
2246 if !valid_addrs.is_empty() {
2247 let insert_outcome =
2250 task.peering_strategy
2251 .insert_chain_peer(chain_id, peer_id.clone(), 30); if let basic_peering_strategy::InsertChainPeerResult::Inserted {
2254 peer_removed,
2255 } = insert_outcome
2256 {
2257 if let Some(peer_removed) = peer_removed {
2258 log!(
2259 &task.platform,
2260 Debug,
2261 "network",
2262 "peer-purged-from-address-book",
2263 chain = &task.network[chain_id].log_name,
2264 peer_id = peer_removed,
2265 );
2266 }
2267
2268 log!(
2269 &task.platform,
2270 Debug,
2271 "network",
2272 "peer-discovered",
2273 chain = &task.network[chain_id].log_name,
2274 peer_id,
2275 addrs = ?valid_addrs.iter().map(|a| a.to_string()).collect::<Vec<_>>(), obtained_from = requestee_peer_id
2277 );
2278 }
2279 }
2280
2281 for addr in valid_addrs {
2282 let _insert_result =
2283 task.peering_strategy
2284 .insert_address(&peer_id, addr.into_bytes(), 10); debug_assert!(!matches!(
2286 _insert_result,
2287 basic_peering_strategy::InsertAddressResult::UnknownPeer
2288 ));
2289 }
2290 }
2291 }
2292 WakeUpReason::NetworkEvent(service::Event::RequestResult {
2293 peer_id,
2294 chain_id,
2295 response: service::RequestResult::KademliaFindNode(Err(error)),
2296 ..
2297 }) => {
2298 log!(
2299 &task.platform,
2300 Debug,
2301 "network",
2302 "discovery-find-node-error",
2303 chain = &task.network[chain_id].log_name,
2304 ?error,
2305 find_node_target = peer_id,
2306 );
2307
2308 match error {
2311 service::KademliaFindNodeError::RequestFailed(err)
2312 if !err.is_protocol_error() => {}
2313
2314 service::KademliaFindNodeError::RequestFailed(
2315 service::RequestError::Substream(
2316 connection::established::RequestError::ProtocolNotAvailable,
2317 ),
2318 ) => {
2319 log!(
2321 &task.platform,
2322 Warn,
2323 "network",
2324 format!(
2325 "Problem during discovery on {}: protocol not available. \
2326 This might indicate that the version of Substrate used by \
2327 the chain doesn't include \
2328 <https://github.com/paritytech/substrate/pull/12545>.",
2329 &task.network[chain_id].log_name
2330 )
2331 );
2332 }
2333 _ => {
2334 log!(
2335 &task.platform,
2336 Debug,
2337 "network",
2338 format!(
2339 "Problem during discovery on {}: {}",
2340 &task.network[chain_id].log_name, error
2341 )
2342 );
2343 }
2344 }
2345 }
2346 WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
2347 unreachable!()
2349 }
2350 WakeUpReason::NetworkEvent(service::Event::GossipInDesired {
2351 peer_id,
2352 chain_id,
2353 kind: service::GossipKind::ConsensusTransactions,
2354 }) => {
2355 if task
2360 .network
2361 .opened_gossip_undesired_by_chain(chain_id)
2362 .count()
2363 < 4
2364 {
2365 log!(
2366 &task.platform,
2367 Debug,
2368 "network",
2369 "gossip-in-request",
2370 chain = &task.network[chain_id].log_name,
2371 peer_id,
2372 outcome = "accepted"
2373 );
2374 task.network
2375 .gossip_open(
2376 chain_id,
2377 &peer_id,
2378 service::GossipKind::ConsensusTransactions,
2379 )
2380 .unwrap();
2381 } else {
2382 log!(
2383 &task.platform,
2384 Debug,
2385 "network",
2386 "gossip-in-request",
2387 chain = &task.network[chain_id].log_name,
2388 peer_id,
2389 outcome = "rejected",
2390 );
2391 task.network
2392 .gossip_close(
2393 chain_id,
2394 &peer_id,
2395 service::GossipKind::ConsensusTransactions,
2396 )
2397 .unwrap();
2398 }
2399 }
2400 WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => {
2401 unreachable!()
2403 }
2404 WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
2405 peer_id,
2406 substream_id,
2407 }) => {
2408 log!(
2409 &task.platform,
2410 Debug,
2411 "network",
2412 "identify-request-received",
2413 peer_id,
2414 );
2415 task.network
2416 .respond_identify(substream_id, &task.identify_agent_version);
2417 }
2418 WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { .. }) => unreachable!(),
2419 WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
2420 unreachable!()
2422 }
2423 WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
2424 chain_id,
2425 peer_id,
2426 state,
2427 }) => {
2428 log!(
2429 &task.platform,
2430 Debug,
2431 "network",
2432 "grandpa-neighbor-packet-received",
2433 chain = &task.network[chain_id].log_name,
2434 peer_id,
2435 round_number = state.round_number,
2436 set_id = state.set_id,
2437 commit_finalized_height = state.commit_finalized_height,
2438 );
2439
2440 task.open_gossip_links
2441 .get_mut(&(chain_id, peer_id.clone()))
2442 .unwrap()
2443 .finalized_block_height = Some(state.commit_finalized_height);
2444
2445 debug_assert!(task.event_pending_send.is_none());
2446 task.event_pending_send = Some((
2447 chain_id,
2448 Event::GrandpaNeighborPacket {
2449 peer_id,
2450 finalized_block_height: state.commit_finalized_height,
2451 },
2452 ));
2453 }
2454 WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
2455 chain_id,
2456 peer_id,
2457 message,
2458 }) => {
2459 log!(
2460 &task.platform,
2461 Debug,
2462 "network",
2463 "grandpa-commit-message-received",
2464 chain = &task.network[chain_id].log_name,
2465 peer_id,
2466 target_block_hash = HashDisplay(message.decode().target_hash),
2467 );
2468
2469 debug_assert!(task.event_pending_send.is_none());
2470 task.event_pending_send =
2471 Some((chain_id, Event::GrandpaCommitMessage { peer_id, message }));
2472 }
2473 WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
2474 log!(
2476 &task.platform,
2477 Warn,
2478 "network",
2479 "protocol-error",
2480 peer_id,
2481 ?error
2482 );
2483
2484 }
2486 WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
2487 task.peering_strategy.assign_slot(&chain_id, &peer_id);
2488
2489 log!(
2490 &task.platform,
2491 Debug,
2492 "network",
2493 "slot-assigned",
2494 chain = &task.network[chain_id].log_name,
2495 peer_id
2496 );
2497
2498 task.network.gossip_insert_desired(
2499 chain_id,
2500 peer_id,
2501 service::GossipKind::ConsensusTransactions,
2502 );
2503 }
2504 WakeUpReason::NextRecentConnectionRestore => {
2505 task.num_recent_connection_opening =
2506 task.num_recent_connection_opening.saturating_sub(1);
2507 }
2508 WakeUpReason::CanStartConnect(expected_peer_id) => {
2509 let Some(multiaddr) = task
2510 .peering_strategy
2511 .pick_address_and_add_connection(&expected_peer_id)
2512 else {
2513 task.network.gossip_remove_desired_all(
2515 &expected_peer_id,
2516 service::GossipKind::ConsensusTransactions,
2517 );
2518 let ban_duration = Duration::from_secs(10);
2519 for (&chain_id, what_happened) in task.peering_strategy.unassign_slots_and_ban(
2520 &expected_peer_id,
2521 task.platform.now() + ban_duration,
2522 ) {
2523 if matches!(
2524 what_happened,
2525 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2526 ) {
2527 log!(
2528 &task.platform,
2529 Debug,
2530 "network",
2531 "slot-unassigned",
2532 chain = &task.network[chain_id].log_name,
2533 peer_id = expected_peer_id,
2534 ?ban_duration,
2535 reason = "no-address"
2536 );
2537 }
2538 }
2539 continue;
2540 };
2541
2542 let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
2543 Ok(a) => a,
2544 Err((multiaddr::FromBytesError, addr)) => {
2545 let _was_in = task
2547 .peering_strategy
2548 .decrease_address_connections_and_remove_if_zero(
2549 &expected_peer_id,
2550 &addr,
2551 );
2552 debug_assert!(_was_in.is_ok());
2553 continue;
2554 }
2555 };
2556
2557 let address = address_parse::multiaddr_to_address(&multiaddr)
2558 .ok()
2559 .filter(|addr| {
2560 task.platform.supports_connection_type(match &addr {
2561 address_parse::AddressOrMultiStreamAddress::Address(addr) => {
2562 From::from(addr)
2563 }
2564 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
2565 addr,
2566 ) => From::from(addr),
2567 })
2568 });
2569
2570 let Some(address) = address else {
2571 let _was_in = task
2573 .peering_strategy
2574 .decrease_address_connections_and_remove_if_zero(
2575 &expected_peer_id,
2576 multiaddr.as_ref(),
2577 );
2578 debug_assert!(_was_in.is_ok());
2579 continue;
2580 };
2581
2582 let noise_key = {
2584 let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
2585 task.platform.fill_random_bytes(&mut *noise_static_key);
2586 let mut libp2p_key = zeroize::Zeroizing::new([0u8; 32]);
2587 task.platform.fill_random_bytes(&mut *libp2p_key);
2588 connection::NoiseKey::new(&libp2p_key, &noise_static_key)
2589 };
2590
2591 log!(
2592 &task.platform,
2593 Debug,
2594 "network",
2595 "connection-started",
2596 expected_peer_id,
2597 remote_addr = multiaddr,
2598 local_peer_id =
2599 peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key())
2600 .into_peer_id(),
2601 );
2602
2603 task.num_recent_connection_opening += 1;
2604
2605 let (coordinator_to_connection_tx, coordinator_to_connection_rx) =
2606 async_channel::bounded(8);
2607 let task_name = format!("connection-{}", multiaddr);
2608
2609 match address {
2610 address_parse::AddressOrMultiStreamAddress::Address(address) => {
2611 let connection = task.platform.connect_stream(address).await;
2614
2615 let (connection_id, connection_task) =
2616 task.network.add_single_stream_connection(
2617 task.platform.now(),
2618 service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
2619 is_initiator: true,
2620 noise_key: &noise_key,
2621 },
2622 multiaddr.clone().into_bytes(),
2623 Some(expected_peer_id.clone()),
2624 coordinator_to_connection_tx,
2625 );
2626
2627 task.platform.spawn_task(
2628 task_name.into(),
2629 tasks::single_stream_connection_task::<TPlat>(
2630 connection,
2631 multiaddr.to_string(),
2632 task.platform.clone(),
2633 connection_id,
2634 connection_task,
2635 coordinator_to_connection_rx,
2636 task.tasks_messages_tx.clone(),
2637 ),
2638 );
2639 }
2640 address_parse::AddressOrMultiStreamAddress::MultiStreamAddress(
2641 platform::MultiStreamAddress::WebRtc {
2642 ip,
2643 port,
2644 remote_certificate_sha256,
2645 },
2646 ) => {
2647 let connection = task
2652 .platform
2653 .connect_multistream(platform::MultiStreamAddress::WebRtc {
2654 ip,
2655 port,
2656 remote_certificate_sha256,
2657 })
2658 .await;
2659
2660 let local_tls_certificate_multihash = [18u8, 32]
2662 .into_iter()
2663 .chain(connection.local_tls_certificate_sha256.into_iter())
2664 .collect();
2665 let remote_tls_certificate_multihash = [18u8, 32]
2666 .into_iter()
2667 .chain(remote_certificate_sha256.iter().copied())
2668 .collect();
2669
2670 let (connection_id, connection_task) =
2671 task.network.add_multi_stream_connection(
2672 task.platform.now(),
2673 service::MultiStreamHandshakeKind::WebRtc {
2674 is_initiator: true,
2675 local_tls_certificate_multihash,
2676 remote_tls_certificate_multihash,
2677 noise_key: &noise_key,
2678 },
2679 multiaddr.clone().into_bytes(),
2680 Some(expected_peer_id.clone()),
2681 coordinator_to_connection_tx,
2682 );
2683
2684 task.platform.spawn_task(
2685 task_name.into(),
2686 tasks::webrtc_multi_stream_connection_task::<TPlat>(
2687 connection.connection,
2688 multiaddr.to_string(),
2689 task.platform.clone(),
2690 connection_id,
2691 connection_task,
2692 coordinator_to_connection_rx,
2693 task.tasks_messages_tx.clone(),
2694 ),
2695 );
2696 }
2697 }
2698 }
2699 WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
2700 task.network
2701 .gossip_open(
2702 chain_id,
2703 &peer_id,
2704 service::GossipKind::ConsensusTransactions,
2705 )
2706 .unwrap();
2707
2708 log!(
2709 &task.platform,
2710 Debug,
2711 "network",
2712 "gossip-open-start",
2713 chain = &task.network[chain_id].log_name,
2714 peer_id,
2715 );
2716 }
2717 WakeUpReason::MessageToConnection {
2718 connection_id,
2719 message,
2720 } => {
2721 let _send_result = task.network[connection_id].send(message).await;
2729 debug_assert!(_send_result.is_ok());
2730 }
2731 }
2732 }
2733}