/__w/smoldot/smoldot/repo/full-node/src/network_service.rs
// Smoldot
// Copyright (C) 2019-2022 Parity Technologies (UK) Ltd.
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//! Background network service.
//!
//! The [`NetworkService`] manages background tasks dedicated to connecting to other nodes.
//! Importantly, its design is oriented towards the particular use case of the full node.
//!
//! The [`NetworkService`] spawns one background task (using the [`Config::tasks_executor`]) for
//! each active TCP socket, plus one for each TCP listening socket. Messages are exchanged between
//! the service and these background tasks.
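//!
//! # Example
//!
//! A minimal sketch of starting the service, assuming a smol-based executor; the `config`
//! variable is a hypothetical, fully-filled [`Config`]:
//!
//! ```ignore
//! let tasks_executor = Box::new(|future| {
//!     // Spawn the background future on the smol executor and let it run to completion.
//!     smol::spawn(future).detach();
//! });
//! let (network_service, chain_ids, mut event_receivers) =
//!     NetworkService::new(config).await?;
//! ```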

// TODO: doc
// TODO: re-review this once finished

use crate::{database_thread, jaeger_service, LogCallback, LogLevel};

use core::{cmp, future::Future, mem, pin::Pin, task::Poll, time::Duration};
use futures_channel::oneshot;
use futures_lite::FutureExt as _;
use futures_util::stream::{self, SelectAll};
use hashbrown::HashMap;
use smol::{
    channel, future,
    lock::Mutex,
    net::TcpStream,
    stream::{Stream, StreamExt as _},
};
use smoldot::{
    database::full_sqlite,
    header,
    informant::{BytesDisplay, HashDisplay},
    libp2p::{
        connection,
        multiaddr::{self, Multiaddr, Protocol},
        peer_id::{self, PeerId},
    },
    network::{basic_peering_strategy, codec, service},
};
use std::{
    io,
    net::{IpAddr, SocketAddr},
    sync::Arc,
    time::Instant,
    vec,
};

pub use smoldot::network::service::ChainId;

mod tasks;

/// Configuration for a [`NetworkService`].
pub struct Config {
    /// Closure that spawns background tasks.
    pub tasks_executor: Box<dyn FnMut(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,

    /// Function called in order to notify of something.
    pub log_callback: Arc<dyn LogCallback + Send + Sync>,

    /// Number of event receivers returned by [`NetworkService::new`].
    pub num_events_receivers: usize,

    /// Addresses to listen for incoming connections.
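    ///
    /// Only `/ip4/.../tcp/...` and `/ip6/.../tcp/...` multiaddresses are supported; anything
    /// else makes [`NetworkService::new`] return [`InitError::BadListenMultiaddr`]. A minimal
    /// sketch, assuming the textual multiaddr syntax is accepted by `Multiaddr`'s `FromStr`
    /// implementation (the port below is illustrative):
    ///
    /// ```ignore
    /// let listen_addresses: Vec<Multiaddr> =
    ///     vec!["/ip4/0.0.0.0/tcp/30333".parse().unwrap()];
    /// ```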
    pub listen_addresses: Vec<Multiaddr>,

    /// List of block chains to be connected to.
    pub chains: Vec<ChainConfig>,

    /// Value sent back for the agent version when receiving an identification request.
    pub identify_agent_version: String,

    /// Key used for the encryption layer.
    /// This is a Noise static key, according to the Noise specification.
    /// Signed using the actual libp2p key.
    pub noise_key: connection::NoiseKey,

    /// Service to use to report traces.
    pub jaeger_service: Arc<jaeger_service::JaegerService>,
}

/// Configuration for one chain.
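///
/// # Example
///
/// A minimal sketch of filling this struct; every value below is illustrative, and `database`
/// is assumed to be an already-opened `Arc<database_thread::DatabaseThread>`:
///
/// ```ignore
/// let chain_config = ChainConfig {
///     log_name: "polkadot".to_string(),
///     bootstrap_nodes: Vec::new(),
///     database,
///     genesis_block_hash: [0; 32],
///     best_block: (0, [0; 32]),
///     fork_id: None,
///     block_number_bytes: 4,
///     grandpa_protocol_finalized_block_height: Some(0),
///     max_slots: 25,
///     max_in_peers: 25,
/// };
/// ```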
pub struct ChainConfig {
    /// Name of the chain to use for logging purposes.
    pub log_name: String,

    /// List of node identities and addresses that are known to belong to the chain's peer-to-peer
    /// network.
    pub bootstrap_nodes: Vec<(PeerId, Multiaddr)>,

    /// Database to use to read blocks from when answering requests.
    pub database: Arc<database_thread::DatabaseThread>,

    /// Hash of the genesis block of the chain. Sent to other nodes in order to determine whether
    /// the chains match.
    pub genesis_block_hash: [u8; 32],

    /// Number and hash of the current best block. Can later be updated with // TODO: which function?
    pub best_block: (u64, [u8; 32]),

    /// Optional identifier to insert into the networking protocol names. Used to differentiate
    /// between chains with the same genesis hash.
    pub fork_id: Option<String>,

    /// Number of bytes of the block number in the networking protocol.
    pub block_number_bytes: usize,

    /// Must be `Some` if and only if the chain uses the GrandPa networking protocol. Contains the
    /// number of the finalized block at the time of the initialization.
    pub grandpa_protocol_finalized_block_height: Option<u64>,

    /// Maximum number of peers that have slots attributed to them.
    pub max_slots: usize,

    /// Maximum number of peers that have gossip links open but without having slots attributed
    /// to them.
    pub max_in_peers: usize,
}

/// Event generated by the events reporters returned by [`NetworkService::new`].
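///
/// A minimal sketch of draining one of the event receivers returned by
/// [`NetworkService::new`]; `events_rx` is a hypothetical name for one of them:
///
/// ```ignore
/// while let Some(event) = events_rx.next().await {
///     match event {
///         Event::Connected { peer_id, .. } => { /* track the new gossip link */ }
///         Event::Disconnected { peer_id, .. } => { /* forget the peer */ }
///         Event::BlockAnnounce { scale_encoded_header, .. } => { /* feed the sync logic */ }
///         Event::GrandpaNeighborPacket { finalized_block_height, .. } => { /* update finality */ }
///     }
/// }
/// ```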
#[derive(Debug, Clone)]
pub enum Event {
    Connected {
        chain_id: ChainId,
        peer_id: PeerId,
        best_block_number: u64,
        best_block_hash: [u8; 32],
    },
    Disconnected {
        chain_id: ChainId,
        peer_id: PeerId,
    },
    BlockAnnounce {
        chain_id: ChainId,
        peer_id: PeerId,
        scale_encoded_header: Vec<u8>,
        is_best: bool,
    },
    GrandpaNeighborPacket {
        chain_id: ChainId,
        peer_id: PeerId,
        finalized_block_height: u64,
    },
}

pub struct NetworkService {
    /// Identity of the local node.
    local_peer_id: PeerId,

    /// Service to use to report traces.
    // TODO: unused
    _jaeger_service: Arc<jaeger_service::JaegerService>,

    /// Channel to send messages to the background task.
    to_background_tx: Mutex<channel::Sender<ToBackground>>,

    /// Name of all the chains that have been registered, for logging purposes.
    chain_names: hashbrown::HashMap<ChainId, String, fnv::FnvBuildHasher>,
}

enum ToBackground {
    ForegroundDisconnectAndBan {
        peer_id: PeerId,
        chain_id: ChainId,
        severity: BanSeverity,
        reason: &'static str,
    },
    ForegroundAnnounceBlock {
        target: PeerId,
        chain_id: ChainId,
        scale_encoded_header: Vec<u8>,
        is_best: bool,
        result_tx: oneshot::Sender<Result<(), service::QueueNotificationError>>,
    },
    ForegroundSetLocalBestBlock {
        chain_id: ChainId,
        best_hash: [u8; 32],
        best_number: u64,
    },
    ForegroundBlocksRequest {
        target: PeerId,
        chain_id: ChainId,
        config: codec::BlocksRequestConfig,
        result_tx: oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
    },
    ForegroundWarpSyncRequest {
        target: PeerId,
        chain_id: ChainId,
        begin_hash: [u8; 32],
        result_tx:
            oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
    },
    ForegroundStorageProofRequest {
        target: PeerId,
        chain_id: ChainId,
        config: codec::StorageProofRequestConfig<vec::IntoIter<Vec<u8>>>,
        result_tx: oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
    },
    ForegroundCallProofRequest {
        target: PeerId, // TODO: takes by value because of futures longevity issue
        chain_id: ChainId,
        config: codec::CallProofRequestConfig<'static, vec::IntoIter<Vec<u8>>>,
        result_tx: oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
    },
    ForegroundGetNumConnections {
        result_tx: oneshot::Sender<usize>,
    },
    ForegroundGetNumPeers {
        chain_id: ChainId,
        result_tx: oneshot::Sender<usize>,
    },
    ForegroundGetNumTotalPeers {
        result_tx: oneshot::Sender<usize>,
    },
}

struct Inner {
    /// Value provided through [`Config::identify_agent_version`].
    identify_agent_version: String,

    /// Sending events through the public API.
    ///
    /// Contains either senders, or a `Future` that is currently sending an event and will yield
    /// the senders back once it is finished.
    event_senders: either::Either<
        Vec<channel::Sender<Event>>,
        Pin<Box<dyn Future<Output = Vec<channel::Sender<Event>>> + Send>>,
    >,

    /// Event about to be sent on the senders of [`Inner::event_senders`].
    event_pending_send: Option<Event>,

    /// Identity of the local node.
    noise_key: service::NoiseKey,

    /// Identity of the local node. Can be derived from [`Inner::noise_key`].
    local_peer_id: PeerId,

    /// Service to use to report traces.
    jaeger_service: Arc<jaeger_service::JaegerService>,

    /// Data structure holding the entire state of the networking.
    network:
        service::ChainNetwork<Chain, channel::Sender<service::CoordinatorToConnection>, Instant>,

    /// Data structure holding the addresses and assigned slots.
    peering_strategy: basic_peering_strategy::BasicPeeringStrategy<ChainId, Instant>,

    /// Current number of outgoing connection attempts.
    ///
    /// This counter is used to limit the number of simultaneous connection attempts, as some
    /// ISPs/cloud providers don't like seeing too many dialing connections at the same time.
    num_pending_out_attempts: usize,

    /// Stream of incoming connections.
    incoming_connections: SelectAll<Pin<Box<dyn Stream<Item = (TcpStream, SocketAddr)> + Send>>>,

    /// See [`Config::tasks_executor`].
    tasks_executor: Box<dyn FnMut(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>,

    /// See [`Config::log_callback`].
    log_callback: Arc<dyn LogCallback + Send + Sync>,

    /// Channel for the frontend to send messages to the background task.
    to_background_rx: Pin<Box<channel::Receiver<ToBackground>>>,

    /// Channel where connections send messages destined to the coordinator.
    from_connections_rx: Pin<
        Box<
            channel::Receiver<(
                service::ConnectionId,
                Option<service::ConnectionToCoordinator>,
            )>,
        >,
    >,

    /// Sending side of [`Inner::from_connections_rx`].
    from_connections_tx: channel::Sender<(
        service::ConnectionId,
        Option<service::ConnectionToCoordinator>,
    )>,

    /// List of all block requests that have been started but not finished yet.
    blocks_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<Vec<codec::BlockData>, BlocksRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// List of all warp sync requests that have been started but not finished yet.
    warp_sync_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError>>,
        fnv::FnvBuildHasher,
    >,

    /// List of all storage requests that have been started but not finished yet.
    storage_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
        fnv::FnvBuildHasher,
    >,

    /// List of all call proof requests that have been started but not finished yet.
    call_proof_requests: HashMap<
        service::SubstreamId,
        oneshot::Sender<Result<service::EncodedMerkleProof, ()>>,
        fnv::FnvBuildHasher,
    >,

    /// When to start the next discovery process.
    next_discovery: smol::Timer,

    /// Time between [`Inner::next_discovery`] and the follow-up discovery.
    next_discovery_period: Duration,
}

/// Extra information of a chain.
struct Chain {
    /// Name of the chain to use for logging purposes.
    log_name: String,

    /// How to access data to answer requests from the remotes.
    database: Arc<database_thread::DatabaseThread>,

    /// Maximum number of peers that have slots attributed to them.
    max_slots: usize,

    /// Maximum number of peers that have gossip links open but without having slots attributed
    /// to them.
    max_in_peers: usize,
}

/// Severity of a ban. See [`NetworkService::ban_and_disconnect`].
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum BanSeverity {
    Low,
    High,
}

impl NetworkService {
    /// Initializes the network service with the given configuration.
    pub async fn new(
        config: Config,
    ) -> Result<
        (
            Arc<Self>,
            Vec<ChainId>,
            Vec<Pin<Box<dyn Stream<Item = Event> + Send>>>,
        ),
        InitError,
    > {
        let (event_senders, event_receivers): (Vec<_>, Vec<_>) = (0..config.num_events_receivers)
            .map(|_| channel::bounded(16))
            .unzip();

        let mut network = service::ChainNetwork::new(service::Config {
            chains_capacity: config.chains.len(),
            connections_capacity: 100, // TODO: ?
            handshake_timeout: Duration::from_secs(8),
            randomness_seed: rand::random(),
        });

        let mut peering_strategy =
            basic_peering_strategy::BasicPeeringStrategy::new(basic_peering_strategy::Config {
                randomness_seed: rand::random(),
                peers_capacity: 200, // TODO: ?
                chains_capacity: config.chains.len(),
            });

        let mut chain_names =
            hashbrown::HashMap::with_capacity_and_hasher(config.chains.len(), Default::default());

        for chain in config.chains {
            let chain_id = network
                .add_chain(service::ChainConfig {
                    fork_id: chain.fork_id.clone(),
                    block_number_bytes: chain.block_number_bytes,
                    best_hash: chain.best_block.1,
                    best_number: chain.best_block.0,
                    genesis_hash: chain.genesis_block_hash,
                    role: codec::Role::Full,
                    grandpa_protocol_config: chain.grandpa_protocol_finalized_block_height.map(
                        // TODO: dummy values
                        |commit_finalized_height| service::GrandpaState {
                            commit_finalized_height,
                            round_number: 1,
                            set_id: 0,
                        },
                    ),
                    allow_inbound_block_requests: true,
                    user_data: Chain {
                        log_name: chain.log_name.clone(),
                        database: chain.database,
                        max_in_peers: chain.max_in_peers,
                        max_slots: chain.max_slots,
                    },
                })
                .unwrap(); // TODO: don't unwrap?

            for (peer_id, addr) in chain.bootstrap_nodes {
                // Note that we must call this function before `insert_address`, as documented
                // in `basic_peering_strategy`.
                peering_strategy.insert_chain_peer(chain_id, peer_id.clone(), usize::MAX);
                peering_strategy.insert_address(&peer_id, addr.into_bytes(), usize::MAX);
            }

            chain_names.insert(chain_id, chain.log_name);
        }

        let (to_background_tx, to_background_rx) = channel::bounded(16);
        let (from_connections_tx, from_connections_rx) = channel::bounded(64);

        let local_peer_id =
            peer_id::PublicKey::Ed25519(*config.noise_key.libp2p_public_ed25519_key())
                .into_peer_id();

        // For each listening address in the configuration, create a background task dedicated to
        // listening on that address.
        let mut incoming_connections = SelectAll::new();
        for listen_address in config.listen_addresses {
            // Try to parse the requested address and create the corresponding listening socket.
            let tcp_listener: smol::net::TcpListener = {
                let addr = {
                    let mut iter = listen_address.iter();
                    let proto1 = iter.next();
                    let proto2 = iter.next();
                    let proto3 = iter.next();
                    match (proto1, proto2, proto3) {
                        (Some(Protocol::Ip4(ip)), Some(Protocol::Tcp(port)), None) => {
                            Some(SocketAddr::from((ip, port)))
                        }
                        (Some(Protocol::Ip6(ip)), Some(Protocol::Tcp(port)), None) => {
                            Some(SocketAddr::from((ip, port)))
                        }
                        _ => None,
                    }
                };

                if let Some(addr) = addr {
                    match smol::net::TcpListener::bind(addr).await {
                        Ok(l) => l,
                        Err(err) => {
                            return Err(InitError::ListenerIo(listen_address, err));
                        }
                    }
                } else {
                    // TODO: support WebSocket server
                    return Err(InitError::BadListenMultiaddr(listen_address));
                }
            };

            // Add a task dedicated to this listener.
            let log_callback = config.log_callback.clone();
            incoming_connections.push(Box::pin(stream::unfold(tcp_listener, move |tcp_listener| {
                let log_callback = log_callback.clone();
                async move {
                    loop {
                        match tcp_listener.accept().await {
                            Ok((socket, socket_addr)) => {
                                break Some(((socket, socket_addr), tcp_listener))
                            }
                            Err(error) => {
                                // Errors here can happen if the accept failed, for example
                                // if no file descriptor is available.
                                // A wait is added in order to avoid having a busy-loop
                                // failing to accept connections.
                                log_callback.log(
                                    LogLevel::Warn,
                                    format!("tcp-accept-error; error={}", error),
                                );
                                smol::Timer::after(Duration::from_secs(2)).await;
                            }
                        }
                    }
                }
            })) as Pin<Box<_>>);
        }

        // Initialize the inner network service.
        run(Inner {
            local_peer_id: local_peer_id.clone(),
            identify_agent_version: config.identify_agent_version,
            event_senders: either::Left(event_senders),
            event_pending_send: None,
            num_pending_out_attempts: 0,
            to_background_rx: Box::pin(to_background_rx),
            from_connections_rx: Box::pin(from_connections_rx),
            from_connections_tx,
            tasks_executor: config.tasks_executor,
            log_callback: config.log_callback,
            network,
            noise_key: config.noise_key,
            peering_strategy,
            blocks_requests: hashbrown::HashMap::with_capacity_and_hasher(
                50, // TODO: ?
                Default::default(),
            ),
            warp_sync_requests: hashbrown::HashMap::with_capacity_and_hasher(
                2, // TODO: ?
                Default::default(),
            ),
            storage_requests: hashbrown::HashMap::with_capacity_and_hasher(
                5, // TODO: ?
                Default::default(),
            ),
            call_proof_requests: hashbrown::HashMap::with_capacity_and_hasher(
                5, // TODO: ?
                Default::default(),
            ),
            jaeger_service: config.jaeger_service.clone(),
            next_discovery: smol::Timer::after(Duration::from_secs(1)),
            next_discovery_period: Duration::from_secs(1),
            incoming_connections,
        });

        // Build the final network service.
        let network_service = Arc::new(NetworkService {
            local_peer_id,
            chain_names,
            _jaeger_service: config.jaeger_service,
            to_background_tx: Mutex::new(to_background_tx),
        });

        // Adjust the receivers to keep the `network_service` alive.
        // TODO: no, hacky
        let receivers = event_receivers
            .into_iter()
            .map(|rx| {
                let mut network_service = Some(network_service.clone());
                rx.chain(smol::stream::poll_fn(move |_| {
                    drop(network_service.take());
                    Poll::Ready(None)
                }))
                .boxed()
            })
            .collect();

        let chain_ids = network_service.chain_names.keys().cloned().collect();
        Ok((network_service, chain_ids, receivers))
    }

    /// Returns the peer ID of the local node.
    pub fn local_peer_id(&self) -> &PeerId {
        &self.local_peer_id
    }

    /// Returns the number of connections, whether handshaking or established, both incoming
    /// and outgoing.
    pub async fn num_connections(&self) -> usize {
        let (result_tx, result_rx) = oneshot::channel();

        let _ = self
            .to_background_tx
            .lock()
            .await
            .send(ToBackground::ForegroundGetNumConnections { result_tx })
            .await;

        result_rx.await.unwrap()
    }

    /// Returns the number of peers we have a substream with on the given chain.
    pub async fn num_peers(&self, chain_id: ChainId) -> usize {
        let (result_tx, result_rx) = oneshot::channel();

        let _ = self
            .to_background_tx
            .lock()
            .await
            .send(ToBackground::ForegroundGetNumPeers {
                chain_id,
                result_tx,
            })
            .await;

        result_rx.await.unwrap()
    }

    /// Returns the number of peers we have a substream with, all chains added together.
    pub async fn num_total_peers(&self) -> usize {
        let (result_tx, result_rx) = oneshot::channel();

        let _ = self
            .to_background_tx
            .lock()
            .await
            .send(ToBackground::ForegroundGetNumTotalPeers { result_tx })
            .await;

        result_rx.await.unwrap()
    }
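
    /// Updates the number and hash of the local best block that is reported to the peers of
    /// the given chain.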
    pub async fn set_local_best_block(
        &self,
        chain_id: ChainId,
        best_hash: [u8; 32],
        best_number: u64,
    ) {
        let _ = self
            .to_background_tx
            .lock()
            .await
            .send(ToBackground::ForegroundSetLocalBestBlock {
                chain_id,
                best_hash,
                best_number,
            })
            .await;
    }

    /// Starts asynchronously disconnecting the given peer. A [`Event::Disconnected`] will later be
    /// generated. Prevents a new gossip link with the same peer from being reopened for a
    /// little while.
    ///
    /// `reason` is a human-readable string printed in the logs.
    ///
    /// Due to race conditions, it is possible to reconnect to the peer soon after, in case the
    /// reconnection was already happening as the call to this function is still being processed.
    /// If that happens another [`Event::Disconnected`] will be delivered afterwards. In other
    /// words, this function guarantees that we will be disconnected at some point in the future,
    /// not that we will stay disconnected.
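    ///
    /// A minimal sketch of a call; `network_service`, `peer_id`, and `chain_id` are assumed
    /// to come from the surrounding code:
    ///
    /// ```ignore
    /// network_service
    ///     .ban_and_disconnect(peer_id, chain_id, BanSeverity::Low, "bad-block-announce")
    ///     .await;
    /// ```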
    pub async fn ban_and_disconnect(
        &self,
        peer_id: PeerId,
        chain_id: ChainId,
        severity: BanSeverity,
        reason: &'static str,
    ) {
        let _ = self
            .to_background_tx
            .lock()
            .await
            .send(ToBackground::ForegroundDisconnectAndBan {
                peer_id,
                chain_id,
                severity,
                reason,
            })
            .await;
    }

    pub async fn send_block_announce(
        self: Arc<Self>,
        target: PeerId,
        chain_id: ChainId,
        scale_encoded_header: Vec<u8>,
        is_best: bool,
    ) -> Result<(), service::QueueNotificationError> {
        let (result_tx, result_rx) = oneshot::channel();

        let _ = self
            .to_background_tx
            .lock()
            .await
            .send(ToBackground::ForegroundAnnounceBlock {
                target,
                chain_id,
                scale_encoded_header,
                is_best,
                result_tx,
            })
            .await;

        result_rx.await.unwrap()
    }

    /// Sends a blocks request to the given peer.
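    ///
    /// A minimal sketch of a call; `network_service` (an `Arc<NetworkService>`), `target`,
    /// `chain_id`, and `config` are assumed to come from the surrounding code:
    ///
    /// ```ignore
    /// let blocks = network_service
    ///     .clone()
    ///     .blocks_request(target, chain_id, config)
    ///     .await?;
    /// ```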
    // TODO: more docs
    // TODO: proper error type
    pub async fn blocks_request(
        self: Arc<Self>,
        target: PeerId, // TODO: by value?
        chain_id: ChainId,
        config: codec::BlocksRequestConfig,
    ) -> Result<Vec<codec::BlockData>, BlocksRequestError> {
        let (result_tx, result_rx) = oneshot::channel();

        let _ = self
            .to_background_tx
            .lock()
            .await
            .send(ToBackground::ForegroundBlocksRequest {
                target: target.clone(),
                chain_id,
                config,
                result_tx,
            })
            .await;

        result_rx.await.unwrap()
    }
706 | | |
707 | | /// Sends a warp sync request to the given peer. |
708 | | // TODO: more docs |
709 | | // TODO: proper error type |
710 | 0 | pub async fn warp_sync_request( |
711 | 0 | self: Arc<Self>, |
712 | 0 | target: PeerId, // TODO: by value? |
713 | 0 | chain_id: ChainId, |
714 | 0 | begin_hash: [u8; 32], |
715 | 0 | ) -> Result<service::EncodedGrandpaWarpSyncResponse, WarpSyncRequestError> { Unexecuted instantiation: _RNvMNtCsiUjFBJteJ7x_17smoldot_full_node15network_serviceNtB2_14NetworkService17warp_sync_request Unexecuted instantiation: _RNvMNtCshBwayKnNXDT_17smoldot_full_node15network_serviceNtB2_14NetworkService17warp_sync_request |
716 | 0 | let (result_tx, result_rx) = oneshot::channel(); |
717 | 0 |
|
718 | 0 | let _ = self |
719 | 0 | .to_background_tx |
720 | 0 | .lock() |
721 | 0 | .await |
722 | 0 | .send(ToBackground::ForegroundWarpSyncRequest { |
723 | 0 | target: target.clone(), |
724 | 0 | chain_id, |
725 | 0 | begin_hash, |
726 | 0 | result_tx, |
727 | 0 | }) |
728 | 0 | .await; |
729 | | |
730 | 0 | result_rx.await.unwrap() |
731 | 0 | }
732 | | |
733 | | /// Sends a storage proof request to the given peer. |
734 | | // TODO: more docs |
735 | | // TODO: proper error type |
736 | 0 | pub async fn storage_request( |
737 | 0 | self: Arc<Self>, |
738 | 0 | target: PeerId, // TODO: by value? |
739 | 0 | chain_id: ChainId, |
740 | 0 | config: codec::StorageProofRequestConfig<impl Iterator<Item = impl AsRef<[u8]> + Clone>>, |
741 | 0 | ) -> Result<service::EncodedMerkleProof, ()> {
742 | 0 | // TODO: logs and jaeger integration |
743 | 0 | let (result_tx, result_rx) = oneshot::channel(); |
744 | 0 |
745 | 0 | let _ = self |
746 | 0 | .to_background_tx |
747 | 0 | .lock() |
748 | 0 | .await |
749 | 0 | .send(ToBackground::ForegroundStorageProofRequest { |
750 | 0 | target: target.clone(), |
751 | 0 | chain_id, |
752 | 0 | config: codec::StorageProofRequestConfig { |
753 | 0 | block_hash: config.block_hash, |
754 | 0 | keys: config |
755 | 0 | .keys |
756 | 0 | .map(|key| key.as_ref().to_vec()) // TODO: to_vec() overhead
757 | 0 | .collect::<Vec<_>>() |
758 | 0 | .into_iter(), |
759 | 0 | }, |
760 | 0 | result_tx, |
761 | 0 | }) |
762 | 0 | .await; |
763 | | |
764 | 0 | result_rx.await.unwrap() |
765 | 0 | }
766 | | |
767 | | /// Sends a call proof request to the given peer. |
768 | | // TODO: more docs |
769 | | // TODO: proper error type |
770 | 0 | pub async fn call_proof_request( |
771 | 0 | self: Arc<Self>, |
772 | 0 | target: PeerId, // TODO: by value? |
773 | 0 | chain_id: ChainId, |
774 | 0 | config: codec::CallProofRequestConfig<'_, impl Iterator<Item = impl AsRef<[u8]>>>, |
775 | 0 | ) -> Result<service::EncodedMerkleProof, ()> {
776 | 0 | // TODO: logs and jaeger integration |
777 | 0 | let (result_tx, result_rx) = oneshot::channel(); |
778 | 0 |
779 | 0 | let _ = self |
780 | 0 | .to_background_tx |
781 | 0 | .lock() |
782 | 0 | .await |
783 | 0 | .send(ToBackground::ForegroundCallProofRequest { |
784 | 0 | target: target.clone(), |
785 | 0 | chain_id, |
786 | 0 | config: codec::CallProofRequestConfig { |
787 | 0 | block_hash: config.block_hash, |
788 | 0 | method: config.method.into_owned().into(), |
789 | 0 | parameter_vectored: config |
790 | 0 | .parameter_vectored |
791 | 0 | .map(|v| v.as_ref().to_vec()) // TODO: to_vec() overhead
792 | 0 | .collect::<Vec<_>>() |
793 | 0 | .into_iter(), |
794 | 0 | }, |
795 | 0 | result_tx, |
796 | 0 | }) |
797 | 0 | .await; |
798 | | |
799 | 0 | result_rx.await.unwrap() |
800 | 0 | }
801 | | } |
802 | | |
803 | | /// Error when initializing the network service. |
804 | 0 | #[derive(Debug, derive_more::Display)]
805 | | pub enum InitError { |
806 | | /// I/O error when initializing a listener. |
807 | | #[display(fmt = "I/O error when creating listener for {_0}: {_1}")] |
808 | | ListenerIo(Multiaddr, io::Error), |
809 | | /// A listening address passed through the configuration isn't valid. |
810 | | #[display(fmt = "A listening address passed through the configuration isn't valid: {_0}")] |
811 | | BadListenMultiaddr(Multiaddr), |
812 | | } |
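`derive_more::Display` expands the `#[display(fmt = ...)]` attributes above into `core::fmt::Display` implementations, with `_0`, `_1`, … referring to the variant's fields. A minimal sketch of the same derive on an illustrative error type, assuming the `fmt =` attribute syntax used throughout this file:

use std::io;

#[derive(Debug, derive_more::Display)]
pub enum SketchError {
    /// I/O error while doing something.
    #[display(fmt = "I/O error: {_0}")]
    Io(io::Error),
}

fn main() {
    let err = SketchError::Io(io::Error::new(io::ErrorKind::Other, "boom"));
    assert_eq!(err.to_string(), "I/O error: boom");
}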
813 | | |
814 | | /// Error returned by [`NetworkService::blocks_request`]. |
815 | 0 | #[derive(Debug, derive_more::Display)]
816 | | pub enum BlocksRequestError { |
817 | | /// No established connection with the target. |
818 | | NoConnection, |
819 | | /// Error during the request. |
820 | | #[display(fmt = "{_0}")] |
821 | | Request(service::BlocksRequestError), |
822 | | } |
823 | | |
824 | | /// Error returned by [`NetworkService::warp_sync_request`]. |
825 | 0 | #[derive(Debug, derive_more::Display)]
826 | | pub enum WarpSyncRequestError { |
827 | | /// No established connection with the target. |
828 | | NoConnection, |
829 | | /// Error during the request. |
830 | | #[display(fmt = "{_0}")] |
831 | | Request(service::GrandpaWarpSyncRequestError), |
832 | | } |
833 | | |
834 | 21 | fn run(mut inner: Inner) { |
835 | 21 | // This function is a small hack: the executor must be stored within `Inner`, yet the
836 | 21 | // background task that owns the `Inner` must itself be spawned using that same executor.
837 | 21 | let mut actual_executor = mem::replace(&mut inner.tasks_executor, Box::new(|_| unreachable!()));
838 | 21 | let (tx, rx) = oneshot::channel(); |
839 | 21 | actual_executor(Box::pin(async move { |
840 | 21 | let actual_executor = rx.await.unwrap();
841 | 21 | inner.tasks_executor = actual_executor; |
842 | 122 | background_task(inner).await; |
843 | 21 | }));
844 | 21 | tx.send(actual_executor).unwrap_or_else(|_| panic!());
845 | 21 | }
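The hand-off in `run` reduces to a few lines: leave a placeholder closure where the executor used to be, spawn the task with the real executor, then send that executor into the task through a oneshot channel. A self-contained sketch under the same `Box<dyn FnMut(...)>` signature; the `hand_off` name and the smol-backed executor in `main` are illustrative:

use core::{future::Future, pin::Pin};
use futures_channel::oneshot;

type Executor = Box<dyn FnMut(Pin<Box<dyn Future<Output = ()> + Send>>) + Send>;

fn hand_off(mut executor: Executor) {
    let (tx, rx) = oneshot::channel::<Executor>();
    // Spawn the task first; it suspends until the executor is sent below.
    executor(Box::pin(async move {
        let _executor = rx.await.unwrap();
        // From here on, the task owns the executor and can spawn more tasks.
    }));
    // On failure `send` hands the executor back, and it isn't `Debug`, hence
    // `unwrap_or_else` rather than `unwrap`.
    tx.send(executor).unwrap_or_else(|_| panic!());
}

fn main() {
    // Fire-and-forget: the detached task may or may not run before exit.
    hand_off(Box::new(|future| smol::spawn(future).detach()));
}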
846 | | |
847 | 21 | async fn background_task(mut inner: Inner) {
848 | | loop { |
849 | | enum WakeUpReason { |
850 | | IncomingConnection { |
851 | | socket: TcpStream, |
852 | | socket_addr: SocketAddr, |
853 | | }, |
854 | | NetworkEvent(service::Event<channel::Sender<service::CoordinatorToConnection>>), |
855 | | Message(ToBackground), |
856 | | ForegroundClosed, |
857 | | FromConnectionTask { |
858 | | connection_id: service::ConnectionId, |
859 | | // TODO: this Option is weird |
860 | | message: Option<service::ConnectionToCoordinator>, |
861 | | }, |
862 | | EventSendersReady, |
863 | | CanAssignSlot(PeerId, ChainId), |
864 | | CanStartConnect(PeerId), |
865 | | CanOpenGossip(PeerId, ChainId), |
866 | | StartKademliaDiscoveries, |
867 | | MessageToConnection { |
868 | | connection_id: service::ConnectionId, |
869 | | message: service::CoordinatorToConnection, |
870 | | }, |
871 | | } |
872 | | |
873 | 147 | let wake_up_reason = async {
874 | 147 | inner |
875 | 147 | .to_background_rx |
876 | 147 | .next() |
877 | 122 | .await |
878 | 22 | .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message) |
879 | 22 | }
880 | 147 | .or({ |
881 | 147 | let event_senders_ready = matches!(inner.event_senders, either::Left(_));
882 | 147 | let event_pending_send = &inner.event_pending_send; |
883 | 147 | let network = &mut inner.network; |
884 | 147 | let peering_strategy = &mut inner.peering_strategy; |
885 | 147 | let num_pending_out_attempts = &inner.num_pending_out_attempts; |
886 | 147 | async move { |
887 | 147 | if let Some(event) = (event_senders_ready && event_pending_send.is_none())
888 | 147 | .then(|| network.next_event())
889 | 147 | .flatten() |
890 | | { |
891 | 0 | WakeUpReason::NetworkEvent(event) |
892 | 147 | } else if let Some((connection_id, message)) = network.pull_message_to_connection()
893 | | { |
894 | 0 | WakeUpReason::MessageToConnection { |
895 | 0 | connection_id, |
896 | 0 | message, |
897 | 0 | } |
898 | 147 | } else if let Some((peer_id, chain_id)) = network
899 | 147 | .connected_unopened_gossip_desired() |
900 | 147 | .next() |
901 | 147 | .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id))
902 | | { |
903 | 0 | WakeUpReason::CanOpenGossip(peer_id, chain_id) |
904 | 147 | } else if let Some(peer_id) = (*num_pending_out_attempts < 16)
905 | 147 | .then(|| network.unconnected_desired().next().cloned())
906 | 147 | .flatten() |
907 | | { |
908 | 0 | WakeUpReason::CanStartConnect(peer_id) |
909 | | } else { |
910 | 147 | 'search: loop { |
911 | 147 | let mut earlier_unban = None; |
912 | | |
913 | 147 | for chain_id in network.chains().collect::<Vec<_>>() { |
914 | 147 | if network.gossip_desired_num( |
915 | 147 | chain_id, |
916 | 147 | service::GossipKind::ConsensusTransactions, |
917 | 147 | ) >= network[chain_id].max_slots |
918 | | { |
919 | 0 | continue; |
920 | 147 | } |
921 | 147 | |
922 | 147 | match peering_strategy.pick_assignable_peer(&chain_id, &Instant::now()) |
923 | | { |
924 | 0 | basic_peering_strategy::AssignablePeer::Assignable(peer_id) => { |
925 | 0 | break 'search WakeUpReason::CanAssignSlot( |
926 | 0 | peer_id.clone(), |
927 | 0 | chain_id, |
928 | 0 | ) |
929 | | } |
930 | | basic_peering_strategy::AssignablePeer::AllPeersBanned { |
931 | 0 | next_unban, |
932 | 0 | } => { |
933 | 0 | if earlier_unban.as_ref().map_or(true, |b| b > next_unban) {
934 | 0 | earlier_unban = Some(next_unban.clone()); |
935 | 0 | } |
936 | | } |
937 | 147 | basic_peering_strategy::AssignablePeer::NoPeer => continue, |
938 | | } |
939 | | } |
940 | | |
941 | 147 | if let Some(earlier_unban) = earlier_unban {
942 | 0 | smol::Timer::at(earlier_unban).await; |
943 | | } else { |
944 | 147 | future::pending::<()>().await;
945 | | } |
946 | | } |
947 | | } |
948 | 0 | }
949 | | }) |
950 | 147 | .or(async { |
951 | 147 | if let either::Right(sending) = &mut inner.event_senders {
952 | 0 | let event_senders = sending.await; |
953 | 0 | inner.event_senders = either::Left(event_senders); |
954 | 0 | WakeUpReason::EventSendersReady |
955 | 147 | } else if inner.event_pending_send.is_some() { |
956 | 0 | WakeUpReason::EventSendersReady |
957 | | } else { |
958 | 147 | future::pending().await
959 | | } |
960 | 147 | })
961 | 147 | .or(async { |
962 | 147 | (&mut inner.next_discovery).await;
963 | 104 | inner.next_discovery = smol::Timer::after(inner.next_discovery_period); |
964 | 104 | inner.next_discovery_period = |
965 | 104 | cmp::min(inner.next_discovery_period * 2, Duration::from_secs(120)); |
966 | 104 | WakeUpReason::StartKademliaDiscoveries |
967 | 147 | })
968 | 147 | .or(async {
969 | 143 | let (connection_id, message) = inner.from_connections_rx.next().await.unwrap();
970 | 0 | WakeUpReason::FromConnectionTask { |
971 | 0 | connection_id, |
972 | 0 | message, |
973 | 0 | } |
974 | 147 | })
975 | 147 | .or(async {
976 | 143 | let Some((socket, socket_addr)) = inner.incoming_connections.next().await else {
977 | 143 | future::pending().await
978 | | }; |
979 | 0 | WakeUpReason::IncomingConnection { |
980 | 0 | socket, |
981 | 0 | socket_addr, |
982 | 0 | } |
983 | 147 | })
984 | 122 | .await; |
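Everything between `let wake_up_reason = async { ... }` and this `.await` is a race built with `futures_lite::FutureExt::or`: each arm is a future that resolves to a `WakeUpReason`, and the first arm to become ready (left-to-right on ties) decides why the loop woke up. A reduced sketch of the same shape with two illustrative arms, a channel message racing a timer:

use std::time::Duration;
use futures_lite::FutureExt as _;

enum WakeUpReason {
    Message(u32),
    Timeout,
}

fn main() {
    smol::block_on(async {
        let (tx, rx) = smol::channel::bounded::<u32>(1);
        tx.send(42).await.unwrap();

        // `or` polls the left future first, so a ready message wins the race.
        let reason = async { WakeUpReason::Message(rx.recv().await.unwrap()) }
            .or(async {
                smol::Timer::after(Duration::from_millis(10)).await;
                WakeUpReason::Timeout
            })
            .await;

        match reason {
            WakeUpReason::Message(n) => println!("message: {n}"),
            WakeUpReason::Timeout => println!("timeout"),
        }
    });
}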
985 | | |
986 | 0 | match wake_up_reason { |
987 | | WakeUpReason::MessageToConnection { |
988 | 0 | connection_id, |
989 | 0 | message, |
990 | 0 | } => { |
991 | 0 | // Note that it is critical for the sending to not take too long here, in order to |
992 | 0 | // not block the processing of the network service.
993 | 0 | // In particular, if sending the message to the connection is blocked due to |
994 | 0 | // sending a message on the connection-to-coordinator channel, this will result |
995 | 0 | // in a deadlock. |
996 | 0 | // For this reason, the connection task is always ready to immediately accept a |
997 | 0 | // message on the coordinator-to-connection channel. |
998 | 0 | inner.network[connection_id].send(message).await.unwrap(); |
999 | | } |
1000 | | |
1001 | | WakeUpReason::FromConnectionTask { |
1002 | 0 | connection_id, |
1003 | 0 | message, |
1004 | | } => { |
1005 | 0 | if let Some(message) = message { |
1006 | 0 | inner |
1007 | 0 | .network |
1008 | 0 | .inject_connection_message(connection_id, message); |
1009 | 0 | } |
1010 | | } |
1011 | | |
1012 | | WakeUpReason::IncomingConnection { |
1013 | 0 | socket, |
1014 | 0 | socket_addr, |
1015 | 0 | } => { |
1016 | 0 | // The Nagle algorithm, implemented in the kernel, consists in buffering the |
1017 | 0 | // data to be sent out and waiting a bit before actually sending it out, in |
1018 | 0 | // order to potentially merge multiple writes in a row into one packet. In |
1019 | 0 | // the implementation below, it is guaranteed that the buffer in `WithBuffers` |
1020 | 0 | // is filled with as much data as possible before the operating system gets |
1021 | 0 | // involved. As such, we disable the Nagle algorithm, in order to avoid adding |
1022 | 0 | // an artificial delay to all sends. |
1023 | 0 | let _ = socket.set_nodelay(true); |
1024 | | |
1025 | 0 | let multiaddr = [ |
1026 | 0 | match socket_addr.ip() { |
1027 | 0 | IpAddr::V4(ip) => Protocol::<&[u8]>::Ip4(ip.octets()), |
1028 | 0 | IpAddr::V6(ip) => Protocol::Ip6(ip.octets()), |
1029 | | }, |
1030 | 0 | Protocol::Tcp(socket_addr.port()), |
1031 | 0 | ] |
1032 | 0 | .into_iter() |
1033 | 0 | .collect::<Multiaddr>(); |
1034 | 0 |
1035 | 0 | inner.log_callback.log( |
1036 | 0 | LogLevel::Debug, |
1037 | 0 | format!("incoming-connection; multiaddr={}", multiaddr), |
1038 | 0 | ); |
1039 | 0 |
1040 | 0 | let (tx, rx) = channel::bounded(16); // TODO: ?! |
1041 | 0 |
1042 | 0 | let (connection_id, connection_task) = inner.network.add_single_stream_connection( |
1043 | 0 | Instant::now(), |
1044 | 0 | service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux { |
1045 | 0 | is_initiator: false, |
1046 | 0 | noise_key: &inner.noise_key, |
1047 | 0 | }, |
1048 | 0 | multiaddr.clone().into_bytes(), |
1049 | 0 | None, |
1050 | 0 | tx, |
1051 | 0 | ); |
1052 | 0 |
1053 | 0 | (inner.tasks_executor)(Box::pin(tasks::connection_task( |
1054 | 0 | inner.log_callback.clone(), |
1055 | 0 | multiaddr.to_string(), |
1056 | 0 | async move { Ok(socket) },
1057 | 0 | connection_id, |
1058 | 0 | connection_task, |
1059 | 0 | rx, |
1060 | 0 | inner.from_connections_tx.clone(), |
1061 | 0 | ))); |
1062 | | } |
1063 | | |
1064 | | WakeUpReason::StartKademliaDiscoveries => { |
1065 | 104 | for chain_id in inner.network.chains().collect::<Vec<_>>() { |
1066 | 104 | let random_peer_id = |
1067 | 104 | PeerId::from_public_key(&peer_id::PublicKey::Ed25519(rand::random())); |
1068 | 104 | |
1069 | 104 | // TODO: select target closest to the random peer instead |
1070 | 104 | let target = inner |
1071 | 104 | .network |
1072 | 104 | .gossip_connected_peers( |
1073 | 104 | chain_id, |
1074 | 104 | service::GossipKind::ConsensusTransactions, |
1075 | 104 | ) |
1076 | 104 | .next() |
1077 | 104 | .cloned(); |
1078 | | |
1079 | 104 | if let Some(target0 ) = target { |
1080 | 0 | match inner.network.start_kademlia_find_node_request( |
1081 | 0 | &target, |
1082 | 0 | chain_id, |
1083 | 0 | &random_peer_id, |
1084 | 0 | Duration::from_secs(20), |
1085 | 0 | ) { |
1086 | 0 | Ok(_) => {} |
1087 | 0 | Err(service::StartRequestError::NoConnection) => unreachable!(), |
1088 | | }; |
1089 | 104 | } else { |
1090 | 104 | // TODO: log message |
1091 | 104 | } |
1092 | | } |
1093 | | } |
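The `next_discovery` timer driving this arm follows a capped exponential backoff: after every `StartKademliaDiscoveries` wake-up the period doubles, saturating at 120 seconds. The progression in isolation; the one-second starting value is illustrative, the real initial period comes from `Inner`:

use core::cmp;
use std::time::Duration;

fn main() {
    let mut next_discovery_period = Duration::from_secs(1);
    for round in 0..8 {
        println!("round {round}: next discovery in {next_discovery_period:?}");
        // Same saturation rule as above: double, capped at two minutes.
        next_discovery_period =
            cmp::min(next_discovery_period * 2, Duration::from_secs(120));
    }
    assert_eq!(next_discovery_period, Duration::from_secs(120));
}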
1094 | | |
1095 | | WakeUpReason::ForegroundClosed => { |
1096 | | // TODO: do a clean shutdown of all the connections |
1097 | 0 | return; |
1098 | | } |
1099 | | |
1100 | | WakeUpReason::Message(ToBackground::ForegroundDisconnectAndBan { |
1101 | 0 | peer_id, |
1102 | 0 | chain_id, |
1103 | 0 | severity, |
1104 | 0 | reason, |
1105 | 0 | }) => { |
1106 | 0 | // Note that the peer doesn't necessarily have an out slot.
1107 | 0 | inner.peering_strategy.unassign_slot_and_ban( |
1108 | 0 | &chain_id, |
1109 | 0 | &peer_id, |
1110 | 0 | Instant::now() |
1111 | 0 | + Duration::from_secs(match severity { |
1112 | 0 | BanSeverity::Low => 10, |
1113 | 0 | BanSeverity::High => 40, |
1114 | | }), |
1115 | | ); |
1116 | 0 | if inner.network.gossip_remove_desired( |
1117 | 0 | chain_id, |
1118 | 0 | &peer_id, |
1119 | 0 | service::GossipKind::ConsensusTransactions, |
1120 | 0 | ) { |
1121 | 0 | inner.log_callback.log( |
1122 | 0 | LogLevel::Debug, |
1123 | 0 | format!( |
1124 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}", |
1125 | 0 | peer_id, inner.network[chain_id].log_name, reason |
1126 | 0 | ), |
1127 | 0 | ); |
1128 | 0 | } |
1129 | | |
1130 | 0 | if inner.network.gossip_is_connected( |
1131 | 0 | chain_id, |
1132 | 0 | &peer_id, |
1133 | 0 | service::GossipKind::ConsensusTransactions, |
1134 | 0 | ) { |
1135 | 0 | let _close_result = inner.network.gossip_close( |
1136 | 0 | chain_id, |
1137 | 0 | &peer_id, |
1138 | 0 | service::GossipKind::ConsensusTransactions, |
1139 | 0 | ); |
1140 | 0 | debug_assert!(_close_result.is_ok()); |
1141 | | |
1142 | 0 | inner.log_callback.log( |
1143 | 0 | LogLevel::Debug, |
1144 | 0 | format!( |
1145 | 0 | "chain-disconnected; peer_id={}; chain={}", |
1146 | 0 | peer_id, inner.network[chain_id].log_name |
1147 | 0 | ), |
1148 | 0 | ); |
1149 | 0 |
1150 | 0 | debug_assert!(inner.event_pending_send.is_none()); |
1151 | 0 | inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id }); |
1152 | 0 | } |
1153 | | } |
1154 | | |
1155 | | WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock { |
1156 | 0 | target, |
1157 | 0 | chain_id, |
1158 | 0 | scale_encoded_header, |
1159 | 0 | is_best, |
1160 | 0 | result_tx, |
1161 | 0 | }) => { |
1162 | 0 | let _ = result_tx.send(inner.network.gossip_send_block_announce( |
1163 | 0 | &target, |
1164 | 0 | chain_id, |
1165 | 0 | &scale_encoded_header, |
1166 | 0 | is_best, |
1167 | 0 | )); |
1168 | 0 | } |
1169 | | WakeUpReason::Message(ToBackground::ForegroundSetLocalBestBlock { |
1170 | 21 | chain_id, |
1171 | 21 | best_hash, |
1172 | 21 | best_number, |
1173 | 21 | }) => { |
1174 | 21 | inner |
1175 | 21 | .network |
1176 | 21 | .set_chain_local_best_block(chain_id, best_hash, best_number); |
1177 | 21 | } |
1178 | | WakeUpReason::Message(ToBackground::ForegroundBlocksRequest { |
1179 | 0 | target, |
1180 | 0 | chain_id, |
1181 | 0 | config, |
1182 | 0 | result_tx, |
1183 | 0 | }) => { |
1184 | 0 | inner.log_callback.log( |
1185 | 0 | LogLevel::Debug, |
1186 | 0 | format!( |
1187 | 0 | "blocks-request-start; peer_id={}; chain={}; start={}; desired_count={}; direction={}", |
1188 | 0 | target, |
1189 | 0 | inner.network[chain_id].log_name, |
1190 | 0 | match &config.start { |
1191 | 0 | codec::BlocksRequestConfigStart::Hash(h) => either::Left(HashDisplay(h)), |
1192 | 0 | codec::BlocksRequestConfigStart::Number(n) => either::Right(n), |
1193 | | }, |
1194 | | config.desired_count, |
1195 | 0 | match config.direction { |
1196 | 0 | codec::BlocksRequestDirection::Ascending => "ascending", |
1197 | 0 | codec::BlocksRequestDirection::Descending => "descending", |
1198 | | }, |
1199 | | ), |
1200 | | ); |
1201 | | |
1202 | 0 | match inner.network.start_blocks_request( |
1203 | 0 | &target, |
1204 | 0 | chain_id, |
1205 | 0 | config, |
1206 | 0 | Duration::from_secs(12), |
1207 | 0 | ) { |
1208 | 0 | Ok(request_id) => { |
1209 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? |
1210 | 0 | inner.blocks_requests.insert(request_id, result_tx); |
1211 | 0 | } |
1212 | 0 | Err(service::StartRequestError::NoConnection) => { |
1213 | 0 | inner.log_callback.log( |
1214 | 0 | LogLevel::Debug, |
1215 | 0 | format!( |
1216 | 0 | "blocks-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", |
1217 | 0 | target, |
1218 | 0 | inner.network[chain_id].log_name, |
1219 | 0 | ), |
1220 | 0 | ); |
1221 | 0 | let _ = result_tx.send(Err(BlocksRequestError::NoConnection)); |
1222 | 0 | } |
1223 | | } |
1224 | | } |
1225 | | WakeUpReason::Message(ToBackground::ForegroundWarpSyncRequest { |
1226 | 0 | target, |
1227 | 0 | chain_id, |
1228 | 0 | begin_hash, |
1229 | 0 | result_tx, |
1230 | 0 | }) => { |
1231 | 0 | inner.log_callback.log( |
1232 | 0 | LogLevel::Debug, |
1233 | 0 | format!( |
1234 | 0 | "warp-sync-request-start; peer_id={}; chain={}; begin-hash={}", |
1235 | 0 | target, |
1236 | 0 | inner.network[chain_id].log_name, |
1237 | 0 | HashDisplay(&begin_hash) |
1238 | 0 | ), |
1239 | 0 | ); |
1240 | 0 |
1241 | 0 | match inner.network.start_grandpa_warp_sync_request( |
1242 | 0 | &target, |
1243 | 0 | chain_id, |
1244 | 0 | begin_hash, |
1245 | 0 | Duration::from_secs(12), |
1246 | 0 | ) { |
1247 | 0 | Ok(request_id) => { |
1248 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? |
1249 | 0 | inner.warp_sync_requests.insert(request_id, result_tx); |
1250 | 0 | } |
1251 | 0 | Err(service::StartRequestError::NoConnection) => { |
1252 | 0 | inner.log_callback.log( |
1253 | 0 | LogLevel::Debug, |
1254 | 0 | format!( |
1255 | 0 | "warp-sync-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", |
1256 | 0 | target, |
1257 | 0 | inner.network[chain_id].log_name, |
1258 | 0 | ), |
1259 | 0 | ); |
1260 | 0 | let _ = result_tx.send(Err(WarpSyncRequestError::NoConnection)); |
1261 | 0 | } |
1262 | | } |
1263 | | } |
1264 | | WakeUpReason::Message(ToBackground::ForegroundStorageProofRequest { |
1265 | 0 | target, |
1266 | 0 | chain_id, |
1267 | 0 | config, |
1268 | 0 | result_tx, |
1269 | 0 | }) => { |
1270 | 0 | inner.log_callback.log( |
1271 | 0 | LogLevel::Debug, |
1272 | 0 | format!( |
1273 | 0 | "storage-request-start; peer_id={}; chain={}; block-hash={}; num-keys={}", |
1274 | 0 | target, |
1275 | 0 | inner.network[chain_id].log_name, |
1276 | 0 | HashDisplay(&config.block_hash), |
1277 | 0 | config.keys.len() |
1278 | 0 | ), |
1279 | 0 | ); |
1280 | 0 |
1281 | 0 | match inner.network.start_storage_proof_request( |
1282 | 0 | &target, |
1283 | 0 | chain_id, |
1284 | 0 | config, |
1285 | 0 | Duration::from_secs(12), |
1286 | 0 | ) { |
1287 | 0 | Ok(request_id) => { |
1288 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? |
1289 | 0 | inner.storage_requests.insert(request_id, result_tx); |
1290 | 0 | } |
1291 | 0 | Err(service::StartRequestMaybeTooLargeError::NoConnection) => { |
1292 | 0 | inner.log_callback.log( |
1293 | 0 | LogLevel::Debug, |
1294 | 0 | format!( |
1295 | 0 | "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", |
1296 | 0 | target, |
1297 | 0 | inner.network[chain_id].log_name, |
1298 | 0 | ), |
1299 | 0 | ); |
1300 | 0 | let _ = result_tx.send(Err(())); |
1301 | 0 | } |
1302 | 0 | Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => { |
1303 | 0 | inner.log_callback.log( |
1304 | 0 | LogLevel::Debug, |
1305 | 0 | format!( |
1306 | 0 | "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large", |
1307 | 0 | target, |
1308 | 0 | inner.network[chain_id].log_name, |
1309 | 0 | ), |
1310 | 0 | ); |
1311 | 0 | let _ = result_tx.send(Err(())); |
1312 | 0 | } |
1313 | | } |
1314 | | } |
1315 | | WakeUpReason::Message(ToBackground::ForegroundCallProofRequest { |
1316 | 0 | target, |
1317 | 0 | chain_id, |
1318 | 0 | config, |
1319 | 0 | result_tx, |
1320 | 0 | }) => { |
1321 | 0 | inner.log_callback.log( |
1322 | 0 | LogLevel::Debug, |
1323 | 0 | format!( |
1324 | 0 | "call-proof-request-start; peer_id={}; chain={}; block-hash={}; function={}", |
1325 | 0 | target, |
1326 | 0 | inner.network[chain_id].log_name, |
1327 | 0 | HashDisplay(&config.block_hash), |
1328 | 0 | config.method |
1329 | 0 | ), |
1330 | 0 | ); |
1331 | 0 |
1332 | 0 | match inner.network.start_call_proof_request( |
1333 | 0 | &target, |
1334 | 0 | chain_id, |
1335 | 0 | config, |
1336 | 0 | Duration::from_secs(12), |
1337 | 0 | ) { |
1338 | 0 | Ok(request_id) => { |
1339 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? |
1340 | 0 | inner.call_proof_requests.insert(request_id, result_tx); |
1341 | 0 | } |
1342 | 0 | Err(service::StartRequestMaybeTooLargeError::NoConnection) => { |
1343 | 0 | inner.log_callback.log( |
1344 | 0 | LogLevel::Debug, |
1345 | 0 | format!( |
1346 | 0 | "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", |
1347 | 0 | target, |
1348 | 0 | inner.network[chain_id].log_name, |
1349 | 0 | ), |
1350 | 0 | ); |
1351 | 0 | let _ = result_tx.send(Err(())); |
1352 | 0 | } |
1353 | 0 | Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => { |
1354 | 0 | inner.log_callback.log( |
1355 | 0 | LogLevel::Debug, |
1356 | 0 | format!( |
1357 | 0 | "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large", |
1358 | 0 | target, |
1359 | 0 | inner.network[chain_id].log_name, |
1360 | 0 | ), |
1361 | 0 | ); |
1362 | 0 | let _ = result_tx.send(Err(())); |
1363 | 0 | } |
1364 | | } |
1365 | | } |
1366 | 0 | WakeUpReason::Message(ToBackground::ForegroundGetNumConnections { result_tx }) => { |
1367 | 0 | let _ = result_tx.send(inner.network.num_connections()); |
1368 | 0 | } |
1369 | | WakeUpReason::Message(ToBackground::ForegroundGetNumPeers { |
1370 | 1 | chain_id, |
1371 | 1 | result_tx, |
1372 | 1 | }) => { |
1373 | 1 | // TODO: optimize? |
1374 | 1 | let _ = result_tx.send( |
1375 | 1 | inner |
1376 | 1 | .network |
1377 | 1 | .gossip_connected_peers( |
1378 | 1 | chain_id, |
1379 | 1 | service::GossipKind::ConsensusTransactions, |
1380 | 1 | ) |
1381 | 1 | .count(), |
1382 | 1 | ); |
1383 | 1 | } |
1384 | 0 | WakeUpReason::Message(ToBackground::ForegroundGetNumTotalPeers { result_tx }) => { |
1385 | 0 | // TODO: optimize? |
1386 | 0 | let total = inner |
1387 | 0 | .network |
1388 | 0 | .chains() |
1389 | 0 | .map(|chain_id| { |
1390 | 0 | inner |
1391 | 0 | .network |
1392 | 0 | .gossip_connected_peers( |
1393 | 0 | chain_id, |
1394 | 0 | service::GossipKind::ConsensusTransactions, |
1395 | 0 | ) |
1396 | 0 | .count() |
1397 | 0 | })
1398 | 0 | .sum(); |
1399 | 0 | let _ = result_tx.send(total); |
1400 | 0 | } |
1401 | | |
1402 | | WakeUpReason::EventSendersReady => { |
1403 | | // Dispatch the pending event, if any, to the various senders. |
1404 | | |
1405 | | // We made sure that the senders were ready before generating an event. |
1406 | 0 | let either::Left(event_senders) = &mut inner.event_senders else { |
1407 | 0 | unreachable!() |
1408 | | }; |
1409 | | |
1410 | 0 | if let Some(event_to_dispatch) = inner.event_pending_send.take() { |
1411 | 0 | let mut event_senders = mem::take(event_senders); |
1412 | 0 | inner.event_senders = either::Right(Box::pin(async move { |
1413 | | // Elements in `event_senders` are removed one by one and inserted |
1414 | | // back if the channel is still open. |
1415 | 0 | for index in (0..event_senders.len()).rev() { |
1416 | 0 | let event_sender = event_senders.swap_remove(index); |
1417 | 0 | if event_sender.send(event_to_dispatch.clone()).await.is_err() { |
1418 | 0 | continue; |
1419 | 0 | } |
1420 | 0 |
1421 | 0 | event_senders.push(event_sender); |
1422 | | } |
1423 | 0 | event_senders |
1424 | 0 | }));
1425 | 0 | } |
1426 | | } |
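The dispatch above walks `event_senders` in reverse, `swap_remove`s each sender, and pushes it back only if the send succeeded, so senders whose receivers were dropped are pruned in O(1) without shifting the rest of the vector. The same pruning idiom in a reduced form; the `u32` event type is illustrative:

fn main() {
    smol::block_on(async {
        let (tx1, rx1) = smol::channel::bounded::<u32>(1);
        let (tx2, _) = smol::channel::bounded::<u32>(1); // receiver dropped: closed

        let mut event_senders = vec![tx1, tx2];
        for index in (0..event_senders.len()).rev() {
            let event_sender = event_senders.swap_remove(index);
            if event_sender.send(7).await.is_err() {
                continue; // channel closed; the sender is not re-inserted
            }
            event_senders.push(event_sender);
        }

        assert_eq!(event_senders.len(), 1);
        assert_eq!(rx1.recv().await.unwrap(), 7);
    });
}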
1427 | | |
1428 | | WakeUpReason::NetworkEvent(service::Event::HandshakeFinished { |
1429 | 0 | id, |
1430 | 0 | expected_peer_id, |
1431 | 0 | peer_id, |
1432 | 0 | .. |
1433 | 0 | }) => { |
1434 | 0 | inner.num_pending_out_attempts -= 1; |
1435 | 0 |
|
1436 | 0 | let remote_addr = |
1437 | 0 | Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned()) |
1438 | 0 | .unwrap(); // TODO: review this unwrap |
1439 | 0 | if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id)
1440 | | { |
1441 | 0 | inner |
1442 | 0 | .log_callback |
1443 | 0 | .log(LogLevel::Debug, format!("connected-peer-id-mismatch; expected_peer_id={}; actual_peer_id={}; address={}", expected_peer_id, peer_id, remote_addr)); |
1444 | 0 |
1445 | 0 | let _was_in = inner |
1446 | 0 | .peering_strategy |
1447 | 0 | .decrease_address_connections_and_remove_if_zero( |
1448 | 0 | expected_peer_id, |
1449 | 0 | remote_addr.as_ref(), |
1450 | 0 | ); |
1451 | 0 | debug_assert!(_was_in.is_ok()); |
1452 | | if let basic_peering_strategy::InsertAddressConnectionsResult::Inserted { |
1453 | 0 | address_removed: Some(addr_rm), |
1454 | 0 | } = inner.peering_strategy.increase_address_connections( |
1455 | 0 | &peer_id, |
1456 | 0 | remote_addr.into_bytes().to_owned(), |
1457 | 0 | 10, // TODO: constant |
1458 | 0 | ) { |
1459 | 0 | let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap(); |
1460 | 0 | inner.log_callback.log( |
1461 | 0 | LogLevel::Debug, |
1462 | 0 | format!("address-purged; peer_id={}; address={}", peer_id, addr_rm), |
1463 | 0 | ); |
1464 | 0 | } |
1465 | 0 | } else { |
1466 | 0 | inner |
1467 | 0 | .log_callback |
1468 | 0 | .log(LogLevel::Debug, format!("connected; peer_id={}", peer_id)); |
1469 | 0 | } |
1470 | | } |
1471 | | |
1472 | | WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { |
1473 | | expected_peer_id: Some(_), |
1474 | | .. |
1475 | | }) |
1476 | | | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => { |
1477 | 0 | let (address, peer_id, handshake_finished) = match wake_up_reason { |
1478 | | WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { |
1479 | 0 | address, |
1480 | 0 | expected_peer_id: Some(peer_id), |
1481 | 0 | .. |
1482 | 0 | }) => (address, peer_id, false), |
1483 | | WakeUpReason::NetworkEvent(service::Event::Disconnected { |
1484 | 0 | address, |
1485 | 0 | peer_id, |
1486 | 0 | .. |
1487 | 0 | }) => (address, peer_id, true), |
1488 | 0 | _ => unreachable!(), |
1489 | | }; |
1490 | | |
1491 | 0 | if !handshake_finished { |
1492 | 0 | inner.num_pending_out_attempts -= 1; |
1493 | 0 | } |
1494 | | |
1495 | 0 | inner |
1496 | 0 | .peering_strategy |
1497 | 0 | .decrease_address_connections(&peer_id, &address) |
1498 | 0 | .unwrap(); |
1499 | 0 | let address = Multiaddr::from_bytes(&address).unwrap(); |
1500 | 0 | inner.log_callback.log( |
1501 | 0 | LogLevel::Debug, |
1502 | 0 | format!( |
1503 | 0 | "disconnected; handshake-finished={}; peer_id={}; address={}", |
1504 | 0 | handshake_finished, peer_id, address |
1505 | 0 | ), |
1506 | 0 | ); |
1507 | 0 |
1508 | 0 | // Ban the peer in order to avoid trying the same address(es) over and over again.
1509 | 0 | // Even if the handshake was finished, it is possible that the peer simply shuts
1510 | 0 | // down connections immediately after they have been opened, hence the ban.
1511 | 0 | // Due to race conditions and PeerId mismatches, it is possible that there is
1512 | 0 | // another existing connection or connection attempt with that same peer. However,
1513 | 0 | // it is not possible to be sure that we will reach 0 connections or connection
1514 | 0 | // attempts, and thus we ban the peer every time.
1515 | 0 | let ban_duration = Duration::from_secs(5); |
1516 | 0 | inner.network.gossip_remove_desired_all( |
1517 | 0 | &peer_id, |
1518 | 0 | service::GossipKind::ConsensusTransactions, |
1519 | 0 | ); |
1520 | 0 | for (&chain_id, what_happened) in inner |
1521 | 0 | .peering_strategy |
1522 | 0 | .unassign_slots_and_ban(&peer_id, Instant::now() + ban_duration) |
1523 | | { |
1524 | 0 | if matches!( |
1525 | 0 | what_happened, |
1526 | | basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true } |
1527 | 0 | ) { |
1528 | 0 | inner.log_callback.log( |
1529 | 0 | LogLevel::Debug, |
1530 | 0 | format!( |
1531 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=disconnected", |
1532 | 0 | peer_id, inner.network[chain_id].log_name |
1533 | 0 | ), |
1534 | 0 | ); |
1535 | 0 | } |
1536 | | } |
1537 | | } |
1538 | | |
1539 | | WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { |
1540 | | expected_peer_id: None, |
1541 | 0 | address, |
1542 | 0 | .. |
1543 | 0 | }) => { |
1544 | 0 | inner.log_callback.log( |
1545 | 0 | LogLevel::Debug, |
1546 | 0 | format!( |
1547 | 0 | "disconnected; handshake-finished=false; address={}", |
1548 | 0 | Multiaddr::from_bytes(&address).unwrap() |
1549 | 0 | ), |
1550 | 0 | ); |
1551 | 0 | } |
1552 | | |
1553 | | WakeUpReason::NetworkEvent(service::Event::PingOutSuccess { |
1554 | 0 | id, |
1555 | 0 | peer_id, |
1556 | 0 | ping_time, |
1557 | 0 | }) => { |
1558 | 0 | let remote_addr = |
1559 | 0 | Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned()) |
1560 | 0 | .unwrap(); // TODO: review this unwrap |
1561 | 0 | inner.log_callback.log( |
1562 | 0 | LogLevel::Debug, |
1563 | 0 | format!("ping; peer_id={peer_id}; remote_addr={remote_addr}; ping-time={ping_time:?}"),
1564 | 0 | ); |
1565 | 0 | } |
1566 | | |
1567 | | WakeUpReason::NetworkEvent(service::Event::BlockAnnounce { |
1568 | 0 | chain_id, |
1569 | 0 | peer_id, |
1570 | 0 | announce, |
1571 | 0 | }) => { |
1572 | 0 | let decoded = announce.decode(); |
1573 | 0 | let header_hash = |
1574 | 0 | header::hash_from_scale_encoded_header(decoded.scale_encoded_header); |
1575 | 0 | match header::decode( |
1576 | 0 | decoded.scale_encoded_header, |
1577 | 0 | inner.network.block_number_bytes(chain_id), |
1578 | 0 | ) { |
1579 | 0 | Ok(decoded_header) => { |
1580 | 0 | let mut _jaeger_span = inner.jaeger_service.block_announce_receive_span( |
1581 | 0 | &inner.local_peer_id, |
1582 | 0 | &peer_id, |
1583 | 0 | decoded_header.number, |
1584 | 0 | &decoded_header.hash(inner.network.block_number_bytes(chain_id)), |
1585 | 0 | ); |
1586 | 0 |
1587 | 0 | inner.log_callback.log(LogLevel::Debug, format!( |
1588 | 0 | "block-announce; peer_id={}; chain={}; hash={}; number={}; is_best={:?}", |
1589 | 0 | peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded_header.number, decoded.is_best |
1590 | 0 | )); |
1591 | 0 |
1592 | 0 | debug_assert!(inner.event_pending_send.is_none()); |
1593 | 0 | inner.event_pending_send = Some(Event::BlockAnnounce { |
1594 | 0 | chain_id, |
1595 | 0 | peer_id, |
1596 | 0 | is_best: decoded.is_best, |
1597 | 0 | scale_encoded_header: decoded.scale_encoded_header.to_owned(), // TODO: somewhat wasteful to copy here, could pass the entire announce |
1598 | 0 | }); |
1599 | | } |
1600 | 0 | Err(error) => { |
1601 | 0 | inner.log_callback.log(LogLevel::Warn, format!( |
1602 | 0 | "block-announce-bad-header; peer_id={}; chain={}; hash={}; is_best={:?}; error={}", |
1603 | 0 | peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded.is_best, error |
1604 | 0 | )); |
1605 | 0 |
1606 | 0 | if inner.network.gossip_remove_desired( |
1607 | 0 | chain_id, |
1608 | 0 | &peer_id, |
1609 | 0 | service::GossipKind::ConsensusTransactions, |
1610 | 0 | ) { |
1611 | 0 | inner.peering_strategy.unassign_slot_and_ban( |
1612 | 0 | &chain_id, |
1613 | 0 | &peer_id, |
1614 | 0 | Instant::now() + Duration::from_secs(10), |
1615 | 0 | ); |
1616 | 0 | inner.log_callback.log( |
1617 | 0 | LogLevel::Debug, |
1618 | 0 | format!( |
1619 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=bad-block-announce", |
1620 | 0 | peer_id, inner.network[chain_id].log_name |
1621 | 0 | ), |
1622 | 0 | ); |
1623 | 0 | } |
1624 | 0 | let _ = inner.network.gossip_close( |
1625 | 0 | chain_id, |
1626 | 0 | &peer_id, |
1627 | 0 | service::GossipKind::ConsensusTransactions, |
1628 | 0 | ); // TODO: what is the return value? |
1629 | | } |
1630 | | } |
1631 | | } |
1632 | | WakeUpReason::NetworkEvent(service::Event::GossipConnected { |
1633 | 0 | peer_id, |
1634 | 0 | chain_id, |
1635 | 0 | best_number, |
1636 | 0 | best_hash, |
1637 | 0 | .. |
1638 | 0 | }) => { |
1639 | 0 | inner.log_callback.log( |
1640 | 0 | LogLevel::Debug, |
1641 | 0 | format!( |
1642 | 0 | "chain-connected; peer_id={}; chain={}; best_number={}; best_hash={}", |
1643 | 0 | peer_id, |
1644 | 0 | inner.network[chain_id].log_name, |
1645 | 0 | best_number, |
1646 | 0 | HashDisplay(&best_hash), |
1647 | 0 | ), |
1648 | 0 | ); |
1649 | 0 | debug_assert!(inner.event_pending_send.is_none()); |
1650 | 0 | inner.event_pending_send = Some(Event::Connected { |
1651 | 0 | peer_id, |
1652 | 0 | chain_id, |
1653 | 0 | best_block_number: best_number, |
1654 | 0 | best_block_hash: best_hash, |
1655 | 0 | }); |
1656 | | } |
1657 | | WakeUpReason::NetworkEvent(service::Event::GossipDisconnected { |
1658 | 0 | peer_id, |
1659 | 0 | chain_id, |
1660 | 0 | .. |
1661 | 0 | }) => { |
1662 | 0 | inner.log_callback.log( |
1663 | 0 | LogLevel::Debug, |
1664 | 0 | format!( |
1665 | 0 | "chain-disconnected; peer_id={}; chain={}", |
1666 | 0 | peer_id, inner.network[chain_id].log_name |
1667 | 0 | ), |
1668 | 0 | ); |
1669 | 0 |
1670 | 0 | // Note that the peer doesn't necessarily have an out slot, as this event
1671 | 0 | // might happen as a result of an inbound gossip connection. |
1672 | 0 | inner.peering_strategy.unassign_slot_and_ban( |
1673 | 0 | &chain_id, |
1674 | 0 | &peer_id, |
1675 | 0 | Instant::now() + Duration::from_secs(10), |
1676 | 0 | ); |
1677 | 0 | if inner.network.gossip_remove_desired( |
1678 | 0 | chain_id, |
1679 | 0 | &peer_id, |
1680 | 0 | service::GossipKind::ConsensusTransactions, |
1681 | 0 | ) { |
1682 | 0 | inner.log_callback.log( |
1683 | 0 | LogLevel::Debug, |
1684 | 0 | format!( |
1685 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=gossip-disconnected", |
1686 | 0 | peer_id, inner.network[chain_id].log_name |
1687 | 0 | ), |
1688 | 0 | ); |
1689 | 0 | } |
1690 | | |
1691 | 0 | debug_assert!(inner.event_pending_send.is_none()); |
1692 | 0 | inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id }); |
1693 | | } |
1694 | | WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed { |
1695 | 0 | chain_id, |
1696 | 0 | peer_id, |
1697 | 0 | error, |
1698 | 0 | .. |
1699 | 0 | }) => { |
1700 | 0 | inner.log_callback.log( |
1701 | 0 | LogLevel::Debug, |
1702 | 0 | format!( |
1703 | 0 | "chain-connect-attempt-failed; peer_id={}; chain={}; error={}", |
1704 | 0 | peer_id, inner.network[chain_id].log_name, error |
1705 | 0 | ), |
1706 | 0 | ); |
1707 | 0 |
1708 | 0 | // Note that the peer doesn't necessarily have an out slot, as this event
1709 | 0 | // might happen as a result of an inbound gossip connection. |
1710 | 0 | if inner.network.gossip_remove_desired( |
1711 | 0 | chain_id, |
1712 | 0 | &peer_id, |
1713 | 0 | service::GossipKind::ConsensusTransactions, |
1714 | 0 | ) { |
1715 | 0 | inner.log_callback.log( |
1716 | 0 | LogLevel::Debug, |
1717 | 0 | format!( |
1718 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=gossip-open-failed", |
1719 | 0 | peer_id, inner.network[chain_id].log_name |
1720 | 0 | ), |
1721 | 0 | ); |
1722 | 0 | } |
1723 | | |
1724 | 0 | if let service::GossipConnectError::GenesisMismatch { .. } = error { |
1725 | 0 | inner |
1726 | 0 | .peering_strategy |
1727 | 0 | .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id); |
1728 | 0 | } else { |
1729 | 0 | inner.peering_strategy.unassign_slot_and_ban( |
1730 | 0 | &chain_id, |
1731 | 0 | &peer_id, |
1732 | 0 | Instant::now() + Duration::from_secs(15), |
1733 | 0 | ); |
1734 | 0 | } |
1735 | | } |
1736 | | WakeUpReason::NetworkEvent(service::Event::GossipInDesired { |
1737 | 0 | chain_id, |
1738 | 0 | peer_id, |
1739 | 0 | kind: service::GossipKind::ConsensusTransactions, |
1740 | 0 | }) => { |
1741 | 0 | // TODO: log this |
1742 | 0 | // The networking state machine guarantees that `GossipInDesired` |
1743 | 0 | // can't happen if we are already opening an out slot, which we do |
1744 | 0 | // immediately. |
1745 | 0 | // TODO: add debug_assert! ^ |
1746 | 0 | if inner |
1747 | 0 | .network |
1748 | 0 | .opened_gossip_undesired_by_chain(chain_id) |
1749 | 0 | .count() |
1750 | 0 | < inner.network[chain_id].max_in_peers |
1751 | 0 | { |
1752 | 0 | inner |
1753 | 0 | .network |
1754 | 0 | .gossip_open( |
1755 | 0 | chain_id, |
1756 | 0 | &peer_id, |
1757 | 0 | service::GossipKind::ConsensusTransactions, |
1758 | 0 | ) |
1759 | 0 | .unwrap(); |
1760 | 0 | } else { |
1761 | 0 | inner |
1762 | 0 | .network |
1763 | 0 | .gossip_close( |
1764 | 0 | chain_id, |
1765 | 0 | &peer_id, |
1766 | 0 | service::GossipKind::ConsensusTransactions, |
1767 | 0 | ) |
1768 | 0 | .unwrap(); |
1769 | 0 | } |
1770 | | } |
1771 | | WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => { |
1772 | | // All `GossipInDesired` are immediately accepted or rejected, meaning |
1773 | | // that this event can't happen. |
1774 | 0 | unreachable!() |
1775 | | } |
1776 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { |
1777 | 0 | substream_id, |
1778 | 0 | peer_id, |
1779 | 0 | chain_id, |
1780 | 0 | response: service::RequestResult::Blocks(response), |
1781 | 0 | }) => { |
1782 | 0 | match &response { |
1783 | 0 | Ok(success) => { |
1784 | 0 | inner.log_callback.log( |
1785 | 0 | LogLevel::Debug, |
1786 | 0 | format!( |
1787 | 0 | "blocks-request-ended; outcome=success; peer_id={peer_id}; chain={}; response-blocks={}", |
1788 | 0 | inner.network[chain_id].log_name, |
1789 | 0 | success.len() |
1790 | 0 | ), |
1791 | 0 | ); |
1792 | 0 | } |
1793 | 0 | Err(err) => { |
1794 | 0 | inner.log_callback.log( |
1795 | 0 | LogLevel::Debug, |
1796 | 0 | format!("blocks-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", |
1797 | 0 | inner.network[chain_id].log_name, err), |
1798 | 0 | ); |
1799 | 0 | } |
1800 | | } |
1801 | | |
1802 | 0 | let _ = inner |
1803 | 0 | .blocks_requests |
1804 | 0 | .remove(&substream_id) |
1805 | 0 | .unwrap() |
1806 | 0 | .send(response.map_err(BlocksRequestError::Request)); |
1807 | | } |
1808 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { |
1809 | 0 | substream_id, |
1810 | 0 | peer_id, |
1811 | 0 | chain_id, |
1812 | 0 | response: service::RequestResult::GrandpaWarpSync(response), |
1813 | 0 | }) => { |
1814 | 0 | match &response { |
1815 | 0 | Ok(success) => { |
1816 | 0 | let decoded = success.decode(); |
1817 | 0 | inner.log_callback.log( |
1818 | 0 | LogLevel::Debug, |
1819 | 0 | format!( |
1820 | 0 | "warp-sync-request-ended; outcome=success; peer_id={peer_id}; chain={}; num-fragments={}; is-finished={:?}", |
1821 | 0 | inner.network[chain_id].log_name, |
1822 | 0 | decoded.fragments.len(), decoded.is_finished, |
1823 | 0 | ), |
1824 | 0 | ); |
1825 | 0 | } |
1826 | 0 | Err(err) => { |
1827 | 0 | inner.log_callback.log( |
1828 | 0 | LogLevel::Debug, |
1829 | 0 | format!("warp-sync-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", |
1830 | 0 | inner.network[chain_id].log_name, err), |
1831 | 0 | ); |
1832 | 0 | } |
1833 | | } |
1834 | | |
1835 | 0 | let _ = inner |
1836 | 0 | .warp_sync_requests |
1837 | 0 | .remove(&substream_id) |
1838 | 0 | .unwrap() |
1839 | 0 | .send(response.map_err(WarpSyncRequestError::Request)); |
1840 | | } |
1841 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { |
1842 | 0 | substream_id, |
1843 | 0 | peer_id, |
1844 | 0 | chain_id, |
1845 | 0 | response: service::RequestResult::StorageProof(response), |
1846 | 0 | }) => { |
1847 | 0 | match &response { |
1848 | 0 | Ok(success) => { |
1849 | 0 | inner.log_callback.log( |
1850 | 0 | LogLevel::Debug, |
1851 | 0 | format!( |
1852 | 0 | "storage-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}", |
1853 | 0 | inner.network[chain_id].log_name, |
1854 | 0 | BytesDisplay(u64::try_from(success.decode().len()).unwrap()), |
1855 | 0 | ), |
1856 | 0 | ); |
1857 | 0 | } |
1858 | 0 | Err(err) => { |
1859 | 0 | inner.log_callback.log( |
1860 | 0 | LogLevel::Debug, |
1861 | 0 | format!( |
1862 | 0 | "storage-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", |
1863 | 0 | inner.network[chain_id].log_name, err |
1864 | 0 | ), |
1865 | 0 | ); |
1866 | 0 | } |
1867 | | } |
1868 | | |
1869 | 0 | let _ = inner |
1870 | 0 | .storage_requests |
1871 | 0 | .remove(&substream_id) |
1872 | 0 | .unwrap() |
1873 | 0 | .send(response.map_err(|_| ()));
1874 | | } |
1875 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { |
1876 | 0 | substream_id, |
1877 | 0 | peer_id, |
1878 | 0 | chain_id, |
1879 | 0 | response: service::RequestResult::CallProof(response), |
1880 | 0 | }) => { |
1881 | 0 | match &response { |
1882 | 0 | Ok(success) => { |
1883 | 0 | inner.log_callback.log( |
1884 | 0 | LogLevel::Debug, |
1885 | 0 | format!( |
1886 | 0 | "call-proof-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}", |
1887 | 0 | inner.network[chain_id].log_name, |
1888 | 0 | BytesDisplay(u64::try_from(success.decode().len()).unwrap()), |
1889 | 0 | ), |
1890 | 0 | ); |
1891 | 0 | } |
1892 | 0 | Err(err) => { |
1893 | 0 | inner.log_callback.log( |
1894 | 0 | LogLevel::Debug, |
1895 | 0 | format!( |
1896 | 0 | "call-proof-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", |
1897 | 0 | inner.network[chain_id].log_name, |
1898 | 0 | err |
1899 | 0 | ), |
1900 | 0 | ); |
1901 | 0 | } |
1902 | | } |
1903 | | |
1904 | 0 | let _ = inner |
1905 | 0 | .call_proof_requests |
1906 | 0 | .remove(&substream_id) |
1907 | 0 | .unwrap() |
1908 | 0 | .send(response.map_err(|_| ()));
1909 | | } |
1910 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { |
1911 | 0 | peer_id: kademlia_request_target, |
1912 | 0 | chain_id, |
1913 | 0 | response: service::RequestResult::KademliaFindNode(Ok(nodes)), |
1914 | | .. |
1915 | | }) => { |
1916 | 0 | for (peer_id, addrs) in nodes { |
1917 | 0 | let mut valid_addrs = Vec::with_capacity(addrs.len()); |
1918 | 0 | for addr in addrs { |
1919 | 0 | match Multiaddr::from_bytes(addr) { |
1920 | 0 | Ok(a) => valid_addrs.push(a), |
1921 | 0 | Err((error, addr)) => { |
1922 | 0 | inner.log_callback.log( |
1923 | 0 | LogLevel::Debug, |
1924 | 0 | format!( |
1925 | 0 | "discovery-invalid-address; error={error}, addr={}, discovered_from={kademlia_request_target}", |
1926 | 0 | hex::encode(&addr) |
1927 | 0 | ), |
1928 | 0 | ); |
1929 | 0 | continue; |
1930 | | } |
1931 | | } |
1932 | | } |
1933 | | |
1934 | 0 | if !valid_addrs.is_empty() { |
1935 | | // Note that we must call this function before `insert_address`, |
1936 | | // as documented in `basic_peering_strategy`. |
1937 | | if let basic_peering_strategy::InsertChainPeerResult::Inserted { |
1938 | 0 | peer_removed: Some(peer_removed), |
1939 | 0 | } = inner.peering_strategy.insert_chain_peer( |
1940 | 0 | chain_id, |
1941 | 0 | peer_id.clone(), |
1942 | 0 | 100, // TODO: constant |
1943 | 0 | ) { |
1944 | 0 | inner.log_callback.log( |
1945 | 0 | LogLevel::Debug, |
1946 | 0 | format!( |
1947 | 0 | "peer-forgotten; peer_id={}; chain={}", |
1948 | 0 | peer_removed, inner.network[chain_id].log_name |
1949 | 0 | ), |
1950 | 0 | ); |
1951 | 0 | } |
1952 | 0 | } |
1953 | | |
1954 | 0 | for addr in valid_addrs { |
1955 | 0 | inner.log_callback.log( |
1956 | 0 | LogLevel::Debug, |
1957 | 0 | format!( |
1958 | 0 | "discovered; chain={}; peer_id={peer_id}; address={addr}; discovered_from={kademlia_request_target}", |
1959 | 0 | inner.network[chain_id].log_name |
1960 | 0 | ), |
1961 | 0 | ); |
1962 | 0 |
1963 | 0 | match inner |
1964 | 0 | .peering_strategy |
1965 | 0 | .insert_address(&peer_id, addr.into_bytes(), 10) // TODO: constant |
1966 | | { |
1967 | 0 | basic_peering_strategy::InsertAddressResult::Inserted { address_removed: Some(addr_rm) } => { |
1968 | 0 | let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap(); |
1969 | 0 | inner |
1970 | 0 | .log_callback |
1971 | 0 | .log(LogLevel::Debug, format!("address-purged; peer_id={}; address={}", peer_id, addr_rm)); |
1972 | 0 | } |
1973 | 0 | basic_peering_strategy::InsertAddressResult::UnknownPeer => unreachable!(), |
1974 | 0 | _ => {} |
1975 | | } |
1976 | | } |
1977 | | } |
1978 | | } |
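Ordering matters in the arm above: as the comment notes, the peer must be registered for the chain (`insert_chain_peer`) before any of its addresses are inserted (`insert_address`). A self-contained sketch of the validate-then-insert flow, with stand-in registration functions in place of the real `basic_peering_strategy` calls:

    // Addresses that fail to decode are logged and skipped; the peer is
    // registered first, then its surviving addresses.
    fn process_discovered_peer(peer_id: &str, raw_addrs: Vec<Vec<u8>>) {
        let valid_addrs: Vec<String> = raw_addrs
            .into_iter()
            .filter_map(|raw| match String::from_utf8(raw) {
                Ok(addr) => Some(addr),
                Err(error) => {
                    eprintln!("discovery-invalid-address; error={error}; peer_id={peer_id}");
                    None
                }
            })
            .collect();

        if !valid_addrs.is_empty() {
            // Must happen before the address insertions, as documented in
            // `basic_peering_strategy`.
            register_chain_peer(peer_id);
        }

        for addr in valid_addrs {
            register_address(peer_id, &addr);
        }
    }

    // Hypothetical stand-ins for `insert_chain_peer` / `insert_address`.
    fn register_chain_peer(peer_id: &str) {
        println!("peer-registered; peer_id={peer_id}");
    }

    fn register_address(peer_id: &str, addr: &str) {
        println!("address-registered; peer_id={peer_id}; address={addr}");
    }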
1979 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { |
1980 | 0 | peer_id, |
1981 | 0 | chain_id, |
1982 | 0 | response: service::RequestResult::KademliaFindNode(Err(error)), |
1983 | 0 | .. |
1984 | 0 | }) => { |
1985 | 0 | inner.log_callback.log( |
1986 | 0 | LogLevel::Debug, |
1987 | 0 | format!( |
1988 | 0 | "discovery-error; chain={}; peer_id={peer_id}; error={error}", |
1989 | 0 | inner.network[chain_id].log_name |
1990 | 0 | ), |
1991 | 0 | ); |
1992 | 0 | } |
1993 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => { |
1994 | | // We never start a request of any other kind. |
1995 | 0 | unreachable!() |
1996 | | } |
1997 | | WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => { |
1998 | | // Requests are answered immediately, and thus cancelling events can't happen. |
1999 | 0 | unreachable!() |
2000 | | } |
2001 | | WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn { |
2002 | 0 | peer_id, |
2003 | 0 | substream_id, |
2004 | 0 | }) => { |
2005 | 0 | inner.log_callback.log( |
2006 | 0 | LogLevel::Debug, |
2007 | 0 | format!("identify-request; peer_id={}", peer_id), |
2008 | 0 | ); |
2009 | 0 | inner |
2010 | 0 | .network |
2011 | 0 | .respond_identify(substream_id, &inner.identify_agent_version); |
2012 | 0 | } |
2013 | | WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn { |
2014 | 0 | peer_id, |
2015 | 0 | chain_id, |
2016 | 0 | config, |
2017 | 0 | substream_id, |
2018 | 0 | }) => { |
2019 | 0 | inner.log_callback.log( |
2020 | 0 | LogLevel::Debug, |
2021 | 0 | format!( |
2022 | 0 | "incoming-blocks-request; peer_id={}; chain={}", |
2023 | 0 | peer_id, inner.network[chain_id].log_name |
2024 | 0 | ), |
2025 | 0 | ); |
2026 | 0 | let mut _jaeger_span = inner.jaeger_service.incoming_block_request_span( |
2027 | 0 | &inner.local_peer_id, |
2028 | 0 | &peer_id, |
2029 | 0 | config.desired_count.get(), |
2030 | 0 | if let (1, codec::BlocksRequestConfigStart::Hash(block_hash)) = |
2031 | 0 | (config.desired_count.get(), &config.start) |
2032 | | { |
2033 | 0 | Some(block_hash) |
2034 | | } else { |
2035 | 0 | None |
2036 | | }, |
2037 | | ); |
2038 | | |
2039 | | // TODO: is it a good idea to await here while the lock is held, freezing the entire networking background task?
2040 | 0 | let response = blocks_request_response( |
2041 | 0 | &inner.network[chain_id].database, |
2042 | 0 | inner.network.block_number_bytes(chain_id), |
2043 | 0 | config, |
2044 | 0 | ) |
2045 | 0 | .await; |
2046 | 0 | inner.network.respond_blocks( |
2047 | 0 | substream_id, |
2048 | 0 | match response { |
2049 | 0 | Ok(b) => Some(b), |
2050 | 0 | Err(error) => { |
2051 | 0 | inner.log_callback.log( |
2052 | 0 | LogLevel::Warn, |
2053 | 0 | format!("incoming-blocks-request-error; error={}", error), |
2054 | 0 | ); |
2055 | 0 | None |
2056 | | } |
2057 | | }, |
2058 | | ); |
2059 | | } |
2060 | | WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket { |
2061 | 0 | chain_id, |
2062 | 0 | peer_id, |
2063 | 0 | state, |
2064 | 0 | }) => { |
2065 | 0 | inner.log_callback.log(LogLevel::Debug, format!( |
2066 | 0 | "grandpa-neighbor-packet; peer_id={}; chain={}; round_number={}; set_id={}; commit_finalized_height={}", |
2067 | 0 | peer_id, |
2068 | 0 | inner.network[chain_id].log_name, |
2069 | 0 | state.round_number, |
2070 | 0 | state.set_id, |
2071 | 0 | state.commit_finalized_height, |
2072 | 0 | )); |
2073 | 0 |
2074 | 0 | debug_assert!(inner.event_pending_send.is_none()); |
2075 | 0 | inner.event_pending_send = Some(Event::GrandpaNeighborPacket { |
2076 | 0 | chain_id, |
2077 | 0 | peer_id, |
2078 | 0 | finalized_block_height: state.commit_finalized_height, |
2079 | 0 | }); |
2080 | | } |
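The `debug_assert!` plus `event_pending_send = Some(...)` pair seen here (and in the other event-producing arms) implements a one-element buffer: the wake-up logic only polls the network for a new event while the slot is empty and the event senders are idle, so a buffered event can never be overwritten. A minimal sketch of that slot:

    // At most one event is buffered. `push` is only called after the event
    // loop has verified the slot is empty; `take` drains it once the event
    // senders are ready to dispatch again.
    struct EventSlot<E> {
        pending: Option<E>,
    }

    impl<E> EventSlot<E> {
        fn new() -> Self {
            EventSlot { pending: None }
        }

        fn push(&mut self, event: E) {
            debug_assert!(self.pending.is_none());
            self.pending = Some(event);
        }

        fn take(&mut self) -> Option<E> {
            self.pending.take()
        }
    }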
2081 | | WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage { |
2082 | 0 | chain_id, |
2083 | 0 | peer_id, |
2084 | 0 | message, |
2085 | 0 | }) => { |
2086 | 0 | inner.log_callback.log( |
2087 | 0 | LogLevel::Debug, |
2088 | 0 | format!( |
2089 | 0 | "grandpa-commit-message; peer_id={}; chain={}; target_hash={}", |
2090 | 0 | peer_id, |
2091 | 0 | inner.network[chain_id].log_name, |
2092 | 0 | HashDisplay(message.decode().target_hash), |
2093 | 0 | ), |
2094 | 0 | ); |
2095 | 0 | } |
2096 | 0 | WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => { |
2097 | 0 | inner.log_callback.log( |
2098 | 0 | LogLevel::Warn, |
2099 | 0 | format!("protocol-error; peer_id={}; error={}", peer_id, error), |
2100 | 0 | ); |
2101 | 0 | inner |
2102 | 0 | .peering_strategy |
2103 | 0 | .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(5)); |
2104 | 0 | // TODO: log chain names? |
2105 | 0 | inner.log_callback.log( |
2106 | 0 | LogLevel::Debug, |
2107 | 0 | format!( |
2108 | 0 | "all-slots-unassigned; reason=no-address; peer_id={}", |
2109 | 0 | peer_id |
2110 | 0 | ), |
2111 | 0 | ); |
2112 | 0 | } |
2113 | | |
2114 | 0 | WakeUpReason::CanAssignSlot(peer_id, chain_id) => { |
2115 | 0 | inner.peering_strategy.assign_slot(&chain_id, &peer_id); |
2116 | 0 |
2117 | 0 | inner.log_callback.log( |
2118 | 0 | LogLevel::Debug, |
2119 | 0 | format!( |
2120 | 0 | "slot-assigned; peer_id={}; chain={}", |
2121 | 0 | peer_id, inner.network[chain_id].log_name |
2122 | 0 | ), |
2123 | 0 | ); |
2124 | 0 |
2125 | 0 | inner.network.gossip_insert_desired( |
2126 | 0 | chain_id, |
2127 | 0 | peer_id, |
2128 | 0 | service::GossipKind::ConsensusTransactions, |
2129 | 0 | ); |
2130 | 0 | } |
2131 | | |
2132 | 0 | WakeUpReason::CanStartConnect(peer_id) => { |
2133 | 0 | inner.num_pending_out_attempts += 1; |
2134 | | |
2135 | 0 | let Some(multiaddr) = inner |
2136 | 0 | .peering_strategy |
2137 | 0 | .pick_address_and_add_connection(&peer_id) |
2138 | | else { |
2139 | | // There is no address for that peer in the address book. |
2140 | 0 | inner.network.gossip_remove_desired_all( |
2141 | 0 | &peer_id, |
2142 | 0 | service::GossipKind::ConsensusTransactions, |
2143 | 0 | ); |
2144 | 0 | for (chain_id, what_happened) in inner |
2145 | 0 | .peering_strategy |
2146 | 0 | .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(10)) |
2147 | | { |
2148 | 0 | if matches!( |
2149 | 0 | what_happened, |
2150 | | basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true } |
2151 | 0 | ) { |
2152 | 0 | inner.log_callback.log( |
2153 | 0 | LogLevel::Debug, |
2154 | 0 | format!( |
2155 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=no-address", |
2156 | 0 | peer_id, inner.network[*chain_id].log_name |
2157 | 0 | ), |
2158 | 0 | ); |
2159 | 0 | } |
2160 | | } |
2161 | 0 | continue; |
2162 | | }; |
2163 | | |
2164 | 0 | let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) { |
2165 | 0 | Ok(a) => a, |
2166 | 0 | Err((multiaddr::FromBytesError, multiaddr)) => { |
2167 | 0 | // Address is in an invalid format. |
2168 | 0 | inner.log_callback.log( |
2169 | 0 | LogLevel::Debug, |
2170 | 0 | format!( |
2171 | 0 | "invalid-address; peer_id={}; address={:?}", |
2172 | 0 | peer_id, multiaddr |
2173 | 0 | ), |
2174 | 0 | ); |
2175 | 0 | let _was_in = inner |
2176 | 0 | .peering_strategy |
2177 | 0 | .decrease_address_connections_and_remove_if_zero(&peer_id, &multiaddr); |
2178 | 0 | debug_assert!(_was_in.is_ok()); |
2179 | 0 | continue; |
2180 | | } |
2181 | | }; |
2182 | | |
2183 | | // Convert the `multiaddr` (typically of the form `/ip4/a.b.c.d/tcp/p`) into
2184 | | // a `dyn Future<Output = Result<TcpStream, ...>>`.
2185 | 0 | let socket = match tasks::multiaddr_to_socket(&multiaddr) { |
2186 | 0 | Ok(socket) => socket, |
2187 | | Err(_) => { |
2188 | | // Address is in an invalid format or isn't supported. |
2189 | 0 | inner.log_callback.log( |
2190 | 0 | LogLevel::Debug, |
2191 | 0 | format!( |
2192 | 0 | "invalid-address; peer_id={}; address={}", |
2193 | 0 | peer_id, multiaddr |
2194 | 0 | ), |
2195 | 0 | ); |
2196 | 0 | let _was_in = inner |
2197 | 0 | .peering_strategy |
2198 | 0 | .decrease_address_connections_and_remove_if_zero( |
2199 | 0 | &peer_id, |
2200 | 0 | multiaddr.as_ref(), |
2201 | 0 | ); |
2202 | 0 | debug_assert!(_was_in.is_ok()); |
2203 | 0 | continue; |
2204 | | } |
2205 | | }; |
2206 | | |
2207 | 0 | inner.log_callback.log( |
2208 | 0 | LogLevel::Debug, |
2209 | 0 | format!("start-connecting; peer_id={peer_id}; address={multiaddr}"), |
2210 | 0 | ); |
2211 | 0 |
2212 | 0 | let (tx, rx) = channel::bounded(16); // TODO: ?! |
2213 | 0 |
2214 | 0 | let (connection_id, connection_task) = inner.network.add_single_stream_connection( |
2215 | 0 | Instant::now(), |
2216 | 0 | service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux { |
2217 | 0 | is_initiator: true, |
2218 | 0 | noise_key: &inner.noise_key, |
2219 | 0 | }, |
2220 | 0 | multiaddr.clone().into_bytes(), |
2221 | 0 | Some(peer_id.clone()), |
2222 | 0 | tx, |
2223 | 0 | ); |
2224 | 0 |
2225 | 0 | // Handle the connection in a separate task. |
2226 | 0 | (inner.tasks_executor)(Box::pin(tasks::connection_task( |
2227 | 0 | inner.log_callback.clone(), |
2228 | 0 | multiaddr.to_string(), |
2229 | 0 | socket, |
2230 | 0 | connection_id, |
2231 | 0 | connection_task, |
2232 | 0 | rx, |
2233 | 0 | inner.from_connections_tx.clone(), |
2234 | 0 | ))); |
2235 | | } |
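As a rough illustration of what `tasks::multiaddr_to_socket` must do for the common `/ip4/<host>/tcp/<port>` case, here is a sketch using only `std`; the real function supports more protocols and returns a future that performs the dial rather than a parsed address:

    use std::net::{IpAddr, SocketAddr};

    // Parses `/ip4/<host>/tcp/<port>` into a `SocketAddr`. Any other shape
    // is rejected, mirroring the "invalid or unsupported address" branch.
    fn ip4_tcp_to_socket_addr(multiaddr: &str) -> Option<SocketAddr> {
        let mut parts = multiaddr.split('/').skip(1); // skip the leading '/'
        match (parts.next(), parts.next(), parts.next(), parts.next(), parts.next()) {
            (Some("ip4"), Some(host), Some("tcp"), Some(port), None) => {
                let ip: IpAddr = host.parse().ok()?;
                let port: u16 = port.parse().ok()?;
                Some(SocketAddr::new(ip, port))
            }
            _ => None,
        }
    }

For example, `ip4_tcp_to_socket_addr("/ip4/127.0.0.1/tcp/30333")` yields `Some(127.0.0.1:30333)`, while a `/dns` or `/ws` multiaddr falls through to `None`.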
2236 | | |
2237 | 0 | WakeUpReason::CanOpenGossip(peer_id, chain_id) => { |
2238 | 0 | inner |
2239 | 0 | .network |
2240 | 0 | .gossip_open( |
2241 | 0 | chain_id, |
2242 | 0 | &peer_id, |
2243 | 0 | service::GossipKind::ConsensusTransactions, |
2244 | 0 | ) |
2245 | 0 | .unwrap(); |
2246 | 0 |
2247 | 0 | inner.log_callback.log( |
2248 | 0 | LogLevel::Debug, |
2249 | 0 | format!( |
2250 | 0 | "gossip-open; peer_id={}; chain={}", |
2251 | 0 | peer_id, &inner.network[chain_id].log_name |
2252 | 0 | ), |
2253 | 0 | ); |
2254 | 0 | } |
2255 | | } |
2256 | | } |
2257 | 0 | } _RNCNvNtCsiUjFBJteJ7x_17smoldot_full_node15network_service15background_task0B5_ Line | Count | Source | 847 | 21 | async fn background_task(mut inner: Inner) { | 848 | | loop { | 849 | | enum WakeUpReason { | 850 | | IncomingConnection { | 851 | | socket: TcpStream, | 852 | | socket_addr: SocketAddr, | 853 | | }, | 854 | | NetworkEvent(service::Event<channel::Sender<service::CoordinatorToConnection>>), | 855 | | Message(ToBackground), | 856 | | ForegroundClosed, | 857 | | FromConnectionTask { | 858 | | connection_id: service::ConnectionId, | 859 | | // TODO: this Option is weird | 860 | | message: Option<service::ConnectionToCoordinator>, | 861 | | }, | 862 | | EventSendersReady, | 863 | | CanAssignSlot(PeerId, ChainId), | 864 | | CanStartConnect(PeerId), | 865 | | CanOpenGossip(PeerId, ChainId), | 866 | | StartKademliaDiscoveries, | 867 | | MessageToConnection { | 868 | | connection_id: service::ConnectionId, | 869 | | message: service::CoordinatorToConnection, | 870 | | }, | 871 | | } | 872 | | | 873 | 126 | let wake_up_reason = async { | 874 | | inner | 875 | | .to_background_rx | 876 | | .next() | 877 | | .await | 878 | | .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Message) | 879 | | } | 880 | 147 | .or({ | 881 | 147 | let event_senders_ready = matches!0 (inner.event_senders, either::Left(_)); | 882 | 147 | let event_pending_send = &inner.event_pending_send; | 883 | 147 | let network = &mut inner.network; | 884 | 147 | let peering_strategy = &mut inner.peering_strategy; | 885 | 147 | let num_pending_out_attempts = &inner.num_pending_out_attempts; | 886 | | async move { | 887 | | if let Some(event) = (event_senders_ready && event_pending_send.is_none()) | 888 | | .then(|| network.next_event()) | 889 | | .flatten() | 890 | | { | 891 | | WakeUpReason::NetworkEvent(event) | 892 | | } else if let Some((connection_id, message)) = network.pull_message_to_connection() | 893 | | { | 894 | | WakeUpReason::MessageToConnection { | 895 | | connection_id, | 896 | | message, | 897 | | } | 898 | | } else if let Some((peer_id, chain_id)) = network | 899 | | .connected_unopened_gossip_desired() | 900 | | .next() | 901 | | .map(|(peer_id, chain_id, _)| (peer_id.clone(), chain_id)) | 902 | | { | 903 | | WakeUpReason::CanOpenGossip(peer_id, chain_id) | 904 | | } else if let Some(peer_id) = (*num_pending_out_attempts < 16) | 905 | | .then(|| network.unconnected_desired().next().cloned()) | 906 | | .flatten() | 907 | | { | 908 | | WakeUpReason::CanStartConnect(peer_id) | 909 | | } else { | 910 | | 'search: loop { | 911 | | let mut earlier_unban = None; | 912 | | | 913 | | for chain_id in network.chains().collect::<Vec<_>>() { | 914 | | if network.gossip_desired_num( | 915 | | chain_id, | 916 | | service::GossipKind::ConsensusTransactions, | 917 | | ) >= network[chain_id].max_slots | 918 | | { | 919 | | continue; | 920 | | } | 921 | | | 922 | | match peering_strategy.pick_assignable_peer(&chain_id, &Instant::now()) | 923 | | { | 924 | | basic_peering_strategy::AssignablePeer::Assignable(peer_id) => { | 925 | | break 'search WakeUpReason::CanAssignSlot( | 926 | | peer_id.clone(), | 927 | | chain_id, | 928 | | ) | 929 | | } | 930 | | basic_peering_strategy::AssignablePeer::AllPeersBanned { | 931 | | next_unban, | 932 | | } => { | 933 | | if earlier_unban.as_ref().map_or(true, |b| b > next_unban) { | 934 | | earlier_unban = Some(next_unban.clone()); | 935 | | } | 936 | | } | 937 | | basic_peering_strategy::AssignablePeer::NoPeer => continue, | 938 | | } | 939 | | } | 940 | | | 941 | | if 
let Some(earlier_unban) = earlier_unban { | 942 | | smol::Timer::at(earlier_unban).await; | 943 | | } else { | 944 | | future::pending::<()>().await; | 945 | | } | 946 | | } | 947 | | } | 948 | | } | 949 | | }) | 950 | 147 | .or(async { | 951 | | if let either::Right(sending) = &mut inner.event_senders { | 952 | | let event_senders = sending.await; | 953 | | inner.event_senders = either::Left(event_senders); | 954 | | WakeUpReason::EventSendersReady | 955 | | } else if inner.event_pending_send.is_some() { | 956 | | WakeUpReason::EventSendersReady | 957 | | } else { | 958 | | future::pending().await | 959 | | } | 960 | 147 | }) | 961 | 147 | .or(async { | 962 | | (&mut inner.next_discovery).await; | 963 | | inner.next_discovery = smol::Timer::after(inner.next_discovery_period); | 964 | | inner.next_discovery_period = | 965 | | cmp::min(inner.next_discovery_period * 2, Duration::from_secs(120)); | 966 | | WakeUpReason::StartKademliaDiscoveries | 967 | 147 | }) | 968 | 147 | .or(async { | 969 | | let (connection_id, message) = inner.from_connections_rx.next().await.unwrap(); | 970 | | WakeUpReason::FromConnectionTask { | 971 | | connection_id, | 972 | | message, | 973 | | } | 974 | 147 | }) | 975 | 147 | .or(async { | 976 | | let Some((socket, socket_addr)) = inner.incoming_connections.next().await else { | 977 | | future::pending().await | 978 | | }; | 979 | | WakeUpReason::IncomingConnection { | 980 | | socket, | 981 | | socket_addr, | 982 | | } | 983 | 147 | }) | 984 | 122 | .await; | 985 | | | 986 | 0 | match wake_up_reason { | 987 | | WakeUpReason::MessageToConnection { | 988 | 0 | connection_id, | 989 | 0 | message, | 990 | 0 | } => { | 991 | 0 | // Note that it is critical for the sending to not take too long here, in order to | 992 | 0 | // not block the process of the network service. | 993 | 0 | // In particular, if sending the message to the connection is blocked due to | 994 | 0 | // sending a message on the connection-to-coordinator channel, this will result | 995 | 0 | // in a deadlock. | 996 | 0 | // For this reason, the connection task is always ready to immediately accept a | 997 | 0 | // message on the coordinator-to-connection channel. | 998 | 0 | inner.network[connection_id].send(message).await.unwrap(); | 999 | | } | 1000 | | | 1001 | | WakeUpReason::FromConnectionTask { | 1002 | 0 | connection_id, | 1003 | 0 | message, | 1004 | | } => { | 1005 | 0 | if let Some(message) = message { | 1006 | 0 | inner | 1007 | 0 | .network | 1008 | 0 | .inject_connection_message(connection_id, message); | 1009 | 0 | } | 1010 | | } | 1011 | | | 1012 | | WakeUpReason::IncomingConnection { | 1013 | 0 | socket, | 1014 | 0 | socket_addr, | 1015 | 0 | } => { | 1016 | 0 | // The Nagle algorithm, implemented in the kernel, consists in buffering the | 1017 | 0 | // data to be sent out and waiting a bit before actually sending it out, in | 1018 | 0 | // order to potentially merge multiple writes in a row into one packet. In | 1019 | 0 | // the implementation below, it is guaranteed that the buffer in `WithBuffers` | 1020 | 0 | // is filled with as much data as possible before the operating system gets | 1021 | 0 | // involved. As such, we disable the Nagle algorithm, in order to avoid adding | 1022 | 0 | // an artificial delay to all sends. 
| 1023 | 0 | let _ = socket.set_nodelay(true); | 1024 | | | 1025 | 0 | let multiaddr = [ | 1026 | 0 | match socket_addr.ip() { | 1027 | 0 | IpAddr::V4(ip) => Protocol::<&[u8]>::Ip4(ip.octets()), | 1028 | 0 | IpAddr::V6(ip) => Protocol::Ip6(ip.octets()), | 1029 | | }, | 1030 | 0 | Protocol::Tcp(socket_addr.port()), | 1031 | 0 | ] | 1032 | 0 | .into_iter() | 1033 | 0 | .collect::<Multiaddr>(); | 1034 | 0 |
| 1035 | 0 | inner.log_callback.log( | 1036 | 0 | LogLevel::Debug, | 1037 | 0 | format!("incoming-connection; multiaddr={}", multiaddr), | 1038 | 0 | ); | 1039 | 0 |
| 1040 | 0 | let (tx, rx) = channel::bounded(16); // TODO: ?! | 1041 | 0 |
| 1042 | 0 | let (connection_id, connection_task) = inner.network.add_single_stream_connection( | 1043 | 0 | Instant::now(), | 1044 | 0 | service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux { | 1045 | 0 | is_initiator: false, | 1046 | 0 | noise_key: &inner.noise_key, | 1047 | 0 | }, | 1048 | 0 | multiaddr.clone().into_bytes(), | 1049 | 0 | None, | 1050 | 0 | tx, | 1051 | 0 | ); | 1052 | 0 |
| 1053 | 0 | (inner.tasks_executor)(Box::pin(tasks::connection_task( | 1054 | 0 | inner.log_callback.clone(), | 1055 | 0 | multiaddr.to_string(), | 1056 | 0 | async move { Ok(socket) }, | 1057 | 0 | connection_id, | 1058 | 0 | connection_task, | 1059 | 0 | rx, | 1060 | 0 | inner.from_connections_tx.clone(), | 1061 | 0 | ))); | 1062 | | } | 1063 | | | 1064 | | WakeUpReason::StartKademliaDiscoveries => { | 1065 | 104 | for chain_id in inner.network.chains().collect::<Vec<_>>() { | 1066 | 104 | let random_peer_id = | 1067 | 104 | PeerId::from_public_key(&peer_id::PublicKey::Ed25519(rand::random())); | 1068 | 104 | | 1069 | 104 | // TODO: select target closest to the random peer instead | 1070 | 104 | let target = inner | 1071 | 104 | .network | 1072 | 104 | .gossip_connected_peers( | 1073 | 104 | chain_id, | 1074 | 104 | service::GossipKind::ConsensusTransactions, | 1075 | 104 | ) | 1076 | 104 | .next() | 1077 | 104 | .cloned(); | 1078 | | | 1079 | 104 | if let Some(target0 ) = target { | 1080 | 0 | match inner.network.start_kademlia_find_node_request( | 1081 | 0 | &target, | 1082 | 0 | chain_id, | 1083 | 0 | &random_peer_id, | 1084 | 0 | Duration::from_secs(20), | 1085 | 0 | ) { | 1086 | 0 | Ok(_) => {} | 1087 | 0 | Err(service::StartRequestError::NoConnection) => unreachable!(), | 1088 | | }; | 1089 | 104 | } else { | 1090 | 104 | // TODO: log message | 1091 | 104 | } | 1092 | | } | 1093 | | } | 1094 | | | 1095 | | WakeUpReason::ForegroundClosed => { | 1096 | | // TODO: do a clean shutdown of all the connections | 1097 | 0 | return; | 1098 | | } | 1099 | | | 1100 | | WakeUpReason::Message(ToBackground::ForegroundDisconnectAndBan { | 1101 | 0 | peer_id, | 1102 | 0 | chain_id, | 1103 | 0 | severity, | 1104 | 0 | reason, | 1105 | 0 | }) => { | 1106 | 0 | // Note that peer doesn't necessarily have an out slot. | 1107 | 0 | inner.peering_strategy.unassign_slot_and_ban( | 1108 | 0 | &chain_id, | 1109 | 0 | &peer_id, | 1110 | 0 | Instant::now() | 1111 | 0 | + Duration::from_secs(match severity { | 1112 | 0 | BanSeverity::Low => 10, | 1113 | 0 | BanSeverity::High => 40, | 1114 | | }), | 1115 | | ); | 1116 | 0 | if inner.network.gossip_remove_desired( | 1117 | 0 | chain_id, | 1118 | 0 | &peer_id, | 1119 | 0 | service::GossipKind::ConsensusTransactions, | 1120 | 0 | ) { | 1121 | 0 | inner.log_callback.log( | 1122 | 0 | LogLevel::Debug, | 1123 | 0 | format!( | 1124 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=user-ban; user-reason={}", | 1125 | 0 | peer_id, inner.network[chain_id].log_name, reason | 1126 | 0 | ), | 1127 | 0 | ); | 1128 | 0 | } | 1129 | | | 1130 | 0 | if inner.network.gossip_is_connected( | 1131 | 0 | chain_id, | 1132 | 0 | &peer_id, | 1133 | 0 | service::GossipKind::ConsensusTransactions, | 1134 | 0 | ) { | 1135 | 0 | let _close_result = inner.network.gossip_close( | 1136 | 0 | chain_id, | 1137 | 0 | &peer_id, | 1138 | 0 | service::GossipKind::ConsensusTransactions, | 1139 | 0 | ); | 1140 | 0 | debug_assert!(_close_result.is_ok()); | 1141 | | | 1142 | 0 | inner.log_callback.log( | 1143 | 0 | LogLevel::Debug, | 1144 | 0 | format!( | 1145 | 0 | "chain-disconnected; peer_id={}; chain={}", | 1146 | 0 | peer_id, inner.network[chain_id].log_name | 1147 | 0 | ), | 1148 | 0 | ); | 1149 | 0 |
| 1150 | 0 | debug_assert!(inner.event_pending_send.is_none()); | 1151 | 0 | inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id }); | 1152 | 0 | } | 1153 | | } | 1154 | | | 1155 | | WakeUpReason::Message(ToBackground::ForegroundAnnounceBlock { | 1156 | 0 | target, | 1157 | 0 | chain_id, | 1158 | 0 | scale_encoded_header, | 1159 | 0 | is_best, | 1160 | 0 | result_tx, | 1161 | 0 | }) => { | 1162 | 0 | let _ = result_tx.send(inner.network.gossip_send_block_announce( | 1163 | 0 | &target, | 1164 | 0 | chain_id, | 1165 | 0 | &scale_encoded_header, | 1166 | 0 | is_best, | 1167 | 0 | )); | 1168 | 0 | } | 1169 | | WakeUpReason::Message(ToBackground::ForegroundSetLocalBestBlock { | 1170 | 21 | chain_id, | 1171 | 21 | best_hash, | 1172 | 21 | best_number, | 1173 | 21 | }) => { | 1174 | 21 | inner | 1175 | 21 | .network | 1176 | 21 | .set_chain_local_best_block(chain_id, best_hash, best_number); | 1177 | 21 | } | 1178 | | WakeUpReason::Message(ToBackground::ForegroundBlocksRequest { | 1179 | 0 | target, | 1180 | 0 | chain_id, | 1181 | 0 | config, | 1182 | 0 | result_tx, | 1183 | 0 | }) => { | 1184 | 0 | inner.log_callback.log( | 1185 | 0 | LogLevel::Debug, | 1186 | 0 | format!( | 1187 | 0 | "blocks-request-start; peer_id={}; chain={}; start={}; desired_count={}; direction={}", | 1188 | 0 | target, | 1189 | 0 | inner.network[chain_id].log_name, | 1190 | 0 | match &config.start { | 1191 | 0 | codec::BlocksRequestConfigStart::Hash(h) => either::Left(HashDisplay(h)), | 1192 | 0 | codec::BlocksRequestConfigStart::Number(n) => either::Right(n), | 1193 | | }, | 1194 | | config.desired_count, | 1195 | 0 | match config.direction { | 1196 | 0 | codec::BlocksRequestDirection::Ascending => "ascending", | 1197 | 0 | codec::BlocksRequestDirection::Descending => "descending", | 1198 | | }, | 1199 | | ), | 1200 | | ); | 1201 | | | 1202 | 0 | match inner.network.start_blocks_request( | 1203 | 0 | &target, | 1204 | 0 | chain_id, | 1205 | 0 | config, | 1206 | 0 | Duration::from_secs(12), | 1207 | 0 | ) { | 1208 | 0 | Ok(request_id) => { | 1209 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? | 1210 | 0 | inner.blocks_requests.insert(request_id, result_tx); | 1211 | 0 | } | 1212 | 0 | Err(service::StartRequestError::NoConnection) => { | 1213 | 0 | inner.log_callback.log( | 1214 | 0 | LogLevel::Debug, | 1215 | 0 | format!( | 1216 | 0 | "blocks-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", | 1217 | 0 | target, | 1218 | 0 | inner.network[chain_id].log_name, | 1219 | 0 | ), | 1220 | 0 | ); | 1221 | 0 | let _ = result_tx.send(Err(BlocksRequestError::NoConnection)); | 1222 | 0 | } | 1223 | | } | 1224 | | } | 1225 | | WakeUpReason::Message(ToBackground::ForegroundWarpSyncRequest { | 1226 | 0 | target, | 1227 | 0 | chain_id, | 1228 | 0 | begin_hash, | 1229 | 0 | result_tx, | 1230 | 0 | }) => { | 1231 | 0 | inner.log_callback.log( | 1232 | 0 | LogLevel::Debug, | 1233 | 0 | format!( | 1234 | 0 | "warp-sync-request-start; peer_id={}; chain={}; begin-hash={}", | 1235 | 0 | target, | 1236 | 0 | inner.network[chain_id].log_name, | 1237 | 0 | HashDisplay(&begin_hash) | 1238 | 0 | ), | 1239 | 0 | ); | 1240 | 0 |
| 1241 | 0 | match inner.network.start_grandpa_warp_sync_request( | 1242 | 0 | &target, | 1243 | 0 | chain_id, | 1244 | 0 | begin_hash, | 1245 | 0 | Duration::from_secs(12), | 1246 | 0 | ) { | 1247 | 0 | Ok(request_id) => { | 1248 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? | 1249 | 0 | inner.warp_sync_requests.insert(request_id, result_tx); | 1250 | 0 | } | 1251 | 0 | Err(service::StartRequestError::NoConnection) => { | 1252 | 0 | inner.log_callback.log( | 1253 | 0 | LogLevel::Debug, | 1254 | 0 | format!( | 1255 | 0 | "warp-sync-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", | 1256 | 0 | target, | 1257 | 0 | inner.network[chain_id].log_name, | 1258 | 0 | ), | 1259 | 0 | ); | 1260 | 0 | let _ = result_tx.send(Err(WarpSyncRequestError::NoConnection)); | 1261 | 0 | } | 1262 | | } | 1263 | | } | 1264 | | WakeUpReason::Message(ToBackground::ForegroundStorageProofRequest { | 1265 | 0 | target, | 1266 | 0 | chain_id, | 1267 | 0 | config, | 1268 | 0 | result_tx, | 1269 | 0 | }) => { | 1270 | 0 | inner.log_callback.log( | 1271 | 0 | LogLevel::Debug, | 1272 | 0 | format!( | 1273 | 0 | "storage-request-start; peer_id={}; chain={}; block-hash={}; num-keys={}", | 1274 | 0 | target, | 1275 | 0 | inner.network[chain_id].log_name, | 1276 | 0 | HashDisplay(&config.block_hash), | 1277 | 0 | config.keys.len() | 1278 | 0 | ), | 1279 | 0 | ); | 1280 | 0 |
| 1281 | 0 | match inner.network.start_storage_proof_request( | 1282 | 0 | &target, | 1283 | 0 | chain_id, | 1284 | 0 | config, | 1285 | 0 | Duration::from_secs(12), | 1286 | 0 | ) { | 1287 | 0 | Ok(request_id) => { | 1288 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? | 1289 | 0 | inner.storage_requests.insert(request_id, result_tx); | 1290 | 0 | } | 1291 | 0 | Err(service::StartRequestMaybeTooLargeError::NoConnection) => { | 1292 | 0 | inner.log_callback.log( | 1293 | 0 | LogLevel::Debug, | 1294 | 0 | format!( | 1295 | 0 | "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", | 1296 | 0 | target, | 1297 | 0 | inner.network[chain_id].log_name, | 1298 | 0 | ), | 1299 | 0 | ); | 1300 | 0 | let _ = result_tx.send(Err(())); | 1301 | 0 | } | 1302 | 0 | Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => { | 1303 | 0 | inner.log_callback.log( | 1304 | 0 | LogLevel::Debug, | 1305 | 0 | format!( | 1306 | 0 | "storage-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large", | 1307 | 0 | target, | 1308 | 0 | inner.network[chain_id].log_name, | 1309 | 0 | ), | 1310 | 0 | ); | 1311 | 0 | let _ = result_tx.send(Err(())); | 1312 | 0 | } | 1313 | | } | 1314 | | } | 1315 | | WakeUpReason::Message(ToBackground::ForegroundCallProofRequest { | 1316 | 0 | target, | 1317 | 0 | chain_id, | 1318 | 0 | config, | 1319 | 0 | result_tx, | 1320 | 0 | }) => { | 1321 | 0 | inner.log_callback.log( | 1322 | 0 | LogLevel::Debug, | 1323 | 0 | format!( | 1324 | 0 | "call-proof-request-start; peer_id={}; chain={}; block-hash={}; function={}", | 1325 | 0 | target, | 1326 | 0 | inner.network[chain_id].log_name, | 1327 | 0 | HashDisplay(&config.block_hash), | 1328 | 0 | config.method | 1329 | 0 | ), | 1330 | 0 | ); | 1331 | 0 |
| 1332 | 0 | match inner.network.start_call_proof_request( | 1333 | 0 | &target, | 1334 | 0 | chain_id, | 1335 | 0 | config, | 1336 | 0 | Duration::from_secs(12), | 1337 | 0 | ) { | 1338 | 0 | Ok(request_id) => { | 1339 | 0 | // TODO: somehow cancel the request if the `rx` is dropped? | 1340 | 0 | inner.call_proof_requests.insert(request_id, result_tx); | 1341 | 0 | } | 1342 | 0 | Err(service::StartRequestMaybeTooLargeError::NoConnection) => { | 1343 | 0 | inner.log_callback.log( | 1344 | 0 | LogLevel::Debug, | 1345 | 0 | format!( | 1346 | 0 | "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=no-connection", | 1347 | 0 | target, | 1348 | 0 | inner.network[chain_id].log_name, | 1349 | 0 | ), | 1350 | 0 | ); | 1351 | 0 | let _ = result_tx.send(Err(())); | 1352 | 0 | } | 1353 | 0 | Err(service::StartRequestMaybeTooLargeError::RequestTooLarge) => { | 1354 | 0 | inner.log_callback.log( | 1355 | 0 | LogLevel::Debug, | 1356 | 0 | format!( | 1357 | 0 | "call-proof-request-ended; peer_id={}; chain={}; outcome=failure; error=request-too-large", | 1358 | 0 | target, | 1359 | 0 | inner.network[chain_id].log_name, | 1360 | 0 | ), | 1361 | 0 | ); | 1362 | 0 | let _ = result_tx.send(Err(())); | 1363 | 0 | } | 1364 | | } | 1365 | | } | 1366 | 0 | WakeUpReason::Message(ToBackground::ForegroundGetNumConnections { result_tx }) => { | 1367 | 0 | let _ = result_tx.send(inner.network.num_connections()); | 1368 | 0 | } | 1369 | | WakeUpReason::Message(ToBackground::ForegroundGetNumPeers { | 1370 | 1 | chain_id, | 1371 | 1 | result_tx, | 1372 | 1 | }) => { | 1373 | 1 | // TODO: optimize? | 1374 | 1 | let _ = result_tx.send( | 1375 | 1 | inner | 1376 | 1 | .network | 1377 | 1 | .gossip_connected_peers( | 1378 | 1 | chain_id, | 1379 | 1 | service::GossipKind::ConsensusTransactions, | 1380 | 1 | ) | 1381 | 1 | .count(), | 1382 | 1 | ); | 1383 | 1 | } | 1384 | 0 | WakeUpReason::Message(ToBackground::ForegroundGetNumTotalPeers { result_tx }) => { | 1385 | 0 | // TODO: optimize? | 1386 | 0 | let total = inner | 1387 | 0 | .network | 1388 | 0 | .chains() | 1389 | 0 | .map(|chain_id| { | 1390 | | inner | 1391 | | .network | 1392 | | .gossip_connected_peers( | 1393 | | chain_id, | 1394 | | service::GossipKind::ConsensusTransactions, | 1395 | | ) | 1396 | | .count() | 1397 | 0 | }) | 1398 | 0 | .sum(); | 1399 | 0 | let _ = result_tx.send(total); | 1400 | 0 | } | 1401 | | | 1402 | | WakeUpReason::EventSendersReady => { | 1403 | | // Dispatch the pending event, if any, to the various senders. | 1404 | | | 1405 | | // We made sure that the senders were ready before generating an event. | 1406 | 0 | let either::Left(event_senders) = &mut inner.event_senders else { | 1407 | 0 | unreachable!() | 1408 | | }; | 1409 | | | 1410 | 0 | if let Some(event_to_dispatch) = inner.event_pending_send.take() { | 1411 | 0 | let mut event_senders = mem::take(event_senders); | 1412 | 0 | inner.event_senders = either::Right(Box::pin(async move { | 1413 | | // Elements in `event_senders` are removed one by one and inserted | 1414 | | // back if the channel is still open. 
| 1415 | | for index in (0..event_senders.len()).rev() { | 1416 | | let event_sender = event_senders.swap_remove(index); | 1417 | | if event_sender.send(event_to_dispatch.clone()).await.is_err() { | 1418 | | continue; | 1419 | | } | 1420 | | | 1421 | | event_senders.push(event_sender); | 1422 | | } | 1423 | | event_senders | 1424 | 0 | })); | 1425 | 0 | } | 1426 | | } | 1427 | | | 1428 | | WakeUpReason::NetworkEvent(service::Event::HandshakeFinished { | 1429 | 0 | id, | 1430 | 0 | expected_peer_id, | 1431 | 0 | peer_id, | 1432 | 0 | .. | 1433 | 0 | }) => { | 1434 | 0 | inner.num_pending_out_attempts -= 1; | 1435 | 0 |
| 1436 | 0 | let remote_addr = | 1437 | 0 | Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned()) | 1438 | 0 | .unwrap(); // TODO: review this unwrap | 1439 | 0 | if let Some(expected_peer_id) = expected_peer_id.as_ref().filter(|p| **p != peer_id) | 1440 | | { | 1441 | 0 | inner | 1442 | 0 | .log_callback | 1443 | 0 | .log(LogLevel::Debug, format!("connected-peer-id-mismatch; expected_peer_id={}; actual_peer_id={}; address={}", expected_peer_id, peer_id, remote_addr)); | 1444 | 0 |
| 1445 | 0 | let _was_in = inner | 1446 | 0 | .peering_strategy | 1447 | 0 | .decrease_address_connections_and_remove_if_zero( | 1448 | 0 | expected_peer_id, | 1449 | 0 | remote_addr.as_ref(), | 1450 | 0 | ); | 1451 | 0 | debug_assert!(_was_in.is_ok()); | 1452 | | if let basic_peering_strategy::InsertAddressConnectionsResult::Inserted { | 1453 | 0 | address_removed: Some(addr_rm), | 1454 | 0 | } = inner.peering_strategy.increase_address_connections( | 1455 | 0 | &peer_id, | 1456 | 0 | remote_addr.into_bytes().to_owned(), | 1457 | 0 | 10, // TODO: constant | 1458 | 0 | ) { | 1459 | 0 | let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap(); | 1460 | 0 | inner.log_callback.log( | 1461 | 0 | LogLevel::Debug, | 1462 | 0 | format!("address-purged; peer_id={}; address={}", peer_id, addr_rm), | 1463 | 0 | ); | 1464 | 0 | } | 1465 | 0 | } else { | 1466 | 0 | inner | 1467 | 0 | .log_callback | 1468 | 0 | .log(LogLevel::Debug, format!("connected; peer_id={}", peer_id)); | 1469 | 0 | } | 1470 | | } | 1471 | | | 1472 | | WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { | 1473 | | expected_peer_id: Some(_), | 1474 | | .. | 1475 | | }) | 1476 | | | WakeUpReason::NetworkEvent(service::Event::Disconnected { .. }) => { | 1477 | 0 | let (address, peer_id, handshake_finished) = match wake_up_reason { | 1478 | | WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { | 1479 | 0 | address, | 1480 | 0 | expected_peer_id: Some(peer_id), | 1481 | 0 | .. | 1482 | 0 | }) => (address, peer_id, false), | 1483 | | WakeUpReason::NetworkEvent(service::Event::Disconnected { | 1484 | 0 | address, | 1485 | 0 | peer_id, | 1486 | 0 | .. | 1487 | 0 | }) => (address, peer_id, true), | 1488 | 0 | _ => unreachable!(), | 1489 | | }; | 1490 | | | 1491 | 0 | if !handshake_finished { | 1492 | 0 | inner.num_pending_out_attempts -= 1; | 1493 | 0 | } | 1494 | | | 1495 | 0 | inner | 1496 | 0 | .peering_strategy | 1497 | 0 | .decrease_address_connections(&peer_id, &address) | 1498 | 0 | .unwrap(); | 1499 | 0 | let address = Multiaddr::from_bytes(&address).unwrap(); | 1500 | 0 | inner.log_callback.log( | 1501 | 0 | LogLevel::Debug, | 1502 | 0 | format!( | 1503 | 0 | "disconnected; handshake-finished={}; peer_id={}; address={}", | 1504 | 0 | handshake_finished, peer_id, address | 1505 | 0 | ), | 1506 | 0 | ); | 1507 | 0 |
| 1508 | 0 | // Ban the peer in order to avoid trying over and over again the same address(es). | 1509 | 0 | // Even if the handshake was finished, it is possible that the peer simply shuts | 1510 | 0 | // down connections immediately after it has been opened, hence the ban. | 1511 | 0 | // Due to race conditions and peerid mismatches, it is possible that there is | 1512 | 0 | // another existing connection or connection attempt with that same peer. However, | 1513 | 0 | // it is not possible to be sure that we will reach 0 connections or connection | 1514 | 0 | // attempts, and thus we ban the peer every time. | 1515 | 0 | let ban_duration = Duration::from_secs(5); | 1516 | 0 | inner.network.gossip_remove_desired_all( | 1517 | 0 | &peer_id, | 1518 | 0 | service::GossipKind::ConsensusTransactions, | 1519 | 0 | ); | 1520 | 0 | for (&chain_id, what_happened) in inner | 1521 | 0 | .peering_strategy | 1522 | 0 | .unassign_slots_and_ban(&peer_id, Instant::now() + ban_duration) | 1523 | | { | 1524 | 0 | if matches!( | 1525 | 0 | what_happened, | 1526 | | basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true } | 1527 | 0 | ) { | 1528 | 0 | inner.log_callback.log( | 1529 | 0 | LogLevel::Debug, | 1530 | 0 | format!( | 1531 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=disconnected", | 1532 | 0 | peer_id, inner.network[chain_id].log_name | 1533 | 0 | ), | 1534 | 0 | ); | 1535 | 0 | } | 1536 | | } | 1537 | | } | 1538 | | | 1539 | | WakeUpReason::NetworkEvent(service::Event::PreHandshakeDisconnected { | 1540 | | expected_peer_id: None, | 1541 | 0 | address, | 1542 | 0 | .. | 1543 | 0 | }) => { | 1544 | 0 | inner.log_callback.log( | 1545 | 0 | LogLevel::Debug, | 1546 | 0 | format!( | 1547 | 0 | "disconnected; handshake-finished=false; address={}", | 1548 | 0 | Multiaddr::from_bytes(&address).unwrap() | 1549 | 0 | ), | 1550 | 0 | ); | 1551 | 0 | } | 1552 | | | 1553 | | WakeUpReason::NetworkEvent(service::Event::PingOutSuccess { | 1554 | 0 | id, | 1555 | 0 | peer_id, | 1556 | 0 | ping_time, | 1557 | 0 | }) => { | 1558 | 0 | let remote_addr = | 1559 | 0 | Multiaddr::from_bytes(inner.network.connection_remote_addr(id).to_owned()) | 1560 | 0 | .unwrap(); // TODO: review this unwrap | 1561 | 0 | inner.log_callback.log( | 1562 | 0 | LogLevel::Debug, | 1563 | 0 | format!("ping; peer_id={peer_id}; remote_addr={remote_addr}); ping-time={ping_time:?}"), | 1564 | 0 | ); | 1565 | 0 | } | 1566 | | | 1567 | | WakeUpReason::NetworkEvent(service::Event::BlockAnnounce { | 1568 | 0 | chain_id, | 1569 | 0 | peer_id, | 1570 | 0 | announce, | 1571 | 0 | }) => { | 1572 | 0 | let decoded = announce.decode(); | 1573 | 0 | let header_hash = | 1574 | 0 | header::hash_from_scale_encoded_header(decoded.scale_encoded_header); | 1575 | 0 | match header::decode( | 1576 | 0 | decoded.scale_encoded_header, | 1577 | 0 | inner.network.block_number_bytes(chain_id), | 1578 | 0 | ) { | 1579 | 0 | Ok(decoded_header) => { | 1580 | 0 | let mut _jaeger_span = inner.jaeger_service.block_announce_receive_span( | 1581 | 0 | &inner.local_peer_id, | 1582 | 0 | &peer_id, | 1583 | 0 | decoded_header.number, | 1584 | 0 | &decoded_header.hash(inner.network.block_number_bytes(chain_id)), | 1585 | 0 | ); | 1586 | 0 |
| 1587 | 0 | inner.log_callback.log(LogLevel::Debug, format!( | 1588 | 0 | "block-announce; peer_id={}; chain={}; hash={}; number={}; is_best={:?}", | 1589 | 0 | peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded_header.number, decoded.is_best | 1590 | 0 | )); | 1591 | 0 |
| 1592 | 0 | debug_assert!(inner.event_pending_send.is_none()); | 1593 | 0 | inner.event_pending_send = Some(Event::BlockAnnounce { | 1594 | 0 | chain_id, | 1595 | 0 | peer_id, | 1596 | 0 | is_best: decoded.is_best, | 1597 | 0 | scale_encoded_header: decoded.scale_encoded_header.to_owned(), // TODO: somewhat wasteful to copy here, could pass the entire announce | 1598 | 0 | }); | 1599 | | } | 1600 | 0 | Err(error) => { | 1601 | 0 | inner.log_callback.log(LogLevel::Warn, format!( | 1602 | 0 | "block-announce-bad-header; peer_id={}; chain={}; hash={}; is_best={:?}; error={}", | 1603 | 0 | peer_id, inner.network[chain_id].log_name, HashDisplay(&header_hash), decoded.is_best, error | 1604 | 0 | )); | 1605 | 0 |
| 1606 | 0 | if inner.network.gossip_remove_desired( | 1607 | 0 | chain_id, | 1608 | 0 | &peer_id, | 1609 | 0 | service::GossipKind::ConsensusTransactions, | 1610 | 0 | ) { | 1611 | 0 | inner.peering_strategy.unassign_slot_and_ban( | 1612 | 0 | &chain_id, | 1613 | 0 | &peer_id, | 1614 | 0 | Instant::now() + Duration::from_secs(10), | 1615 | 0 | ); | 1616 | 0 | inner.log_callback.log( | 1617 | 0 | LogLevel::Debug, | 1618 | 0 | format!( | 1619 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=bad-block-announce", | 1620 | 0 | peer_id, inner.network[chain_id].log_name | 1621 | 0 | ), | 1622 | 0 | ); | 1623 | 0 | } | 1624 | 0 | let _ = inner.network.gossip_close( | 1625 | 0 | chain_id, | 1626 | 0 | &peer_id, | 1627 | 0 | service::GossipKind::ConsensusTransactions, | 1628 | 0 | ); // TODO: what is the return value? | 1629 | | } | 1630 | | } | 1631 | | } | 1632 | | WakeUpReason::NetworkEvent(service::Event::GossipConnected { | 1633 | 0 | peer_id, | 1634 | 0 | chain_id, | 1635 | 0 | best_number, | 1636 | 0 | best_hash, | 1637 | 0 | .. | 1638 | 0 | }) => { | 1639 | 0 | inner.log_callback.log( | 1640 | 0 | LogLevel::Debug, | 1641 | 0 | format!( | 1642 | 0 | "chain-connected; peer_id={}; chain={}; best_number={}; best_hash={}", | 1643 | 0 | peer_id, | 1644 | 0 | inner.network[chain_id].log_name, | 1645 | 0 | best_number, | 1646 | 0 | HashDisplay(&best_hash), | 1647 | 0 | ), | 1648 | 0 | ); | 1649 | 0 | debug_assert!(inner.event_pending_send.is_none()); | 1650 | 0 | inner.event_pending_send = Some(Event::Connected { | 1651 | 0 | peer_id, | 1652 | 0 | chain_id, | 1653 | 0 | best_block_number: best_number, | 1654 | 0 | best_block_hash: best_hash, | 1655 | 0 | }); | 1656 | | } | 1657 | | WakeUpReason::NetworkEvent(service::Event::GossipDisconnected { | 1658 | 0 | peer_id, | 1659 | 0 | chain_id, | 1660 | 0 | .. | 1661 | 0 | }) => { | 1662 | 0 | inner.log_callback.log( | 1663 | 0 | LogLevel::Debug, | 1664 | 0 | format!( | 1665 | 0 | "chain-disconnected; peer_id={}; chain={}", | 1666 | 0 | peer_id, inner.network[chain_id].log_name | 1667 | 0 | ), | 1668 | 0 | ); | 1669 | 0 |
| 1670 | 0 | // Note that peer doesn't necessarily have an out slot, as this event | 1671 | 0 | // might happen as a result of an inbound gossip connection. | 1672 | 0 | inner.peering_strategy.unassign_slot_and_ban( | 1673 | 0 | &chain_id, | 1674 | 0 | &peer_id, | 1675 | 0 | Instant::now() + Duration::from_secs(10), | 1676 | 0 | ); | 1677 | 0 | if inner.network.gossip_remove_desired( | 1678 | 0 | chain_id, | 1679 | 0 | &peer_id, | 1680 | 0 | service::GossipKind::ConsensusTransactions, | 1681 | 0 | ) { | 1682 | 0 | inner.log_callback.log( | 1683 | 0 | LogLevel::Debug, | 1684 | 0 | format!( | 1685 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=gossip-disconnected", | 1686 | 0 | peer_id, inner.network[chain_id].log_name | 1687 | 0 | ), | 1688 | 0 | ); | 1689 | 0 | } | 1690 | | | 1691 | 0 | debug_assert!(inner.event_pending_send.is_none()); | 1692 | 0 | inner.event_pending_send = Some(Event::Disconnected { chain_id, peer_id }); | 1693 | | } | 1694 | | WakeUpReason::NetworkEvent(service::Event::GossipOpenFailed { | 1695 | 0 | chain_id, | 1696 | 0 | peer_id, | 1697 | 0 | error, | 1698 | 0 | .. | 1699 | 0 | }) => { | 1700 | 0 | inner.log_callback.log( | 1701 | 0 | LogLevel::Debug, | 1702 | 0 | format!( | 1703 | 0 | "chain-connect-attempt-failed; peer_id={}; chain={}; error={}", | 1704 | 0 | peer_id, inner.network[chain_id].log_name, error | 1705 | 0 | ), | 1706 | 0 | ); | 1707 | 0 |
| 1708 | 0 | // Note that peer doesn't necessarily have an out slot, as this event | 1709 | 0 | // might happen as a result of an inbound gossip connection. | 1710 | 0 | if inner.network.gossip_remove_desired( | 1711 | 0 | chain_id, | 1712 | 0 | &peer_id, | 1713 | 0 | service::GossipKind::ConsensusTransactions, | 1714 | 0 | ) { | 1715 | 0 | inner.log_callback.log( | 1716 | 0 | LogLevel::Debug, | 1717 | 0 | format!( | 1718 | 0 | "slot-unassigned; peer_id={}; chain={}; reason=gossip-open-failed", | 1719 | 0 | peer_id, inner.network[chain_id].log_name | 1720 | 0 | ), | 1721 | 0 | ); | 1722 | 0 | } | 1723 | | | 1724 | 0 | if let service::GossipConnectError::GenesisMismatch { .. } = error { | 1725 | 0 | inner | 1726 | 0 | .peering_strategy | 1727 | 0 | .unassign_slot_and_remove_chain_peer(&chain_id, &peer_id); | 1728 | 0 | } else { | 1729 | 0 | inner.peering_strategy.unassign_slot_and_ban( | 1730 | 0 | &chain_id, | 1731 | 0 | &peer_id, | 1732 | 0 | Instant::now() + Duration::from_secs(15), | 1733 | 0 | ); | 1734 | 0 | } | 1735 | | } | 1736 | | WakeUpReason::NetworkEvent(service::Event::GossipInDesired { | 1737 | 0 | chain_id, | 1738 | 0 | peer_id, | 1739 | 0 | kind: service::GossipKind::ConsensusTransactions, | 1740 | 0 | }) => { | 1741 | 0 | // TODO: log this | 1742 | 0 | // The networking state machine guarantees that `GossipInDesired` | 1743 | 0 | // can't happen if we are already opening an out slot, which we do | 1744 | 0 | // immediately. | 1745 | 0 | // TODO: add debug_assert! ^ | 1746 | 0 | if inner | 1747 | 0 | .network | 1748 | 0 | .opened_gossip_undesired_by_chain(chain_id) | 1749 | 0 | .count() | 1750 | 0 | < inner.network[chain_id].max_in_peers | 1751 | 0 | { | 1752 | 0 | inner | 1753 | 0 | .network | 1754 | 0 | .gossip_open( | 1755 | 0 | chain_id, | 1756 | 0 | &peer_id, | 1757 | 0 | service::GossipKind::ConsensusTransactions, | 1758 | 0 | ) | 1759 | 0 | .unwrap(); | 1760 | 0 | } else { | 1761 | 0 | inner | 1762 | 0 | .network | 1763 | 0 | .gossip_close( | 1764 | 0 | chain_id, | 1765 | 0 | &peer_id, | 1766 | 0 | service::GossipKind::ConsensusTransactions, | 1767 | 0 | ) | 1768 | 0 | .unwrap(); | 1769 | 0 | } | 1770 | | } | 1771 | | WakeUpReason::NetworkEvent(service::Event::GossipInDesiredCancel { .. }) => { | 1772 | | // All `GossipInDesired` are immediately accepted or rejected, meaning | 1773 | | // that this event can't happen. 
| 1774 | 0 | unreachable!() | 1775 | | } | 1776 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { | 1777 | 0 | substream_id, | 1778 | 0 | peer_id, | 1779 | 0 | chain_id, | 1780 | 0 | response: service::RequestResult::Blocks(response), | 1781 | 0 | }) => { | 1782 | 0 | match &response { | 1783 | 0 | Ok(success) => { | 1784 | 0 | inner.log_callback.log( | 1785 | 0 | LogLevel::Debug, | 1786 | 0 | format!( | 1787 | 0 | "blocks-request-ended; outcome=success; peer_id={peer_id}; chain={}; response-blocks={}", | 1788 | 0 | inner.network[chain_id].log_name, | 1789 | 0 | success.len() | 1790 | 0 | ), | 1791 | 0 | ); | 1792 | 0 | } | 1793 | 0 | Err(err) => { | 1794 | 0 | inner.log_callback.log( | 1795 | 0 | LogLevel::Debug, | 1796 | 0 | format!("blocks-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", | 1797 | 0 | inner.network[chain_id].log_name, err), | 1798 | 0 | ); | 1799 | 0 | } | 1800 | | } | 1801 | | | 1802 | 0 | let _ = inner | 1803 | 0 | .blocks_requests | 1804 | 0 | .remove(&substream_id) | 1805 | 0 | .unwrap() | 1806 | 0 | .send(response.map_err(BlocksRequestError::Request)); | 1807 | | } | 1808 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { | 1809 | 0 | substream_id, | 1810 | 0 | peer_id, | 1811 | 0 | chain_id, | 1812 | 0 | response: service::RequestResult::GrandpaWarpSync(response), | 1813 | 0 | }) => { | 1814 | 0 | match &response { | 1815 | 0 | Ok(success) => { | 1816 | 0 | let decoded = success.decode(); | 1817 | 0 | inner.log_callback.log( | 1818 | 0 | LogLevel::Debug, | 1819 | 0 | format!( | 1820 | 0 | "warp-sync-request-ended; outcome=success; peer_id={peer_id}; chain={}; num-fragments={}; is-finished={:?}", | 1821 | 0 | inner.network[chain_id].log_name, | 1822 | 0 | decoded.fragments.len(), decoded.is_finished, | 1823 | 0 | ), | 1824 | 0 | ); | 1825 | 0 | } | 1826 | 0 | Err(err) => { | 1827 | 0 | inner.log_callback.log( | 1828 | 0 | LogLevel::Debug, | 1829 | 0 | format!("warp-sync-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", | 1830 | 0 | inner.network[chain_id].log_name, err), | 1831 | 0 | ); | 1832 | 0 | } | 1833 | | } | 1834 | | | 1835 | 0 | let _ = inner | 1836 | 0 | .warp_sync_requests | 1837 | 0 | .remove(&substream_id) | 1838 | 0 | .unwrap() | 1839 | 0 | .send(response.map_err(WarpSyncRequestError::Request)); | 1840 | | } | 1841 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { | 1842 | 0 | substream_id, | 1843 | 0 | peer_id, | 1844 | 0 | chain_id, | 1845 | 0 | response: service::RequestResult::StorageProof(response), | 1846 | 0 | }) => { | 1847 | 0 | match &response { | 1848 | 0 | Ok(success) => { | 1849 | 0 | inner.log_callback.log( | 1850 | 0 | LogLevel::Debug, | 1851 | 0 | format!( | 1852 | 0 | "storage-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}", | 1853 | 0 | inner.network[chain_id].log_name, | 1854 | 0 | BytesDisplay(u64::try_from(success.decode().len()).unwrap()), | 1855 | 0 | ), | 1856 | 0 | ); | 1857 | 0 | } | 1858 | 0 | Err(err) => { | 1859 | 0 | inner.log_callback.log( | 1860 | 0 | LogLevel::Debug, | 1861 | 0 | format!( | 1862 | 0 | "storage-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", | 1863 | 0 | inner.network[chain_id].log_name, err | 1864 | 0 | ), | 1865 | 0 | ); | 1866 | 0 | } | 1867 | | } | 1868 | | | 1869 | 0 | let _ = inner | 1870 | 0 | .storage_requests | 1871 | 0 | .remove(&substream_id) | 1872 | 0 | .unwrap() | 1873 | 0 | .send(response.map_err(|_| ())); | 1874 | | } | 1875 | | 
WakeUpReason::NetworkEvent(service::Event::RequestResult { | 1876 | 0 | substream_id, | 1877 | 0 | peer_id, | 1878 | 0 | chain_id, | 1879 | 0 | response: service::RequestResult::CallProof(response), | 1880 | 0 | }) => { | 1881 | 0 | match &response { | 1882 | 0 | Ok(success) => { | 1883 | 0 | inner.log_callback.log( | 1884 | 0 | LogLevel::Debug, | 1885 | 0 | format!( | 1886 | 0 | "call-proof-request-ended; outcome=success; peer_id={peer_id}; chain={}; proof-size={}", | 1887 | 0 | inner.network[chain_id].log_name, | 1888 | 0 | BytesDisplay(u64::try_from(success.decode().len()).unwrap()), | 1889 | 0 | ), | 1890 | 0 | ); | 1891 | 0 | } | 1892 | 0 | Err(err) => { | 1893 | 0 | inner.log_callback.log( | 1894 | 0 | LogLevel::Debug, | 1895 | 0 | format!( | 1896 | 0 | "call-proof-request-ended; outcome=failure; peer_id={peer_id}; chain={}; error={}", | 1897 | 0 | inner.network[chain_id].log_name, | 1898 | 0 | err | 1899 | 0 | ), | 1900 | 0 | ); | 1901 | 0 | } | 1902 | | } | 1903 | | | 1904 | 0 | let _ = inner | 1905 | 0 | .call_proof_requests | 1906 | 0 | .remove(&substream_id) | 1907 | 0 | .unwrap() | 1908 | 0 | .send(response.map_err(|_| ())); | 1909 | | } | 1910 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { | 1911 | 0 | peer_id: kademlia_request_target, | 1912 | 0 | chain_id, | 1913 | 0 | response: service::RequestResult::KademliaFindNode(Ok(nodes)), | 1914 | | .. | 1915 | | }) => { | 1916 | 0 | for (peer_id, addrs) in nodes { | 1917 | 0 | let mut valid_addrs = Vec::with_capacity(addrs.len()); | 1918 | 0 | for addr in addrs { | 1919 | 0 | match Multiaddr::from_bytes(addr) { | 1920 | 0 | Ok(a) => valid_addrs.push(a), | 1921 | 0 | Err((error, addr)) => { | 1922 | 0 | inner.log_callback.log( | 1923 | 0 | LogLevel::Debug, | 1924 | 0 | format!( | 1925 | 0 | "discovery-invalid-address; error={error}, addr={}, discovered_from={kademlia_request_target}", | 1926 | 0 | hex::encode(&addr) | 1927 | 0 | ), | 1928 | 0 | ); | 1929 | 0 | continue; | 1930 | | } | 1931 | | } | 1932 | | } | 1933 | | | 1934 | 0 | if !valid_addrs.is_empty() { | 1935 | | // Note that we must call this function before `insert_address`, | 1936 | | // as documented in `basic_peering_strategy`. | 1937 | | if let basic_peering_strategy::InsertChainPeerResult::Inserted { | 1938 | 0 | peer_removed: Some(peer_removed), | 1939 | 0 | } = inner.peering_strategy.insert_chain_peer( | 1940 | 0 | chain_id, | 1941 | 0 | peer_id.clone(), | 1942 | 0 | 100, // TODO: constant | 1943 | 0 | ) { | 1944 | 0 | inner.log_callback.log( | 1945 | 0 | LogLevel::Debug, | 1946 | 0 | format!( | 1947 | 0 | "peer-forgotten; peer_id={}; chain={}", | 1948 | 0 | peer_removed, inner.network[chain_id].log_name | 1949 | 0 | ), | 1950 | 0 | ); | 1951 | 0 | } | 1952 | 0 | } | 1953 | | | 1954 | 0 | for addr in valid_addrs { | 1955 | 0 | inner.log_callback.log( | 1956 | 0 | LogLevel::Debug, | 1957 | 0 | format!( | 1958 | 0 | "discovered; chain={}; peer_id={peer_id}; address={addr}; discovered_from={kademlia_request_target}", | 1959 | 0 | inner.network[chain_id].log_name | 1960 | 0 | ), | 1961 | 0 | ); | 1962 | 0 |
1963 | 0 |             match inner
1964 | 0 |                 .peering_strategy
1965 | 0 |                 .insert_address(&peer_id, addr.into_bytes(), 10) // TODO: constant
1966 | |             {
1967 | 0 |                 basic_peering_strategy::InsertAddressResult::Inserted { address_removed: Some(addr_rm) } => {
1968 | 0 |                     let addr_rm = Multiaddr::from_bytes(addr_rm).unwrap();
1969 | 0 |                     inner
1970 | 0 |                         .log_callback
1971 | 0 |                         .log(LogLevel::Debug, format!("address-purged; peer_id={}; address={}", peer_id, addr_rm));
1972 | 0 |                 }
1973 | 0 |                 basic_peering_strategy::InsertAddressResult::UnknownPeer => unreachable!(),
1974 | 0 |                 _ => {}
1975 | |             }
1976 | |         }
1977 | |     }
1978 | | }
1979 | | WakeUpReason::NetworkEvent(service::Event::RequestResult {
1980 | 0 |     peer_id,
1981 | 0 |     chain_id,
1982 | 0 |     response: service::RequestResult::KademliaFindNode(Err(error)),
1983 | 0 |     ..
1984 | 0 | }) => {
1985 | 0 |     inner.log_callback.log(
1986 | 0 |         LogLevel::Debug,
1987 | 0 |         format!(
1988 | 0 |             "discovery-error; chain={}; peer_id={peer_id}; error={error}",
1989 | 0 |             inner.network[chain_id].log_name
1990 | 0 |         ),
1991 | 0 |     );
1992 | 0 | }
1993 | | WakeUpReason::NetworkEvent(service::Event::RequestResult { .. }) => {
1994 | |     // We never start a request of any other kind.
1995 | 0 |     unreachable!()
1996 | | }
1997 | | WakeUpReason::NetworkEvent(service::Event::RequestInCancel { .. }) => {
1998 | |     // Requests are answered immediately, so cancellation events can't happen.
1999 | 0 |     unreachable!()
2000 | | }
2001 | | WakeUpReason::NetworkEvent(service::Event::IdentifyRequestIn {
2002 | 0 |     peer_id,
2003 | 0 |     substream_id,
2004 | 0 | }) => {
2005 | 0 |     inner.log_callback.log(
2006 | 0 |         LogLevel::Debug,
2007 | 0 |         format!("identify-request; peer_id={}", peer_id),
2008 | 0 |     );
2009 | 0 |     inner
2010 | 0 |         .network
2011 | 0 |         .respond_identify(substream_id, &inner.identify_agent_version);
2012 | 0 | }
2013 | | WakeUpReason::NetworkEvent(service::Event::BlocksRequestIn {
2014 | 0 |     peer_id,
2015 | 0 |     chain_id,
2016 | 0 |     config,
2017 | 0 |     substream_id,
2018 | 0 | }) => {
2019 | 0 |     inner.log_callback.log(
2020 | 0 |         LogLevel::Debug,
2021 | 0 |         format!(
2022 | 0 |             "incoming-blocks-request; peer_id={}; chain={}",
2023 | 0 |             peer_id, inner.network[chain_id].log_name
2024 | 0 |         ),
2025 | 0 |     );
2026 | 0 |     let mut _jaeger_span = inner.jaeger_service.incoming_block_request_span(
2027 | 0 |         &inner.local_peer_id,
2028 | 0 |         &peer_id,
2029 | 0 |         config.desired_count.get(),
2030 | 0 |         if let (1, codec::BlocksRequestConfigStart::Hash(block_hash)) =
2031 | 0 |             (config.desired_count.get(), &config.start)
2032 | |         {
2033 | 0 |             Some(block_hash)
2034 | |         } else {
2035 | 0 |             None
2036 | |         },
2037 | |     );
2038 | |
2039 | |     // TODO: is it a good idea to await here while the lock is held, freezing the entire networking background task?
2040 | 0 |     let response = blocks_request_response(
2041 | 0 |         &inner.network[chain_id].database,
2042 | 0 |         inner.network.block_number_bytes(chain_id),
2043 | 0 |         config,
2044 | 0 |     )
2045 | 0 |     .await;
2046 | 0 |     inner.network.respond_blocks(
2047 | 0 |         substream_id,
2048 | 0 |         match response {
2049 | 0 |             Ok(b) => Some(b),
2050 | 0 |             Err(error) => {
2051 | 0 |                 inner.log_callback.log(
2052 | 0 |                     LogLevel::Warn,
2053 | 0 |                     format!("incoming-blocks-request-error; error={}", error),
2054 | 0 |                 );
2055 | 0 |                 None
2056 | |             }
2057 | |         },
2058 | |     );
2059 | | }
2060 | | WakeUpReason::NetworkEvent(service::Event::GrandpaNeighborPacket {
2061 | 0 |     chain_id,
2062 | 0 |     peer_id,
2063 | 0 |     state,
2064 | 0 | }) => {
2065 | 0 |     inner.log_callback.log(LogLevel::Debug, format!(
2066 | 0 |         "grandpa-neighbor-packet; peer_id={}; chain={}; round_number={}; set_id={}; commit_finalized_height={}",
2067 | 0 |         peer_id,
2068 | 0 |         inner.network[chain_id].log_name,
2069 | 0 |         state.round_number,
2070 | 0 |         state.set_id,
2071 | 0 |         state.commit_finalized_height,
2072 | 0 |     ));
2073 | 0 |
2074 | 0 |     debug_assert!(inner.event_pending_send.is_none());
2075 | 0 |     inner.event_pending_send = Some(Event::GrandpaNeighborPacket {
2076 | 0 |         chain_id,
2077 | 0 |         peer_id,
2078 | 0 |         finalized_block_height: state.commit_finalized_height,
2079 | 0 |     });
2080 | | }
2081 | | WakeUpReason::NetworkEvent(service::Event::GrandpaCommitMessage {
2082 | 0 |     chain_id,
2083 | 0 |     peer_id,
2084 | 0 |     message,
2085 | 0 | }) => {
2086 | 0 |     inner.log_callback.log(
2087 | 0 |         LogLevel::Debug,
2088 | 0 |         format!(
2089 | 0 |             "grandpa-commit-message; peer_id={}; chain={}; target_hash={}",
2090 | 0 |             peer_id,
2091 | 0 |             inner.network[chain_id].log_name,
2092 | 0 |             HashDisplay(message.decode().target_hash),
2093 | 0 |         ),
2094 | 0 |     );
2095 | 0 | }
2096 | 0 | WakeUpReason::NetworkEvent(service::Event::ProtocolError { peer_id, error }) => {
2097 | 0 |     inner.log_callback.log(
2098 | 0 |         LogLevel::Warn,
2099 | 0 |         format!("protocol-error; peer_id={}; error={}", peer_id, error),
2100 | 0 |     );
2101 | 0 |     inner
2102 | 0 |         .peering_strategy
2103 | 0 |         .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(5));
2104 | 0 |     // TODO: log chain names?
2105 | 0 |     inner.log_callback.log(
2106 | 0 |         LogLevel::Debug,
2107 | 0 |         format!(
2108 | 0 |             "all-slots-unassigned; reason=protocol-error; peer_id={}",
2109 | 0 |             peer_id
2110 | 0 |         ),
2111 | 0 |     );
2112 | 0 | }
2113 | |
2114 | 0 | WakeUpReason::CanAssignSlot(peer_id, chain_id) => {
2115 | 0 |     inner.peering_strategy.assign_slot(&chain_id, &peer_id);
2116 | 0 |
2117 | 0 |     inner.log_callback.log(
2118 | 0 |         LogLevel::Debug,
2119 | 0 |         format!(
2120 | 0 |             "slot-assigned; peer_id={}; chain={}",
2121 | 0 |             peer_id, inner.network[chain_id].log_name
2122 | 0 |         ),
2123 | 0 |     );
2124 | 0 |
2125 | 0 |     inner.network.gossip_insert_desired(
2126 | 0 |         chain_id,
2127 | 0 |         peer_id,
2128 | 0 |         service::GossipKind::ConsensusTransactions,
2129 | 0 |     );
2130 | 0 | }
2131 | |
2132 | 0 | WakeUpReason::CanStartConnect(peer_id) => {
2133 | 0 |     inner.num_pending_out_attempts += 1;
2134 | |
2135 | 0 |     let Some(multiaddr) = inner
2136 | 0 |         .peering_strategy
2137 | 0 |         .pick_address_and_add_connection(&peer_id)
2138 | |     else {
2139 | |         // There is no address for that peer in the address book.
2140 | 0 |         inner.network.gossip_remove_desired_all(
2141 | 0 |             &peer_id,
2142 | 0 |             service::GossipKind::ConsensusTransactions,
2143 | 0 |         );
2144 | 0 |         for (chain_id, what_happened) in inner
2145 | 0 |             .peering_strategy
2146 | 0 |             .unassign_slots_and_ban(&peer_id, Instant::now() + Duration::from_secs(10))
2147 | |         {
2148 | 0 |             if matches!(
2149 | 0 |                 what_happened,
2150 | |                 basic_peering_strategy::UnassignSlotsAndBan::Banned { had_slot: true }
2151 | 0 |             ) {
2152 | 0 |                 inner.log_callback.log(
2153 | 0 |                     LogLevel::Debug,
2154 | 0 |                     format!(
2155 | 0 |                         "slot-unassigned; peer_id={}; chain={}; reason=no-address",
2156 | 0 |                         peer_id, inner.network[*chain_id].log_name
2157 | 0 |                     ),
2158 | 0 |                 );
2159 | 0 |             }
2160 | |         }
2161 | 0 |         continue;
2162 | |     };
2163 | |
2164 | 0 |     let multiaddr = match multiaddr::Multiaddr::from_bytes(multiaddr.to_owned()) {
2165 | 0 |         Ok(a) => a,
2166 | 0 |         Err((multiaddr::FromBytesError, multiaddr)) => {
2167 | 0 |             // Address is in an invalid format.
2168 | 0 |             inner.log_callback.log(
2169 | 0 |                 LogLevel::Debug,
2170 | 0 |                 format!(
2171 | 0 |                     "invalid-address; peer_id={}; address={:?}",
2172 | 0 |                     peer_id, multiaddr
2173 | 0 |                 ),
2174 | 0 |             );
2175 | 0 |             let _was_in = inner
2176 | 0 |                 .peering_strategy
2177 | 0 |                 .decrease_address_connections_and_remove_if_zero(&peer_id, &multiaddr);
2178 | 0 |             debug_assert!(_was_in.is_ok());
2179 | 0 |             continue;
2180 | |         }
2181 | |     };
2182 | |
2183 | |     // Convert the `multiaddr` (typically of the form `/ip4/a.b.c.d/tcp/e`) into
2184 | |     // a `Future<Output = Result<TcpStream, ...>>`.
2185 | 0 |     let socket = match tasks::multiaddr_to_socket(&multiaddr) {
2186 | 0 |         Ok(socket) => socket,
2187 | |         Err(_) => {
2188 | |             // Address is in an invalid format or isn't supported.
2189 | 0 |             inner.log_callback.log(
2190 | 0 |                 LogLevel::Debug,
2191 | 0 |                 format!(
2192 | 0 |                     "invalid-address; peer_id={}; address={}",
2193 | 0 |                     peer_id, multiaddr
2194 | 0 |                 ),
2195 | 0 |             );
2196 | 0 |             let _was_in = inner
2197 | 0 |                 .peering_strategy
2198 | 0 |                 .decrease_address_connections_and_remove_if_zero(
2199 | 0 |                     &peer_id,
2200 | 0 |                     multiaddr.as_ref(),
2201 | 0 |                 );
2202 | 0 |             debug_assert!(_was_in.is_ok());
2203 | 0 |             continue;
2204 | |         }
2205 | |     };
2206 | |
2207 | 0 |     inner.log_callback.log(
2208 | 0 |         LogLevel::Debug,
2209 | 0 |         format!("start-connecting; peer_id={peer_id}; address={multiaddr}"),
2210 | 0 |     );
2211 | 0 |
2212 | 0 |     let (tx, rx) = channel::bounded(16); // TODO: ?!
2213 | 0 |
2214 | 0 |     let (connection_id, connection_task) = inner.network.add_single_stream_connection(
2215 | 0 |         Instant::now(),
2216 | 0 |         service::SingleStreamHandshakeKind::MultistreamSelectNoiseYamux {
2217 | 0 |             is_initiator: true,
2218 | 0 |             noise_key: &inner.noise_key,
2219 | 0 |         },
2220 | 0 |         multiaddr.clone().into_bytes(),
2221 | 0 |         Some(peer_id.clone()),
2222 | 0 |         tx,
2223 | 0 |     );
2224 | 0 |
2225 | 0 |     // Handle the connection in a separate task.
2226 | 0 |     (inner.tasks_executor)(Box::pin(tasks::connection_task(
2227 | 0 |         inner.log_callback.clone(),
2228 | 0 |         multiaddr.to_string(),
2229 | 0 |         socket,
2230 | 0 |         connection_id,
2231 | 0 |         connection_task,
2232 | 0 |         rx,
2233 | 0 |         inner.from_connections_tx.clone(),
2234 | 0 |     )));
2235 | | }
2236 | |
2237 | 0 | WakeUpReason::CanOpenGossip(peer_id, chain_id) => {
2238 | 0 |     inner
2239 | 0 |         .network
2240 | 0 |         .gossip_open(
2241 | 0 |             chain_id,
2242 | 0 |             &peer_id,
2243 | 0 |             service::GossipKind::ConsensusTransactions,
2244 | 0 |         )
2245 | 0 |         .unwrap();
2246 | 0 |
2247 | 0 |     inner.log_callback.log(
2248 | 0 |         LogLevel::Debug,
2249 | 0 |         format!(
2250 | 0 |             "gossip-open; peer_id={}; chain={}",
2251 | 0 |             peer_id, &inner.network[chain_id].log_name
2252 | 0 |         ),
2253 | 0 |     );
2254 | 0 | }
2255 | | }
2256 | | }
2257 | 0 | }
2258 | | |
2259 | | /// Builds the response to a block request by reading from the given database. |
2260 | 0 | async fn blocks_request_response( |
2261 | 0 | database: &database_thread::DatabaseThread, |
2262 | 0 | block_number_bytes: usize, |
2263 | 0 | config: codec::BlocksRequestConfig, |
2264 | 0 | ) -> Result<Vec<codec::BlockData>, full_sqlite::CorruptedError> {
2265 | 0 | database |
2266 | 0 | .with_database(move |database| { |
2267 | 0 | let num_blocks = cmp::min( |
2268 | 0 | usize::try_from(config.desired_count.get()).unwrap_or(usize::MAX), |
2269 | 0 | 128, |
2270 | 0 | ); |
2271 | 0 |
2272 | 0 | let mut output = Vec::with_capacity(num_blocks); |
2273 | 0 | let mut next_block = config.start; |
2274 | | |
2275 | | loop { |
2276 | 0 | if output.len() >= num_blocks { |
2277 | 0 | break; |
2278 | 0 | } |
2279 | | |
2280 | 0 | let hash = match next_block { |
2281 | 0 | codec::BlocksRequestConfigStart::Hash(hash) => hash, |
2282 | 0 | codec::BlocksRequestConfigStart::Number(number) => { |
2283 | 0 | // TODO: naive block selection; should choose the best chain instead
2284 | 0 | match database.block_hash_by_number(number)?.next() { |
2285 | 0 | Some(h) => h, |
2286 | 0 | None => break, |
2287 | | } |
2288 | | } |
2289 | | }; |
2290 | | |
2291 | 0 | let header = match database.block_scale_encoded_header(&hash)? { |
2292 | 0 | Some(h) => h, |
2293 | 0 | None => break, |
2294 | | }; |
2295 | | |
2296 | | next_block = { |
2297 | 0 | let decoded = header::decode(&header, block_number_bytes).unwrap(); |
2298 | 0 | match config.direction { |
2299 | | codec::BlocksRequestDirection::Ascending => { |
2300 | | // TODO: right now, since we don't necessarily pick the best chain in `block_hash_by_number`, it is possible that the next block doesn't have the current block as parent |
2301 | 0 | codec::BlocksRequestConfigStart::Number(decoded.number + 1) |
2302 | | } |
2303 | | codec::BlocksRequestDirection::Descending => { |
2304 | 0 | codec::BlocksRequestConfigStart::Hash(*decoded.parent_hash) |
2305 | | } |
2306 | | } |
2307 | | }; |
2308 | | |
2309 | 0 | output.push(codec::BlockData { |
2310 | 0 | hash, |
2311 | 0 | header: if config.fields.header { |
2312 | 0 | Some(header) |
2313 | | } else { |
2314 | 0 | None |
2315 | | }, |
2316 | 0 | body: if config.fields.body { |
2317 | 0 | Some(match database.block_extrinsics(&hash)? { |
2318 | 0 | Some(body) => body.collect(), |
2319 | 0 | None => break, |
2320 | | }) |
2321 | | } else { |
2322 | 0 | None |
2323 | | }, |
2324 | 0 | justifications: if config.fields.justifications { |
2325 | | // TODO: justifications aren't saved in database at the moment |
2326 | 0 | Some(Vec::new()) |
2327 | | } else { |
2328 | 0 | None |
2329 | | }, |
2330 | | }); |
2331 | | } |
2332 | | |
2333 | 0 | Ok(output) |
2334 | 0 | })
2335 | 0 | .await |
2336 | 0 | }