Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/full-node/src/json_rpc_service/legacy_api_subscriptions.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use hashbrown::HashMap;
19
use smol::stream::StreamExt as _;
20
use smoldot::{
21
    chain::fork_tree,
22
    executor::{host::HostVmPrototype, CoreVersion},
23
    trie,
24
};
25
use std::{collections::BTreeSet, iter, mem, num::NonZeroUsize, ops, pin::Pin, sync::Arc};
26
27
use crate::{consensus_service, database_thread};
28
29
/// Helper that provides the blocks of a `chain_subscribeAllHeads` subscription.
30
pub struct SubscribeAllHeads {
31
    consensus_service: Arc<consensus_service::ConsensusService>,
32
33
    /// Active subscription to the consensus service blocks. `None` if not subscribed yet or if
34
    /// the subscription has stopped.
35
    subscription: Option<SubscribeAllHeadsSubscription>,
36
}
37
38
struct SubscribeAllHeadsSubscription {
39
    subscription_id: consensus_service::SubscriptionId,
40
    new_blocks: Pin<Box<async_channel::Receiver<consensus_service::Notification>>>,
41
    blocks_to_unpin: Vec<[u8; 32]>,
42
}
43
44
impl SubscribeAllHeads {
45
    /// Builds a new [`SubscribeAllHeads`].
46
0
    pub fn new(consensus_service: Arc<consensus_service::ConsensusService>) -> Self {
47
0
        SubscribeAllHeads {
48
0
            consensus_service,
49
0
            subscription: None,
50
0
        }
51
0
    }
Unexecuted instantiation: _RNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB2_17SubscribeAllHeads3new
Unexecuted instantiation: _RNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB2_17SubscribeAllHeads3new
52
53
    /// Returns the SCALE-encoded header of the next block to provide as part of the subscription.
54
0
    pub async fn next_scale_encoded_header(&mut self) -> Vec<u8> {
Unexecuted instantiation: _RNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB2_17SubscribeAllHeads25next_scale_encoded_header
Unexecuted instantiation: _RNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB2_17SubscribeAllHeads25next_scale_encoded_header
55
        loop {
56
0
            let subscription = match &mut self.subscription {
57
0
                Some(s) => s,
58
                None => {
59
0
                    let subscribe_all = self
60
0
                        .consensus_service
61
0
                        .subscribe_all(32, NonZeroUsize::new(usize::MAX).unwrap())
62
0
                        .await;
63
64
0
                    let blocks_to_unpin = iter::once(subscribe_all.finalized_block_hash)
65
0
                        .chain(
66
0
                            subscribe_all
67
0
                                .non_finalized_blocks_ancestry_order
68
0
                                .into_iter()
69
0
                                .map(|b| b.block_hash),
Unexecuted instantiation: _RNCNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB6_17SubscribeAllHeads25next_scale_encoded_header00Ba_
Unexecuted instantiation: _RNCNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB6_17SubscribeAllHeads25next_scale_encoded_header00Ba_
70
0
                        )
71
0
                        .collect();
72
0
73
0
                    self.subscription.insert(SubscribeAllHeadsSubscription {
74
0
                        subscription_id: subscribe_all.id,
75
0
                        new_blocks: Box::pin(subscribe_all.new_blocks),
76
0
                        blocks_to_unpin,
77
0
                    })
78
                }
79
            };
80
81
0
            while let Some(block_to_unpin) = subscription.blocks_to_unpin.last() {
82
0
                self.consensus_service
83
0
                    .unpin_block(subscription.subscription_id, *block_to_unpin)
84
0
                    .await;
85
0
                let _ = subscription.blocks_to_unpin.pop();
86
            }
87
88
            loop {
89
0
                match subscription.new_blocks.next().await {
90
                    None => {
91
0
                        self.subscription = None;
92
0
                        break;
93
                    }
94
0
                    Some(consensus_service::Notification::Block { block, .. }) => {
95
0
                        subscription.blocks_to_unpin.push(block.block_hash);
96
0
                        return block.scale_encoded_header;
97
                    }
98
0
                    Some(consensus_service::Notification::Finalized { .. }) => {
99
0
                        // Ignore event.
100
0
                    }
101
                }
102
            }
103
        }
104
0
    }
Unexecuted instantiation: _RNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB4_17SubscribeAllHeads25next_scale_encoded_header0B8_
Unexecuted instantiation: _RNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB4_17SubscribeAllHeads25next_scale_encoded_header0B8_
105
}
106
107
/// Helper that provides the blocks of a `chain_subscribeFinalizedHeads` subscription.
108
pub struct SubscribeFinalizedHeads {
109
    consensus_service: Arc<consensus_service::ConsensusService>,
110
111
    /// Active subscription to the consensus service blocks. `None` if not subscribed yet or if
112
    /// the subscription has stopped.
113
    subscription: Option<SubscribeFinalizedHeadsSubscription>,
114
}
115
116
struct SubscribeFinalizedHeadsSubscription {
117
    subscription_id: consensus_service::SubscriptionId,
118
    new_blocks: Pin<Box<async_channel::Receiver<consensus_service::Notification>>>,
119
    pinned_blocks: HashMap<[u8; 32], Vec<u8>>,
120
    blocks_to_unpin: Vec<[u8; 32]>,
121
}
122
123
impl SubscribeFinalizedHeads {
124
    /// Builds a new [`SubscribeFinalizedHeads`].
125
0
    pub fn new(consensus_service: Arc<consensus_service::ConsensusService>) -> Self {
126
0
        SubscribeFinalizedHeads {
127
0
            consensus_service,
128
0
            subscription: None,
129
0
        }
130
0
    }
Unexecuted instantiation: _RNvMs_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB4_23SubscribeFinalizedHeads3new
Unexecuted instantiation: _RNvMs_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB4_23SubscribeFinalizedHeads3new
131
132
    /// Returns the SCALE-encoded header of the next block to provide as part of the subscription.
133
0
    pub async fn next_scale_encoded_header(&mut self) -> Vec<u8> {
Unexecuted instantiation: _RNvMs_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB4_23SubscribeFinalizedHeads25next_scale_encoded_header
Unexecuted instantiation: _RNvMs_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB4_23SubscribeFinalizedHeads25next_scale_encoded_header
134
        loop {
135
0
            let subscription = match &mut self.subscription {
136
0
                Some(s) => s,
137
                None => {
138
0
                    let subscribe_all = self
139
0
                        .consensus_service
140
0
                        .subscribe_all(32, NonZeroUsize::new(usize::MAX).unwrap())
141
0
                        .await;
142
143
0
                    let mut pinned_blocks = HashMap::with_capacity(
144
0
                        subscribe_all.non_finalized_blocks_ancestry_order.len() + 1 + 8,
145
0
                    );
146
0
                    for block in subscribe_all.non_finalized_blocks_ancestry_order {
147
0
                        pinned_blocks.insert(block.block_hash, block.scale_encoded_header);
148
0
                    }
149
150
0
                    let mut blocks_to_unpin = Vec::with_capacity(8);
151
0
                    blocks_to_unpin.push(subscribe_all.finalized_block_hash);
152
0
153
0
                    self.subscription = Some(SubscribeFinalizedHeadsSubscription {
154
0
                        subscription_id: subscribe_all.id,
155
0
                        new_blocks: Box::pin(subscribe_all.new_blocks),
156
0
                        pinned_blocks,
157
0
                        blocks_to_unpin,
158
0
                    });
159
0
160
0
                    return subscribe_all.finalized_block_scale_encoded_header;
161
                }
162
            };
163
164
0
            while let Some(block_to_unpin) = subscription.blocks_to_unpin.last() {
165
0
                self.consensus_service
166
0
                    .unpin_block(subscription.subscription_id, *block_to_unpin)
167
0
                    .await;
168
0
                let _ = subscription.blocks_to_unpin.pop();
169
            }
170
171
            loop {
172
0
                match subscription.new_blocks.next().await {
173
                    None => {
174
0
                        self.subscription = None;
175
0
                        break;
176
                    }
177
0
                    Some(consensus_service::Notification::Block { block, .. }) => {
178
0
                        subscription
179
0
                            .pinned_blocks
180
0
                            .insert(block.block_hash, block.scale_encoded_header);
181
0
                    }
182
                    Some(consensus_service::Notification::Finalized {
183
0
                        finalized_blocks_newest_to_oldest,
184
0
                        pruned_blocks_hashes,
185
0
                        ..
186
0
                    }) => {
187
0
                        debug_assert!(!finalized_blocks_newest_to_oldest.is_empty());
188
0
                        let finalized_block_hash =
189
0
                            *finalized_blocks_newest_to_oldest.first().unwrap();
190
0
                        subscription.blocks_to_unpin.push(finalized_block_hash);
191
0
                        let finalized_block_header = subscription
192
0
                            .pinned_blocks
193
0
                            .remove(&finalized_block_hash)
194
0
                            .unwrap();
195
196
0
                        for block in pruned_blocks_hashes {
197
0
                            subscription.blocks_to_unpin.push(block);
198
0
                            let _was_in = subscription.pinned_blocks.remove(&block);
199
0
                            debug_assert!(_was_in.is_some());
200
                        }
201
202
0
                        for block in finalized_blocks_newest_to_oldest.into_iter().skip(1) {
203
0
                            subscription.blocks_to_unpin.push(block);
204
0
                            let _was_in = subscription.pinned_blocks.remove(&block);
205
0
                            debug_assert!(_was_in.is_some());
206
                        }
207
208
0
                        return finalized_block_header;
209
                    }
210
                }
211
            }
212
        }
213
0
    }
Unexecuted instantiation: _RNCNvMs_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB6_23SubscribeFinalizedHeads25next_scale_encoded_header0Ba_
Unexecuted instantiation: _RNCNvMs_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB6_23SubscribeFinalizedHeads25next_scale_encoded_header0Ba_
214
}
215
216
/// Helper that provides the blocks of a `chain_subscribeNewHeads` subscription.
217
pub struct SubscribeNewHeads {
218
    consensus_service: Arc<consensus_service::ConsensusService>,
219
220
    /// Active subscription to the consensus service blocks. `None` if not subscribed yet or if
221
    /// the subscription has stopped.
222
    subscription: Option<SubscribeNewHeadsSubscription>,
223
}
224
225
struct SubscribeNewHeadsSubscription {
226
    subscription_id: consensus_service::SubscriptionId,
227
    new_blocks: Pin<Box<async_channel::Receiver<consensus_service::Notification>>>,
228
    pinned_blocks: HashMap<[u8; 32], Vec<u8>>,
229
    blocks_to_unpin: Vec<[u8; 32]>,
230
    current_best_block_hash: [u8; 32],
231
}
232
233
impl SubscribeNewHeads {
234
    /// Builds a new [`SubscribeNewHeads`].
235
0
    pub fn new(consensus_service: Arc<consensus_service::ConsensusService>) -> Self {
236
0
        SubscribeNewHeads {
237
0
            consensus_service,
238
0
            subscription: None,
239
0
        }
240
0
    }
Unexecuted instantiation: _RNvMs0_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_17SubscribeNewHeads3new
Unexecuted instantiation: _RNvMs0_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_17SubscribeNewHeads3new
241
242
    /// Returns the SCALE-encoded header of the next block to provide as part of the subscription.
243
0
    pub async fn next_scale_encoded_header(&mut self) -> &Vec<u8> {
Unexecuted instantiation: _RNvMs0_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_17SubscribeNewHeads25next_scale_encoded_header
Unexecuted instantiation: _RNvMs0_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_17SubscribeNewHeads25next_scale_encoded_header
244
        // Note: this function is convoluted with many unwraps due to a difficult fight with the
245
        // Rust borrow checker.
246
247
0
        loop {
248
0
            if self.subscription.is_none() {
249
0
                let subscribe_all = self
250
0
                    .consensus_service
251
0
                    .subscribe_all(32, NonZeroUsize::new(usize::MAX).unwrap())
252
0
                    .await;
253
254
0
                let mut pinned_blocks = HashMap::with_capacity(
255
0
                    subscribe_all.non_finalized_blocks_ancestry_order.len() + 1 + 8,
256
0
                );
257
0
258
0
                let mut current_best_block_hash = subscribe_all.finalized_block_hash;
259
0
260
0
                pinned_blocks.insert(
261
0
                    subscribe_all.finalized_block_hash,
262
0
                    subscribe_all.finalized_block_scale_encoded_header,
263
0
                );
264
265
0
                for block in subscribe_all.non_finalized_blocks_ancestry_order {
266
0
                    pinned_blocks.insert(block.block_hash, block.scale_encoded_header);
267
0
                    if block.is_new_best {
268
0
                        current_best_block_hash = block.block_hash;
269
0
                    }
270
                }
271
272
0
                let subscription = self.subscription.insert(SubscribeNewHeadsSubscription {
273
0
                    subscription_id: subscribe_all.id,
274
0
                    new_blocks: Box::pin(subscribe_all.new_blocks),
275
0
                    pinned_blocks,
276
0
                    blocks_to_unpin: Vec::with_capacity(8),
277
0
                    current_best_block_hash,
278
0
                });
279
0
280
0
                return subscription
281
0
                    .pinned_blocks
282
0
                    .get(&subscription.current_best_block_hash)
283
0
                    .unwrap();
284
0
            }
285
0
286
0
            {
287
0
                let subscription = self.subscription.as_mut().unwrap();
288
0
                while let Some(block_to_unpin) = subscription.blocks_to_unpin.last() {
289
0
                    self.consensus_service
290
0
                        .unpin_block(subscription.subscription_id, *block_to_unpin)
291
0
                        .await;
292
0
                    let _ = subscription.blocks_to_unpin.pop();
293
                }
294
            }
295
296
            loop {
297
0
                let notification = self.subscription.as_mut().unwrap().new_blocks.next().await;
298
0
                let Some(notification) = notification else {
299
0
                    self.subscription = None;
300
0
                    break;
301
                };
302
303
0
                match notification {
304
0
                    consensus_service::Notification::Block { block, .. } => {
305
0
                        let _previous_value = self
306
0
                            .subscription
307
0
                            .as_mut()
308
0
                            .unwrap()
309
0
                            .pinned_blocks
310
0
                            .insert(block.block_hash, block.scale_encoded_header);
311
0
                        debug_assert!(_previous_value.is_none());
312
313
0
                        if block.is_new_best {
314
0
                            self.subscription.as_mut().unwrap().current_best_block_hash =
315
0
                                block.block_hash;
316
0
                            return self
317
0
                                .subscription
318
0
                                .as_mut()
319
0
                                .unwrap()
320
0
                                .pinned_blocks
321
0
                                .get(&block.block_hash)
322
0
                                .unwrap();
323
0
                        }
324
                    }
325
                    consensus_service::Notification::Finalized {
326
0
                        pruned_blocks_hashes,
327
0
                        finalized_blocks_newest_to_oldest,
328
0
                        best_block_hash,
329
                    } => {
330
0
                        for hash in pruned_blocks_hashes {
331
0
                            self.subscription
332
0
                                .as_mut()
333
0
                                .unwrap()
334
0
                                .blocks_to_unpin
335
0
                                .push(hash);
336
0
                            let _was_in = self
337
0
                                .subscription
338
0
                                .as_mut()
339
0
                                .unwrap()
340
0
                                .pinned_blocks
341
0
                                .remove(&hash);
342
0
                            debug_assert!(_was_in.is_some());
343
                        }
344
345
0
                        for hash in finalized_blocks_newest_to_oldest.iter().skip(1) {
346
0
                            self.subscription
347
0
                                .as_mut()
348
0
                                .unwrap()
349
0
                                .blocks_to_unpin
350
0
                                .push(*hash);
351
0
                            let _was_in = self
352
0
                                .subscription
353
0
                                .as_mut()
354
0
                                .unwrap()
355
0
                                .pinned_blocks
356
0
                                .remove(hash);
357
0
                            debug_assert!(_was_in.is_some());
358
                        }
359
360
0
                        if best_block_hash
361
0
                            != self.subscription.as_mut().unwrap().current_best_block_hash
362
                        {
363
0
                            self.subscription.as_mut().unwrap().current_best_block_hash =
364
0
                                best_block_hash;
365
0
                            return self
366
0
                                .subscription
367
0
                                .as_mut()
368
0
                                .unwrap()
369
0
                                .pinned_blocks
370
0
                                .get(&best_block_hash)
371
0
                                .unwrap();
372
0
                        }
373
                    }
374
                }
375
            }
376
        }
377
0
    }
Unexecuted instantiation: _RNCNvMs0_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB7_17SubscribeNewHeads25next_scale_encoded_header0Bb_
Unexecuted instantiation: _RNCNvMs0_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB7_17SubscribeNewHeads25next_scale_encoded_header0Bb_
378
}
379
380
/// Helper that provides the blocks of a `state_subscribeRuntimeVersion` subscription.
381
pub struct SubscribeRuntimeVersion {
382
    consensus_service: Arc<consensus_service::ConsensusService>,
383
384
    /// Active subscription to the consensus service blocks. `None` if not subscribed yet or if
385
    /// the subscription has stopped.
386
    subscription: Option<SubscribeRuntimeVersionSubscription>,
387
}
388
389
struct SubscribeRuntimeVersionSubscription {
390
    subscription_id: consensus_service::SubscriptionId,
391
    new_blocks: Pin<Box<async_channel::Receiver<consensus_service::Notification>>>,
392
    pinned_blocks: HashMap<[u8; 32], Arc<HostVmPrototype>>,
393
    blocks_to_unpin: Vec<[u8; 32]>,
394
    current_best_block_hash: [u8; 32],
395
    current_best_block_runtime: Arc<HostVmPrototype>,
396
}
397
398
impl SubscribeRuntimeVersion {
399
    /// Builds a new [`SubscribeRuntimeVersion`].
400
0
    pub fn new(consensus_service: Arc<consensus_service::ConsensusService>) -> Self {
401
0
        SubscribeRuntimeVersion {
402
0
            consensus_service,
403
0
            subscription: None,
404
0
        }
405
0
    }
Unexecuted instantiation: _RNvMs1_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_23SubscribeRuntimeVersion3new
Unexecuted instantiation: _RNvMs1_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_23SubscribeRuntimeVersion3new
406
407
    /// Returns the next runtime version to provide as part of the subscription.
408
0
    pub async fn next_runtime_version(&mut self) -> &CoreVersion {
Unexecuted instantiation: _RNvMs1_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_23SubscribeRuntimeVersion20next_runtime_version
Unexecuted instantiation: _RNvMs1_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_23SubscribeRuntimeVersion20next_runtime_version
409
        // Note: this function is convoluted with many unwraps due to a difficult fight with the
410
        // Rust borrow checker.
411
412
0
        loop {
413
0
            if self.subscription.is_none() {
414
0
                let subscribe_all = self
415
0
                    .consensus_service
416
0
                    .subscribe_all(32, NonZeroUsize::new(usize::MAX).unwrap())
417
0
                    .await;
418
419
0
                let mut pinned_blocks = HashMap::with_capacity(
420
0
                    subscribe_all.non_finalized_blocks_ancestry_order.len() + 1 + 8,
421
0
                );
422
0
423
0
                let mut current_best_block_hash = subscribe_all.finalized_block_hash;
424
0
                let mut current_best_block_runtime = subscribe_all.finalized_block_runtime.clone();
425
0
426
0
                pinned_blocks.insert(
427
0
                    subscribe_all.finalized_block_hash,
428
0
                    subscribe_all.finalized_block_runtime,
429
0
                );
430
431
0
                for block in subscribe_all.non_finalized_blocks_ancestry_order {
432
0
                    let runtime = block
433
0
                        .runtime_update
434
0
                        .unwrap_or_else(|| pinned_blocks.get(&block.parent_hash).unwrap().clone());
Unexecuted instantiation: _RNCNCNvMs1_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_23SubscribeRuntimeVersion20next_runtime_version00Bd_
Unexecuted instantiation: _RNCNCNvMs1_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_23SubscribeRuntimeVersion20next_runtime_version00Bd_
435
0
                    if block.is_new_best {
436
0
                        current_best_block_hash = block.block_hash;
437
0
                        current_best_block_runtime = runtime.clone();
438
0
                    }
439
0
                    pinned_blocks.insert(block.block_hash, runtime);
440
                }
441
442
0
                let subscription = self
443
0
                    .subscription
444
0
                    .insert(SubscribeRuntimeVersionSubscription {
445
0
                        subscription_id: subscribe_all.id,
446
0
                        new_blocks: Box::pin(subscribe_all.new_blocks),
447
0
                        pinned_blocks,
448
0
                        blocks_to_unpin: Vec::with_capacity(8),
449
0
                        current_best_block_hash,
450
0
                        current_best_block_runtime: current_best_block_runtime.clone(),
451
0
                    });
452
0
453
0
                return subscription.current_best_block_runtime.runtime_version();
454
0
            }
455
0
456
0
            {
457
0
                let subscription = self.subscription.as_mut().unwrap();
458
0
                while let Some(block_to_unpin) = subscription.blocks_to_unpin.last() {
459
0
                    self.consensus_service
460
0
                        .unpin_block(subscription.subscription_id, *block_to_unpin)
461
0
                        .await;
462
0
                    let _ = subscription.blocks_to_unpin.pop();
463
                }
464
            }
465
466
            loop {
467
0
                let notification = self.subscription.as_mut().unwrap().new_blocks.next().await;
468
0
                let Some(notification) = notification else {
469
0
                    self.subscription = None;
470
0
                    break;
471
                };
472
473
0
                match notification {
474
0
                    consensus_service::Notification::Block { block, .. } => {
475
0
                        let runtime = block.runtime_update.unwrap_or_else(|| {
476
0
                            self.subscription
477
0
                                .as_ref()
478
0
                                .unwrap()
479
0
                                .pinned_blocks
480
0
                                .get(&block.parent_hash)
481
0
                                .unwrap()
482
0
                                .clone()
483
0
                        });
Unexecuted instantiation: _RNCNCNvMs1_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_23SubscribeRuntimeVersion20next_runtime_version0s_0Bd_
Unexecuted instantiation: _RNCNCNvMs1_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_23SubscribeRuntimeVersion20next_runtime_version0s_0Bd_
484
0
485
0
                        let _previous_value = self
486
0
                            .subscription
487
0
                            .as_mut()
488
0
                            .unwrap()
489
0
                            .pinned_blocks
490
0
                            .insert(block.block_hash, runtime.clone());
491
0
                        debug_assert!(_previous_value.is_none());
492
493
0
                        if block.is_new_best {
494
0
                            self.subscription.as_mut().unwrap().current_best_block_hash =
495
0
                                block.block_hash;
496
0
                            if !Arc::ptr_eq(
497
0
                                &self
498
0
                                    .subscription
499
0
                                    .as_mut()
500
0
                                    .unwrap()
501
0
                                    .current_best_block_runtime,
502
0
                                &runtime,
503
0
                            ) {
504
0
                                self.subscription
505
0
                                    .as_mut()
506
0
                                    .unwrap()
507
0
                                    .current_best_block_runtime = runtime;
508
0
                                return self
509
0
                                    .subscription
510
0
                                    .as_mut()
511
0
                                    .unwrap()
512
0
                                    .current_best_block_runtime
513
0
                                    .runtime_version();
514
0
                            }
515
0
                        }
516
                    }
517
                    consensus_service::Notification::Finalized {
518
0
                        pruned_blocks_hashes,
519
0
                        finalized_blocks_newest_to_oldest,
520
0
                        best_block_hash,
521
                    } => {
522
0
                        for hash in pruned_blocks_hashes {
523
0
                            self.subscription
524
0
                                .as_mut()
525
0
                                .unwrap()
526
0
                                .blocks_to_unpin
527
0
                                .push(hash);
528
0
                            let _was_in = self
529
0
                                .subscription
530
0
                                .as_mut()
531
0
                                .unwrap()
532
0
                                .pinned_blocks
533
0
                                .remove(&hash);
534
0
                            debug_assert!(_was_in.is_some());
535
                        }
536
537
0
                        for hash in finalized_blocks_newest_to_oldest.iter().skip(1) {
538
0
                            self.subscription
539
0
                                .as_mut()
540
0
                                .unwrap()
541
0
                                .blocks_to_unpin
542
0
                                .push(*hash);
543
0
                            let _was_in = self
544
0
                                .subscription
545
0
                                .as_mut()
546
0
                                .unwrap()
547
0
                                .pinned_blocks
548
0
                                .remove(hash);
549
0
                            debug_assert!(_was_in.is_some());
550
                        }
551
552
0
                        if best_block_hash
553
0
                            != self.subscription.as_mut().unwrap().current_best_block_hash
554
                        {
555
0
                            self.subscription.as_mut().unwrap().current_best_block_hash =
556
0
                                best_block_hash;
557
0
                            let new_best_runtime = self
558
0
                                .subscription
559
0
                                .as_ref()
560
0
                                .unwrap()
561
0
                                .pinned_blocks
562
0
                                .get(&best_block_hash)
563
0
                                .unwrap()
564
0
                                .clone();
565
0
                            if !Arc::ptr_eq(
566
0
                                &self
567
0
                                    .subscription
568
0
                                    .as_mut()
569
0
                                    .unwrap()
570
0
                                    .current_best_block_runtime,
571
0
                                &new_best_runtime,
572
0
                            ) {
573
0
                                self.subscription
574
0
                                    .as_mut()
575
0
                                    .unwrap()
576
0
                                    .current_best_block_runtime = new_best_runtime;
577
0
                                return self
578
0
                                    .subscription
579
0
                                    .as_mut()
580
0
                                    .unwrap()
581
0
                                    .current_best_block_runtime
582
0
                                    .runtime_version();
583
0
                            }
584
0
                        }
585
                    }
586
                }
587
            }
588
        }
589
0
    }
Unexecuted instantiation: _RNCNvMs1_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB7_23SubscribeRuntimeVersion20next_runtime_version0Bb_
Unexecuted instantiation: _RNCNvMs1_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB7_23SubscribeRuntimeVersion20next_runtime_version0Bb_
590
}
591
592
/// Helper that provides the blocks of a `state_subscribeStorage` subscription.
593
///
594
/// Note that various corner cases are weirdly handled, due to `state_subscribeStorage` not being
595
/// properly defined anyway.
596
pub struct SubscribeStorage {
597
    /// Consensus service that was passed to [`SubscribeStorage::new`].
598
    consensus_service: Arc<consensus_service::ConsensusService>,
599
    /// Database that was passed to [`SubscribeStorage::new`].
600
    database: Arc<database_thread::DatabaseThread>,
601
    /// List of keys that was passed to [`SubscribeStorage::new`].
602
    keys: Vec<Vec<u8>>,
603
604
    /// Active subscription to the consensus service blocks. `None` if not subscribed yet or if
605
    /// the subscription has stopped.
606
    subscription: Option<SubscribeStorageSubscription>,
607
}
608
609
struct SubscribeStorageSubscription {
610
    /// Next changes report currently being prepared.
611
    new_report_preparation: Vec<(Vec<u8>, Option<Vec<u8>>)>,
612
    /// List of keys that remain to be included
613
    /// in [`SubscribeStorageSubscription::new_report_preparation`].
614
    new_report_remaining_keys: hashbrown::HashSet<Vec<u8>, fnv::FnvBuildHasher>,
615
616
    /// Identifier of the subscription towards the consensus service.
617
    subscription_id: consensus_service::SubscriptionId,
618
    /// Channel connected to the consensus service where notifications are received.
619
    new_blocks: Pin<Box<async_channel::Receiver<consensus_service::Notification>>>,
620
    /// List of block hashes that are still pinned but are not necessary anymore and should be
621
    /// unpinned.
622
    blocks_to_unpin: Vec<[u8; 32]>,
623
624
    /// Tree of all pinned blocks. Doesn't include the current finalized block.
625
    pinned_blocks: fork_tree::ForkTree<[u8; 32]>,
626
    /// Content of [`SubscribeStorageSubscription::pinned_blocks`], indexed by block hashes.
627
    pinned_blocks_by_hash: hashbrown::HashMap<[u8; 32], fork_tree::NodeIndex>,
628
    /// Contains all the storage changes related to the keys found by [`SubscribeStorage::keys`]
629
    /// (or all keys if subscribing to all keys) made in the pinned blocks found
630
    /// in [`SubscribeStorageSubscription::pinned_blocks`].
631
    ///
632
    /// Because the storage changes of blocks that were already present at the time when the
633
    /// subscription starts are unknown, they are also not in this list. This leads to corner
634
    /// cases where some changes aren't provided, but we don't really care
635
    /// as `state_subscribeStorage` is not properly defined anyway.
636
    pinned_blocks_storage_changes: BTreeSet<(fork_tree::NodeIndex, Vec<u8>)>,
637
    /// Hash of the current finalized block. Not found
638
    /// in [`SubscribeStorageSubscription::pinned_blocks`].
639
    current_finalized_block_hash: [u8; 32],
640
    /// Index of the current  best block within [`SubscribeStorageSubscription::pinned_blocks`],
641
    /// or `None` if the best block is equal to the finalized block.
642
    current_best_block_index: Option<fork_tree::NodeIndex>,
643
}
644
645
impl SubscribeStorage {
646
    /// Builds a new [`SubscribeStorage`].
647
    ///
648
    /// If the list of keys is empty, then all storage changes are reported, in accordance to the
649
    /// behavior of `state_subscribeStorage`.
650
0
    pub fn new(
651
0
        consensus_service: Arc<consensus_service::ConsensusService>,
652
0
        database: Arc<database_thread::DatabaseThread>,
653
0
        subscribed_keys: Vec<Vec<u8>>,
654
0
    ) -> Self {
655
0
        SubscribeStorage {
656
0
            consensus_service,
657
0
            database,
658
0
            keys: subscribed_keys,
659
0
            subscription: None,
660
0
        }
661
0
    }
Unexecuted instantiation: _RNvMs2_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_16SubscribeStorage3new
Unexecuted instantiation: _RNvMs2_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_16SubscribeStorage3new
662
663
    /// Returns the next storage change notification.
664
0
    pub async fn next_storage_update(
665
0
        &'_ mut self,
666
0
    ) -> (
667
0
        [u8; 32],
668
0
        impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)> + '_,
669
0
    ) {
Unexecuted instantiation: _RNvMs2_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_16SubscribeStorage19next_storage_update
Unexecuted instantiation: _RNvMs2_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB5_16SubscribeStorage19next_storage_update
670
        'main_subscription: loop {
671
            // Get the active consensus service subscription, or subscribe if necessary.
672
0
            let subscription = match &mut self.subscription {
673
0
                Some(s) => s,
674
0
                subscription @ None => {
675
0
                    let subscribe_all = self
676
0
                        .consensus_service
677
0
                        .subscribe_all(32, NonZeroUsize::new(usize::MAX).unwrap())
678
0
                        .await;
679
680
0
                    let mut pinned_blocks_by_hash = HashMap::with_capacity(
681
0
                        subscribe_all.non_finalized_blocks_ancestry_order.len() + 1 + 8,
682
0
                    );
683
0
                    let mut pinned_blocks =
684
0
                        fork_tree::ForkTree::with_capacity(pinned_blocks_by_hash.capacity());
685
0
686
0
                    let mut current_best_block_index = None;
687
688
0
                    for block in subscribe_all.non_finalized_blocks_ancestry_order {
689
0
                        let node_index = pinned_blocks.insert(
690
0
                            if block.parent_hash != subscribe_all.finalized_block_hash {
691
0
                                Some(*pinned_blocks_by_hash.get(&block.parent_hash).unwrap())
692
                            } else {
693
0
                                None
694
                            },
695
0
                            block.block_hash,
696
0
                        );
697
0
                        pinned_blocks_by_hash.insert(block.block_hash, node_index);
698
0
                        if block.is_new_best {
699
0
                            current_best_block_index = Some(node_index);
700
0
                        }
701
                    }
702
703
0
                    subscription.insert(SubscribeStorageSubscription {
704
0
                        new_report_preparation: Vec::with_capacity(self.keys.len()),
705
0
                        // We put all the keys in `new_report_remaining_keys`, as we must indicate
706
0
                        // the initial values of all the keys.
707
0
                        // If the list of keys is empty (meaning that the API user wants to
708
0
                        // subscribe to all keys), this will intentionally not lead to any report,
709
0
                        // as said report would be huge.
710
0
                        // It is unclear how `state_subscribeStorage` is supposed to behave in
711
0
                        // that situation.
712
0
                        //
713
0
                        // Also note that this initial report will happen after a re-subscription,
714
0
                        // in order to not miss a storage change.
715
0
                        new_report_remaining_keys: self.keys.iter().cloned().collect(),
716
0
                        subscription_id: subscribe_all.id,
717
0
                        new_blocks: Box::pin(subscribe_all.new_blocks),
718
0
                        pinned_blocks,
719
0
                        pinned_blocks_by_hash,
720
0
                        pinned_blocks_storage_changes: BTreeSet::new(),
721
0
                        blocks_to_unpin: Vec::with_capacity(8),
722
0
                        current_finalized_block_hash: subscribe_all.finalized_block_hash,
723
0
                        current_best_block_index,
724
0
                    })
725
                }
726
            };
727
728
            // Unpin the blocks that must be unpinned.
729
0
            while let Some(block_to_unpin) = subscription.blocks_to_unpin.last() {
730
0
                self.consensus_service
731
0
                    .unpin_block(subscription.subscription_id, *block_to_unpin)
732
0
                    .await;
733
0
                let _ = subscription.blocks_to_unpin.pop();
734
            }
735
736
            // Continue to fill the next storage changes report.
737
0
            while let Some(key) = subscription.new_report_remaining_keys.iter().next() {
738
0
                let best_block_hash = subscription
739
0
                    .current_best_block_index
740
0
                    .map_or(subscription.current_finalized_block_hash, |idx| {
741
0
                        *subscription.pinned_blocks.get(idx).unwrap()
742
0
                    });
Unexecuted instantiation: _RNCNCNvMs2_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update00Bd_
Unexecuted instantiation: _RNCNCNvMs2_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update00Bd_
743
0
744
0
                let key = key.clone();
745
746
0
                let (key, result) = self
747
0
                    .database
748
0
                    .with_database(move |database| {
749
0
                        let result = database.block_storage_get(
750
0
                            &best_block_hash,
751
0
                            iter::empty::<iter::Empty<_>>(),
752
0
                            trie::bytes_to_nibbles(key.iter().copied()).map(u8::from),
753
0
                        );
754
0
                        (key, result)
755
0
                    })
Unexecuted instantiation: _RNCNCNvMs2_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update0s_0Bd_
Unexecuted instantiation: _RNCNCNvMs2_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update0s_0Bd_
756
0
                    .await;
757
758
0
                subscription.new_report_remaining_keys.remove(&key);
759
760
0
                match result {
761
0
                    Ok(value) => subscription
762
0
                        .new_report_preparation
763
0
                        .push((key, value.map(|(v, _)| v))),
Unexecuted instantiation: _RNCNCNvMs2_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update0s0_0Bd_
Unexecuted instantiation: _RNCNCNvMs2_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update0s0_0Bd_
764
                    Err(database_thread::StorageAccessError::UnknownBlock)
765
                    | Err(database_thread::StorageAccessError::IncompleteStorage) => {
766
0
                        self.subscription = None;
767
0
                        continue 'main_subscription;
768
                    }
769
                    Err(database_thread::StorageAccessError::Corrupted(_)) => {
770
                        // Database corruption errors are ignored.
771
0
                        continue;
772
                    }
773
                }
774
            }
775
776
            // Send the storage changes report if it is complete.
777
0
            debug_assert!(subscription.new_report_remaining_keys.is_empty());
778
0
            if !subscription.new_report_preparation.is_empty() {
779
0
                let best_block_hash = subscription
780
0
                    .current_best_block_index
781
0
                    .map_or(subscription.current_finalized_block_hash, |idx| {
782
0
                        *subscription.pinned_blocks.get(idx).unwrap()
783
0
                    });
Unexecuted instantiation: _RNCNCNvMs2_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update0s1_0Bd_
Unexecuted instantiation: _RNCNCNvMs2_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB9_16SubscribeStorage19next_storage_update0s1_0Bd_
784
0
                return (
785
0
                    best_block_hash,
786
0
                    mem::replace(
787
0
                        &mut subscription.new_report_preparation,
788
0
                        Vec::with_capacity(self.keys.len()),
789
0
                    )
790
0
                    .into_iter(),
791
0
                );
792
0
            }
793
794
            // Process the next incoming consensus service notification.
795
0
            let notification = subscription.new_blocks.next().await;
796
0
            let Some(mut notification) = notification else {
797
0
                self.subscription = None;
798
0
                continue;
799
            };
800
801
            // If the notification is about a new block, insert said new block in the state
802
            // machine.
803
            if let consensus_service::Notification::Block {
804
0
                block,
805
0
                storage_changes,
806
0
            } = &mut notification
807
            {
808
0
                let node_index = subscription.pinned_blocks.insert(
809
0
                    if block.parent_hash != subscription.current_finalized_block_hash {
810
0
                        Some(
811
0
                            *subscription
812
0
                                .pinned_blocks_by_hash
813
0
                                .get(&block.parent_hash)
814
0
                                .unwrap(),
815
0
                        )
816
                    } else {
817
0
                        None
818
                    },
819
0
                    block.block_hash,
820
0
                );
821
0
822
0
                subscription
823
0
                    .pinned_blocks_by_hash
824
0
                    .insert(block.block_hash, node_index);
825
0
826
0
                if !self.keys.is_empty() {
827
0
                    for key in &self.keys {
828
0
                        if storage_changes.main_trie_diff_get(key).is_some() {
829
0
                            subscription
830
0
                                .pinned_blocks_storage_changes
831
0
                                .insert((node_index, key.clone()));
832
0
                        }
833
                    }
834
                } else {
835
0
                    for (changed_key, _) in
836
0
                        storage_changes.main_trie_storage_changes_iter_unordered()
837
0
                    {
838
0
                        subscription
839
0
                            .pinned_blocks_storage_changes
840
0
                            .insert((node_index, changed_key.to_owned()));
841
0
                    }
842
                }
843
0
            }
844
845
            // If the notification changes the best block, find the keys that have changed and
846
            // put them in `new_report_remaining_keys`.
847
            if let consensus_service::Notification::Block {
848
                block:
849
                    consensus_service::BlockNotification {
850
0
                        block_hash: best_block_hash,
851
                        is_new_best: true,
852
                        ..
853
                    },
854
                ..
855
            }
856
            | consensus_service::Notification::Finalized {
857
0
                best_block_hash, ..
858
0
            } = &notification
859
            {
860
0
                let new_best_block_node_index = *subscription
861
0
                    .pinned_blocks_by_hash
862
0
                    .get(best_block_hash)
863
0
                    .unwrap();
864
865
0
                let ascend_descend_iter = match subscription.current_best_block_index {
866
0
                    Some(prev_best_idx) => {
867
0
                        let (a, d) = subscription
868
0
                            .pinned_blocks
869
0
                            .ascend_and_descend(prev_best_idx, new_best_block_node_index);
870
0
                        either::Left(a.chain(d))
871
                    }
872
0
                    None => either::Right(
873
0
                        subscription
874
0
                            .pinned_blocks
875
0
                            .root_to_node_path(new_best_block_node_index),
876
0
                    ),
877
                };
878
879
0
                for block in ascend_descend_iter {
880
0
                    let storage_changes = subscription.pinned_blocks_storage_changes.range((
881
0
                        ops::Bound::Included((block, Vec::new())),
882
0
                        if let Some(block_plus_one) = block.inc() {
883
0
                            ops::Bound::Excluded((block_plus_one, Vec::new()))
884
                        } else {
885
0
                            ops::Bound::Unbounded
886
                        },
887
                    ));
888
889
0
                    for (_, key) in storage_changes {
890
0
                        subscription.new_report_remaining_keys.insert(key.clone());
891
0
                    }
892
                }
893
894
0
                subscription.current_best_block_index = Some(new_best_block_node_index);
895
0
            }
896
897
            // Remove from the state machine the blocks that have been finalized.
898
            if let consensus_service::Notification::Finalized {
899
0
                finalized_blocks_newest_to_oldest,
900
                ..
901
0
            } = notification
902
            {
903
0
                subscription.current_finalized_block_hash =
904
0
                    *finalized_blocks_newest_to_oldest.first().unwrap();
905
906
0
                for pruned_block in subscription.pinned_blocks.prune_ancestors(
907
0
                    *subscription
908
0
                        .pinned_blocks_by_hash
909
0
                        .get(&subscription.current_finalized_block_hash)
910
0
                        .unwrap(),
911
0
                ) {
912
0
                    let _was_in = subscription
913
0
                        .pinned_blocks_by_hash
914
0
                        .remove(&pruned_block.user_data);
915
0
                    debug_assert_eq!(_was_in, Some(pruned_block.index));
916
917
0
                    let mut after_split_off_point = subscription
918
0
                        .pinned_blocks_storage_changes
919
0
                        .split_off(&(pruned_block.index, Vec::new()));
920
0
                    if let Some(index_plus_one) = pruned_block.index.inc() {
921
0
                        let mut after_changes =
922
0
                            after_split_off_point.split_off(&(index_plus_one, Vec::new()));
923
0
                        subscription
924
0
                            .pinned_blocks_storage_changes
925
0
                            .append(&mut after_changes);
926
0
                    }
927
928
0
                    subscription.blocks_to_unpin.push(pruned_block.user_data);
929
                }
930
0
            }
931
        }
932
0
    }
Unexecuted instantiation: _RNCNvMs2_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB7_16SubscribeStorage19next_storage_update0Bb_
Unexecuted instantiation: _RNCNvMs2_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service24legacy_api_subscriptionsNtB7_16SubscribeStorage19next_storage_update0Bb_
933
}