/__w/smoldot/smoldot/repo/full-node/src/json_rpc_service/chain_head_subscriptions.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2019-2022 Parity Technologies (UK) Ltd. |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | // TODO: document |
19 | | |
20 | | use futures_channel::oneshot; |
21 | | use futures_lite::FutureExt as _; |
22 | | use smol::stream::StreamExt as _; |
23 | | use smoldot::{ |
24 | | executor, |
25 | | json_rpc::{methods, service}, |
26 | | }; |
27 | | use std::{ |
28 | | future::Future, |
29 | | num::NonZeroUsize, |
30 | | pin::{self, Pin}, |
31 | | sync::Arc, |
32 | | }; |
33 | | |
34 | | use crate::{consensus_service, database_thread, LogCallback}; |
35 | | |
36 | | pub struct Config { |
37 | | /// Function that can be used to spawn background tasks. |
38 | | /// |
39 | | /// The tasks passed as parameter must be executed until they shut down. |
40 | | pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, |
41 | | |
42 | | /// Function called in order to notify of something. |
43 | | pub log_callback: Arc<dyn LogCallback + Send + Sync>, |
44 | | |
45 | | /// Receiver for actions that the JSON-RPC client wants to perform. |
46 | | pub receiver: async_channel::Receiver<Message>, |
47 | | |
48 | | /// `chainHead_v1_follow` subscription start handle. |
49 | | pub chain_head_follow_subscription: service::SubscriptionStartProcess, |
50 | | |
51 | | /// Parameter that was passed by the user when requesting `chainHead_v1_follow`. |
52 | | pub with_runtime: bool, |
53 | | |
54 | | /// Consensus service of the chain. |
55 | | pub consensus_service: Arc<consensus_service::ConsensusService>, |
56 | | |
57 | | /// Database to access blocks. |
58 | | pub database: Arc<database_thread::DatabaseThread>, |
59 | | } |
60 | | |
61 | | pub enum Message { |
62 | | Header { |
63 | | request: service::RequestProcess, |
64 | | }, |
65 | | Unpin { |
66 | | block_hashes: Vec<[u8; 32]>, |
67 | | outcome: oneshot::Sender<Result<(), ()>>, |
68 | | }, |
69 | | } |
70 | | |
71 | | /// Spawns a new tasks dedicated to handling a `chainHead_v1_follow` subscription. |
72 | | /// |
73 | | /// Returns the identifier of the subscription. |
74 | 0 | pub async fn spawn_chain_head_subscription_task(config: Config) -> String { Unexecuted instantiation: _RNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task Unexecuted instantiation: _RNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task |
75 | 0 | let mut json_rpc_subscription = config.chain_head_follow_subscription.accept(); |
76 | 0 | let json_rpc_subscription_id = json_rpc_subscription.subscription_id().to_owned(); |
77 | 0 | let return_value = json_rpc_subscription_id.clone(); |
78 | 0 |
|
79 | 0 | let tasks_executor = config.tasks_executor.clone(); |
80 | 0 | tasks_executor(Box::pin(async move { |
81 | 0 | let consensus_service_subscription = config |
82 | 0 | .consensus_service |
83 | 0 | .subscribe_all(32, NonZeroUsize::new(32).unwrap()) |
84 | 0 | .await; |
85 | 0 | let mut consensus_service_subscription_new_blocks = |
86 | 0 | pin::pin!(consensus_service_subscription.new_blocks); |
87 | 0 |
|
88 | 0 | let mut foreground_receiver = pin::pin!(config.receiver); |
89 | 0 |
|
90 | 0 | let mut pinned_blocks = |
91 | 0 | hashbrown::HashSet::with_capacity_and_hasher(32, fnv::FnvBuildHasher::default()); |
92 | 0 | let mut current_best_block = consensus_service_subscription.finalized_block_hash; |
93 | 0 |
|
94 | 0 | pinned_blocks.insert(consensus_service_subscription.finalized_block_hash); |
95 | 0 | json_rpc_subscription |
96 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
97 | 0 | subscription: (&json_rpc_subscription_id).into(), |
98 | 0 | result: methods::FollowEvent::Initialized { |
99 | 0 | finalized_block_hashes: vec![methods::HashHexString( |
100 | 0 | consensus_service_subscription.finalized_block_hash, |
101 | 0 | )], |
102 | 0 | finalized_block_runtime: if config.with_runtime { |
103 | 0 | Some(convert_runtime_spec( |
104 | 0 | consensus_service_subscription |
105 | 0 | .finalized_block_runtime |
106 | 0 | .runtime_version(), |
107 | 0 | )) |
108 | | } else { |
109 | 0 | None |
110 | | }, |
111 | | }, |
112 | | }) |
113 | 0 | .await; |
114 | | |
115 | 0 | for block in consensus_service_subscription.non_finalized_blocks_ancestry_order { |
116 | 0 | pinned_blocks.insert(block.block_hash); |
117 | 0 | json_rpc_subscription |
118 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
119 | 0 | subscription: (&json_rpc_subscription_id).into(), |
120 | 0 | result: methods::FollowEvent::NewBlock { |
121 | 0 | block_hash: methods::HashHexString(block.block_hash), |
122 | 0 | new_runtime: if let (Some(new_runtime), true) = |
123 | 0 | (&block.runtime_update, config.with_runtime) |
124 | | { |
125 | 0 | Some(convert_runtime_spec(new_runtime.runtime_version())) |
126 | | } else { |
127 | 0 | None |
128 | | }, |
129 | 0 | parent_block_hash: methods::HashHexString(block.parent_hash), |
130 | | }, |
131 | | }) |
132 | 0 | .await; |
133 | | |
134 | 0 | if block.is_new_best { |
135 | 0 | current_best_block = block.block_hash; |
136 | 0 | json_rpc_subscription |
137 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
138 | 0 | subscription: (&json_rpc_subscription_id).into(), |
139 | 0 | result: methods::FollowEvent::BestBlockChanged { |
140 | 0 | best_block_hash: methods::HashHexString(block.block_hash), |
141 | 0 | }, |
142 | 0 | }) |
143 | 0 | .await; |
144 | 0 | } |
145 | | } |
146 | | |
147 | | loop { |
148 | | enum WakeUpReason { |
149 | | ConsensusNotification(consensus_service::Notification), |
150 | | ConsensusSubscriptionStop, |
151 | | Foreground(Message), |
152 | | ForegroundClosed, |
153 | | } |
154 | | |
155 | 0 | let wake_up_reason = async { |
156 | 0 | consensus_service_subscription_new_blocks |
157 | 0 | .next() |
158 | 0 | .await |
159 | 0 | .map_or( |
160 | 0 | WakeUpReason::ConsensusSubscriptionStop, |
161 | 0 | WakeUpReason::ConsensusNotification, |
162 | 0 | ) |
163 | 0 | } Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task000Bb_ Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task000Bb_ |
164 | 0 | .or(async { |
165 | 0 | foreground_receiver |
166 | 0 | .next() |
167 | 0 | .await |
168 | 0 | .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Foreground) |
169 | 0 | }) Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s_0Bb_ Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s_0Bb_ |
170 | 0 | .await; |
171 | | |
172 | 0 | match wake_up_reason { |
173 | 0 | WakeUpReason::ForegroundClosed => return, |
174 | 0 | WakeUpReason::Foreground(Message::Header { request }) => { |
175 | 0 | let methods::MethodCall::chainHead_v1_header { hash, .. } = request.request() |
176 | | else { |
177 | 0 | unreachable!() |
178 | | }; |
179 | | |
180 | 0 | if !pinned_blocks.contains(&hash.0) { |
181 | 0 | request.fail(service::ErrorResponse::InvalidParams); |
182 | 0 | continue; |
183 | 0 | } |
184 | | |
185 | 0 | let database_outcome = config |
186 | 0 | .database |
187 | 0 | .with_database(move |database| database.block_scale_encoded_header(&hash.0)) Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s0_0Bb_ Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s0_0Bb_ |
188 | 0 | .await; |
189 | | |
190 | 0 | match database_outcome { |
191 | 0 | Ok(Some(header)) => { |
192 | 0 | request.respond(methods::Response::chainHead_v1_header(Some( |
193 | 0 | methods::HexString(header), |
194 | 0 | ))) |
195 | | } |
196 | 0 | Ok(None) => { |
197 | 0 | // Should never happen given that blocks are pinned. |
198 | 0 | // TODO: log the problem |
199 | 0 | request.fail(service::ErrorResponse::InternalError); |
200 | 0 | } |
201 | 0 | Err(_) => { |
202 | 0 | // TODO: log the problem |
203 | 0 | request.fail(service::ErrorResponse::InternalError); |
204 | 0 | } |
205 | | } |
206 | | } |
207 | | WakeUpReason::Foreground(Message::Unpin { |
208 | 0 | block_hashes, |
209 | 0 | outcome, |
210 | 0 | }) => { |
211 | 0 | if block_hashes.iter().any(|h| !pinned_blocks.contains(h)) { Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s1_0Bb_ Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s1_0Bb_ |
212 | 0 | let _ = outcome.send(Err(())); |
213 | 0 | } else { |
214 | 0 | for block_hash in block_hashes { |
215 | 0 | pinned_blocks.remove(&block_hash); |
216 | 0 | config |
217 | 0 | .consensus_service |
218 | 0 | .unpin_block(consensus_service_subscription.id, block_hash) |
219 | 0 | .await; |
220 | | } |
221 | 0 | let _ = outcome.send(Ok(())); |
222 | | } |
223 | | } |
224 | | WakeUpReason::ConsensusNotification(consensus_service::Notification::Block { |
225 | 0 | block, |
226 | 0 | .. |
227 | 0 | }) => { |
228 | 0 | pinned_blocks.insert(block.block_hash); |
229 | 0 | json_rpc_subscription |
230 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
231 | 0 | subscription: (&json_rpc_subscription_id).into(), |
232 | 0 | result: methods::FollowEvent::NewBlock { |
233 | 0 | block_hash: methods::HashHexString(block.block_hash), |
234 | 0 | new_runtime: if let (Some(new_runtime), true) = |
235 | 0 | (&block.runtime_update, config.with_runtime) |
236 | | { |
237 | 0 | Some(convert_runtime_spec(new_runtime.runtime_version())) |
238 | | } else { |
239 | 0 | None |
240 | | }, |
241 | 0 | parent_block_hash: methods::HashHexString(block.parent_hash), |
242 | | }, |
243 | | }) |
244 | 0 | .await; |
245 | | |
246 | 0 | if block.is_new_best { |
247 | 0 | current_best_block = block.block_hash; |
248 | 0 | json_rpc_subscription |
249 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
250 | 0 | subscription: (&json_rpc_subscription_id).into(), |
251 | 0 | result: methods::FollowEvent::BestBlockChanged { |
252 | 0 | best_block_hash: methods::HashHexString(block.block_hash), |
253 | 0 | }, |
254 | 0 | }) |
255 | 0 | .await; |
256 | 0 | } |
257 | | } |
258 | | WakeUpReason::ConsensusNotification( |
259 | | consensus_service::Notification::Finalized { |
260 | 0 | finalized_blocks_newest_to_oldest, |
261 | 0 | pruned_blocks_hashes, |
262 | 0 | best_block_hash, |
263 | 0 | }, |
264 | 0 | ) => { |
265 | 0 | json_rpc_subscription |
266 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
267 | 0 | subscription: (&json_rpc_subscription_id).into(), |
268 | 0 | result: methods::FollowEvent::Finalized { |
269 | 0 | // As specified in the JSON-RPC spec, the list must be ordered |
270 | 0 | // in increasing block number. Consequently we have to reverse |
271 | 0 | // the list. |
272 | 0 | finalized_blocks_hashes: finalized_blocks_newest_to_oldest |
273 | 0 | .into_iter() |
274 | 0 | .map(methods::HashHexString) |
275 | 0 | .rev() |
276 | 0 | .collect(), |
277 | 0 | pruned_blocks_hashes: pruned_blocks_hashes |
278 | 0 | .into_iter() |
279 | 0 | .map(methods::HashHexString) |
280 | 0 | .collect(), |
281 | 0 | }, |
282 | 0 | }) |
283 | 0 | .await; |
284 | | |
285 | 0 | if best_block_hash != current_best_block { |
286 | 0 | current_best_block = best_block_hash; |
287 | 0 | json_rpc_subscription |
288 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
289 | 0 | subscription: (&json_rpc_subscription_id).into(), |
290 | 0 | result: methods::FollowEvent::BestBlockChanged { |
291 | 0 | best_block_hash: methods::HashHexString(best_block_hash), |
292 | 0 | }, |
293 | 0 | }) |
294 | 0 | .await; |
295 | 0 | } |
296 | | } |
297 | | WakeUpReason::ConsensusSubscriptionStop => { |
298 | 0 | json_rpc_subscription |
299 | 0 | .send_notification(methods::ServerToClient::chainHead_v1_followEvent { |
300 | 0 | subscription: (&json_rpc_subscription_id).into(), |
301 | 0 | result: methods::FollowEvent::Stop {}, |
302 | 0 | }) |
303 | 0 | .await; |
304 | | } |
305 | | } |
306 | | } |
307 | 0 | })); Unexecuted instantiation: _RNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00B9_ Unexecuted instantiation: _RNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00B9_ |
308 | 0 |
|
309 | 0 | return_value |
310 | 0 | } Unexecuted instantiation: _RNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task0B7_ Unexecuted instantiation: _RNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task0B7_ |
311 | | |
312 | 0 | fn convert_runtime_spec(runtime: &executor::CoreVersion) -> methods::MaybeRuntimeSpec { |
313 | 0 | let runtime = runtime.decode(); |
314 | 0 | methods::MaybeRuntimeSpec::Valid { |
315 | 0 | spec: methods::RuntimeSpec { |
316 | 0 | impl_name: runtime.impl_name.into(), |
317 | 0 | spec_name: runtime.spec_name.into(), |
318 | 0 | impl_version: runtime.impl_version, |
319 | 0 | spec_version: runtime.spec_version, |
320 | 0 | transaction_version: runtime.transaction_version, |
321 | 0 | apis: runtime |
322 | 0 | .apis |
323 | 0 | .map(|api| (methods::HexString(api.name_hash.to_vec()), api.version)) Unexecuted instantiation: _RNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec0B7_ Unexecuted instantiation: _RNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec0B7_ |
324 | 0 | .collect(), |
325 | 0 | }, |
326 | 0 | } |
327 | 0 | } Unexecuted instantiation: _RNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec Unexecuted instantiation: _RNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec |