Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/full-node/src/json_rpc_service/chain_head_subscriptions.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
// TODO: document
19
20
use futures_channel::oneshot;
21
use futures_lite::FutureExt as _;
22
use smol::stream::StreamExt as _;
23
use smoldot::{
24
    executor,
25
    json_rpc::{methods, service},
26
};
27
use std::{
28
    future::Future,
29
    num::NonZeroUsize,
30
    pin::{self, Pin},
31
    sync::Arc,
32
};
33
34
use crate::{consensus_service, database_thread, LogCallback};
35
36
pub struct Config {
37
    /// Function that can be used to spawn background tasks.
38
    ///
39
    /// The tasks passed as parameter must be executed until they shut down.
40
    pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
41
42
    /// Function called in order to notify of something.
43
    pub log_callback: Arc<dyn LogCallback + Send + Sync>,
44
45
    /// Receiver for actions that the JSON-RPC client wants to perform.
46
    pub receiver: async_channel::Receiver<Message>,
47
48
    /// `chainHead_v1_follow` subscription start handle.
49
    pub chain_head_follow_subscription: service::SubscriptionStartProcess,
50
51
    /// Parameter that was passed by the user when requesting `chainHead_v1_follow`.
52
    pub with_runtime: bool,
53
54
    /// Consensus service of the chain.
55
    pub consensus_service: Arc<consensus_service::ConsensusService>,
56
57
    /// Database to access blocks.
58
    pub database: Arc<database_thread::DatabaseThread>,
59
}
60
61
pub enum Message {
62
    Header {
63
        request: service::RequestProcess,
64
    },
65
    Unpin {
66
        block_hashes: Vec<[u8; 32]>,
67
        outcome: oneshot::Sender<Result<(), ()>>,
68
    },
69
}
70
71
/// Spawns a new task dedicated to handling a `chainHead_v1_follow` subscription.
72
///
73
/// Returns the identifier of the subscription.
74
0
pub async fn spawn_chain_head_subscription_task(config: Config) -> String {
Unexecuted instantiation: _RNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task
Unexecuted instantiation: _RNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task
75
0
    let mut json_rpc_subscription = config.chain_head_follow_subscription.accept();
76
0
    let json_rpc_subscription_id = json_rpc_subscription.subscription_id().to_owned();
77
0
    let return_value = json_rpc_subscription_id.clone();
78
0
79
0
    let tasks_executor = config.tasks_executor.clone();
80
0
    tasks_executor(Box::pin(async move {
81
0
        let consensus_service_subscription = config
82
0
            .consensus_service
83
0
            .subscribe_all(32, NonZeroUsize::new(32).unwrap())
84
0
            .await;
85
0
        let mut consensus_service_subscription_new_blocks =
86
0
            pin::pin!(consensus_service_subscription.new_blocks);
87
0
88
0
        let mut foreground_receiver = pin::pin!(config.receiver);
89
0
90
0
        let mut pinned_blocks =
91
0
            hashbrown::HashSet::with_capacity_and_hasher(32, fnv::FnvBuildHasher::default());
92
0
        let mut current_best_block = consensus_service_subscription.finalized_block_hash;
93
0
94
0
        pinned_blocks.insert(consensus_service_subscription.finalized_block_hash);
95
0
        json_rpc_subscription
96
0
            .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
97
0
                subscription: (&json_rpc_subscription_id).into(),
98
0
                result: methods::FollowEvent::Initialized {
99
0
                    finalized_block_hashes: vec![methods::HashHexString(
100
0
                        consensus_service_subscription.finalized_block_hash,
101
0
                    )],
102
0
                    finalized_block_runtime: if config.with_runtime {
103
0
                        Some(convert_runtime_spec(
104
0
                            consensus_service_subscription
105
0
                                .finalized_block_runtime
106
0
                                .runtime_version(),
107
0
                        ))
108
                    } else {
109
0
                        None
110
                    },
111
                },
112
            })
113
0
            .await;
114
115
0
        for block in consensus_service_subscription.non_finalized_blocks_ancestry_order {
116
0
            pinned_blocks.insert(block.block_hash);
117
0
            json_rpc_subscription
118
0
                .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
119
0
                    subscription: (&json_rpc_subscription_id).into(),
120
0
                    result: methods::FollowEvent::NewBlock {
121
0
                        block_hash: methods::HashHexString(block.block_hash),
122
0
                        new_runtime: if let (Some(new_runtime), true) =
123
0
                            (&block.runtime_update, config.with_runtime)
124
                        {
125
0
                            Some(convert_runtime_spec(new_runtime.runtime_version()))
126
                        } else {
127
0
                            None
128
                        },
129
0
                        parent_block_hash: methods::HashHexString(block.parent_hash),
130
                    },
131
                })
132
0
                .await;
133
134
0
            if block.is_new_best {
135
0
                current_best_block = block.block_hash;
136
0
                json_rpc_subscription
137
0
                    .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
138
0
                        subscription: (&json_rpc_subscription_id).into(),
139
0
                        result: methods::FollowEvent::BestBlockChanged {
140
0
                            best_block_hash: methods::HashHexString(block.block_hash),
141
0
                        },
142
0
                    })
143
0
                    .await;
144
0
            }
145
        }
146
147
        loop {
148
            enum WakeUpReason {
149
                ConsensusNotification(consensus_service::Notification),
150
                ConsensusSubscriptionStop,
151
                Foreground(Message),
152
                ForegroundClosed,
153
            }
154
155
0
            let wake_up_reason = async {
156
0
                consensus_service_subscription_new_blocks
157
0
                    .next()
158
0
                    .await
159
0
                    .map_or(
160
0
                        WakeUpReason::ConsensusSubscriptionStop,
161
0
                        WakeUpReason::ConsensusNotification,
162
0
                    )
163
0
            }
Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task000Bb_
Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task000Bb_
164
0
            .or(async {
165
0
                foreground_receiver
166
0
                    .next()
167
0
                    .await
168
0
                    .map_or(WakeUpReason::ForegroundClosed, WakeUpReason::Foreground)
169
0
            })
Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s_0Bb_
Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s_0Bb_
170
0
            .await;
171
172
0
            match wake_up_reason {
173
0
                WakeUpReason::ForegroundClosed => return,
174
0
                WakeUpReason::Foreground(Message::Header { request }) => {
175
0
                    let methods::MethodCall::chainHead_v1_header { hash, .. } = request.request()
176
                    else {
177
0
                        unreachable!()
178
                    };
179
180
0
                    if !pinned_blocks.contains(&hash.0) {
181
0
                        request.fail(service::ErrorResponse::InvalidParams);
182
0
                        continue;
183
0
                    }
184
185
0
                    let database_outcome = config
186
0
                        .database
187
0
                        .with_database(move |database| database.block_scale_encoded_header(&hash.0))
Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s0_0Bb_
Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s0_0Bb_
188
0
                        .await;
189
190
0
                    match database_outcome {
191
0
                        Ok(Some(header)) => {
192
0
                            request.respond(methods::Response::chainHead_v1_header(Some(
193
0
                                methods::HexString(header),
194
0
                            )))
195
                        }
196
0
                        Ok(None) => {
197
0
                            // Should never happen given that blocks are pinned.
198
0
                            // TODO: log the problem
199
0
                            request.fail(service::ErrorResponse::InternalError);
200
0
                        }
201
0
                        Err(_) => {
202
0
                            // TODO: log the problem
203
0
                            request.fail(service::ErrorResponse::InternalError);
204
0
                        }
205
                    }
206
                }
207
                WakeUpReason::Foreground(Message::Unpin {
208
0
                    block_hashes,
209
0
                    outcome,
210
0
                }) => {
211
0
                    if block_hashes.iter().any(|h| !pinned_blocks.contains(h)) {
Unexecuted instantiation: _RNCNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s1_0Bb_
Unexecuted instantiation: _RNCNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00s1_0Bb_
212
0
                        let _ = outcome.send(Err(()));
213
0
                    } else {
214
0
                        for block_hash in block_hashes {
215
0
                            pinned_blocks.remove(&block_hash);
216
0
                            config
217
0
                                .consensus_service
218
0
                                .unpin_block(consensus_service_subscription.id, block_hash)
219
0
                                .await;
220
                        }
221
0
                        let _ = outcome.send(Ok(()));
222
                    }
223
                }
224
                WakeUpReason::ConsensusNotification(consensus_service::Notification::Block {
225
0
                    block,
226
0
                    ..
227
0
                }) => {
228
0
                    pinned_blocks.insert(block.block_hash);
229
0
                    json_rpc_subscription
230
0
                        .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
231
0
                            subscription: (&json_rpc_subscription_id).into(),
232
0
                            result: methods::FollowEvent::NewBlock {
233
0
                                block_hash: methods::HashHexString(block.block_hash),
234
0
                                new_runtime: if let (Some(new_runtime), true) =
235
0
                                    (&block.runtime_update, config.with_runtime)
236
                                {
237
0
                                    Some(convert_runtime_spec(new_runtime.runtime_version()))
238
                                } else {
239
0
                                    None
240
                                },
241
0
                                parent_block_hash: methods::HashHexString(block.parent_hash),
242
                            },
243
                        })
244
0
                        .await;
245
246
0
                    if block.is_new_best {
247
0
                        current_best_block = block.block_hash;
248
0
                        json_rpc_subscription
249
0
                            .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
250
0
                                subscription: (&json_rpc_subscription_id).into(),
251
0
                                result: methods::FollowEvent::BestBlockChanged {
252
0
                                    best_block_hash: methods::HashHexString(block.block_hash),
253
0
                                },
254
0
                            })
255
0
                            .await;
256
0
                    }
257
                }
258
                WakeUpReason::ConsensusNotification(
259
                    consensus_service::Notification::Finalized {
260
0
                        finalized_blocks_newest_to_oldest,
261
0
                        pruned_blocks_hashes,
262
0
                        best_block_hash,
263
0
                    },
264
0
                ) => {
265
0
                    json_rpc_subscription
266
0
                        .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
267
0
                            subscription: (&json_rpc_subscription_id).into(),
268
0
                            result: methods::FollowEvent::Finalized {
269
0
                                // As specified in the JSON-RPC spec, the list must be ordered
270
0
                                // in increasing block number. Consequently we have to reverse
271
0
                                // the list.
272
0
                                finalized_blocks_hashes: finalized_blocks_newest_to_oldest
273
0
                                    .into_iter()
274
0
                                    .map(methods::HashHexString)
275
0
                                    .rev()
276
0
                                    .collect(),
277
0
                                pruned_blocks_hashes: pruned_blocks_hashes
278
0
                                    .into_iter()
279
0
                                    .map(methods::HashHexString)
280
0
                                    .collect(),
281
0
                            },
282
0
                        })
283
0
                        .await;
284
285
0
                    if best_block_hash != current_best_block {
286
0
                        current_best_block = best_block_hash;
287
0
                        json_rpc_subscription
288
0
                            .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
289
0
                                subscription: (&json_rpc_subscription_id).into(),
290
0
                                result: methods::FollowEvent::BestBlockChanged {
291
0
                                    best_block_hash: methods::HashHexString(best_block_hash),
292
0
                                },
293
0
                            })
294
0
                            .await;
295
0
                    }
296
                }
297
                WakeUpReason::ConsensusSubscriptionStop => {
298
0
                    json_rpc_subscription
299
0
                        .send_notification(methods::ServerToClient::chainHead_v1_followEvent {
300
0
                            subscription: (&json_rpc_subscription_id).into(),
301
0
                            result: methods::FollowEvent::Stop {},
302
0
                        })
303
0
                        .await;
304
                }
305
            }
306
        }
307
0
    }));
Unexecuted instantiation: _RNCNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00B9_
Unexecuted instantiation: _RNCNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task00B9_
308
0
309
0
    return_value
310
0
}
Unexecuted instantiation: _RNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task0B7_
Unexecuted instantiation: _RNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions34spawn_chain_head_subscription_task0B7_
311
312
0
fn convert_runtime_spec(runtime: &executor::CoreVersion) -> methods::MaybeRuntimeSpec {
313
0
    let runtime = runtime.decode();
314
0
    methods::MaybeRuntimeSpec::Valid {
315
0
        spec: methods::RuntimeSpec {
316
0
            impl_name: runtime.impl_name.into(),
317
0
            spec_name: runtime.spec_name.into(),
318
0
            impl_version: runtime.impl_version,
319
0
            spec_version: runtime.spec_version,
320
0
            transaction_version: runtime.transaction_version,
321
0
            apis: runtime
322
0
                .apis
323
0
                .map(|api| (methods::HexString(api.name_hash.to_vec()), api.version))
Unexecuted instantiation: _RNCNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec0B7_
Unexecuted instantiation: _RNCNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec0B7_
324
0
                .collect(),
325
0
        },
326
0
    }
327
0
}
Unexecuted instantiation: _RNvNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec
Unexecuted instantiation: _RNvNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24chain_head_subscriptions20convert_runtime_spec