Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/full-node/src/json_rpc_service/runtime_caches_service.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2023  Pierre Krieger
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use crate::{database_thread, LogCallback};
19
20
use futures_channel::oneshot;
21
use futures_lite::{Future, StreamExt as _};
22
use smol::lock::Mutex;
23
use smoldot::{executor, trie};
24
use std::{
25
    iter,
26
    num::NonZeroUsize,
27
    pin::{self, Pin},
28
    sync::Arc,
29
};
30
31
/// Configuration of the service.
32
pub struct Config {
33
    /// Closure that spawns background tasks.
34
    pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>,
35
36
    /// Function called in order to notify of something.
37
    pub log_callback: Arc<dyn LogCallback + Send + Sync>,
38
39
    /// Database to access blocks.
40
    pub database: Arc<database_thread::DatabaseThread>,
41
42
    /// Number of entries in the cache of runtimes.
43
    pub num_cache_entries: NonZeroUsize,
44
}
45
46
/// A running runtime caches service.
47
pub struct RuntimeCachesService {
48
    to_background: Mutex<async_channel::Sender<Message>>,
49
}
50
51
/// Message sent from the frontend to the background task.
52
enum Message {
53
    Get {
54
        block_hash: [u8; 32],
55
        result_tx: oneshot::Sender<Result<Arc<executor::host::HostVmPrototype>, GetError>>,
56
    },
57
}
58
59
impl RuntimeCachesService {
60
    /// Start a new service.
61
21
    pub fn new(config: Config) -> Self {
62
21
        let (to_background, from_foreground) = async_channel::bounded(16);
63
21
64
21
        (config.tasks_executor)(Box::pin(async move {
65
21
            let mut from_foreground = pin::pin!(from_foreground);
66
21
            let mut cache =
67
21
                lru::LruCache::<[u8; 32], Result<_, GetError>>::new(config.num_cache_entries);
68
69
            loop {
70
23
                match from_foreground.next().await {
71
                    Some(Message::Get {
72
2
                        block_hash,
73
2
                        result_tx,
74
                    }) => {
75
                        // Look in the cache.
76
2
                        if let Some(
cache_entry0
) = cache.get(&block_hash) {
77
0
                            let _ = result_tx.send(cache_entry.clone());
78
0
                            continue;
79
2
                        }
80
81
2
                        let (code, heap_pages) = config
82
2
                            .database
83
2
                            .with_database(move |database| {
84
2
                                let code = database.block_storage_get(
85
2
                                    &block_hash,
86
2
                                    iter::empty::<iter::Empty<_>>(),
87
2
                                    trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from),
88
2
                                );
89
2
                                let heap_pages = database.block_storage_get(
90
2
                                    &block_hash,
91
2
                                    iter::empty::<iter::Empty<_>>(),
92
2
                                    trie::bytes_to_nibbles(b":heappages".iter().copied())
93
2
                                        .map(u8::from),
94
2
                                );
95
2
                                (code, heap_pages)
96
2
                            })
_RNCNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new00Ba_
Line
Count
Source
83
2
                            .with_database(move |database| {
84
2
                                let code = database.block_storage_get(
85
2
                                    &block_hash,
86
2
                                    iter::empty::<iter::Empty<_>>(),
87
2
                                    trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from),
88
2
                                );
89
2
                                let heap_pages = database.block_storage_get(
90
2
                                    &block_hash,
91
2
                                    iter::empty::<iter::Empty<_>>(),
92
2
                                    trie::bytes_to_nibbles(b":heappages".iter().copied())
93
2
                                        .map(u8::from),
94
2
                                );
95
2
                                (code, heap_pages)
96
2
                            })
Unexecuted instantiation: _RNCNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new00Ba_
97
2
                            .await;
98
99
2
                        let runtime = match (code, heap_pages) {
100
2
                            (Ok(Some((code, _))), Ok(heap_pages)) => {
101
2
                                match executor::storage_heap_pages_to_value(
102
2
                                    heap_pages.as_ref().map(|(h, _)| 
&h[..]0
),
Unexecuted instantiation: _RNCNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new0s_0Ba_
Unexecuted instantiation: _RNCNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new0s_0Ba_
103
2
                                ) {
104
2
                                    Ok(heap_pages) => executor::host::HostVmPrototype::new(
105
2
                                        executor::host::Config {
106
2
                                            module: &code,
107
2
                                            heap_pages,
108
2
                                            exec_hint: executor::vm::ExecHint::ValidateAndCompile,
109
2
                                            allow_unresolved_imports: true, // TODO: configurable? or if not, document
110
2
                                        },
111
2
                                    )
112
2
                                    .map_err(GetError::InvalidRuntime),
113
0
                                    Err(_) => Err(GetError::InvalidHeapPages),
114
                                }
115
                            }
116
0
                            (Ok(None), Ok(_)) => Err(GetError::NoCode),
117
                            (Err(database_thread::StorageAccessError::UnknownBlock), _)
118
                            | (_, Err(database_thread::StorageAccessError::UnknownBlock)) => {
119
                                // Note that we don't put the `CorruptedError` in the cache, in
120
                                // case the database somehow recovers.
121
0
                                let _ = result_tx.send(Err(GetError::UnknownBlock));
122
0
                                continue;
123
                            }
124
                            (Err(database_thread::StorageAccessError::IncompleteStorage), _)
125
                            | (_, Err(database_thread::StorageAccessError::IncompleteStorage)) => {
126
                                // Note that we don't put the `CorruptedError` in the cache, in
127
                                // case the database somehow recovers.
128
0
                                let _ = result_tx.send(Err(GetError::Pruned));
129
0
                                continue;
130
                            }
131
                            (Err(database_thread::StorageAccessError::Corrupted(_)), _)
132
                            | (_, Err(database_thread::StorageAccessError::Corrupted(_))) => {
133
                                // Note that we don't put the `CorruptedError` in the cache, in
134
                                // case the database somehow recovers.
135
0
                                let _ = result_tx.send(Err(GetError::CorruptedDatabase));
136
0
                                continue;
137
                            }
138
                        };
139
140
2
                        let runtime = runtime.map(Arc::new);
141
2
                        cache.put(block_hash, runtime.clone());
142
2
                        let _ = result_tx.send(runtime);
143
                    }
144
                    None => {
145
                        // Stop the service.
146
21
                        return;
147
21
                    }
148
21
                }
149
21
            }
150
21
        }));
_RNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3new0B8_
Line
Count
Source
64
21
        (config.tasks_executor)(Box::pin(async move {
65
21
            let mut from_foreground = pin::pin!(from_foreground);
66
21
            let mut cache =
67
21
                lru::LruCache::<[u8; 32], Result<_, GetError>>::new(config.num_cache_entries);
68
69
            loop {
70
23
                match from_foreground.next().await {
71
                    Some(Message::Get {
72
2
                        block_hash,
73
2
                        result_tx,
74
                    }) => {
75
                        // Look in the cache.
76
2
                        if let Some(
cache_entry0
) = cache.get(&block_hash) {
77
0
                            let _ = result_tx.send(cache_entry.clone());
78
0
                            continue;
79
2
                        }
80
81
2
                        let (code, heap_pages) = config
82
2
                            .database
83
2
                            .with_database(move |database| {
84
                                let code = database.block_storage_get(
85
                                    &block_hash,
86
                                    iter::empty::<iter::Empty<_>>(),
87
                                    trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from),
88
                                );
89
                                let heap_pages = database.block_storage_get(
90
                                    &block_hash,
91
                                    iter::empty::<iter::Empty<_>>(),
92
                                    trie::bytes_to_nibbles(b":heappages".iter().copied())
93
                                        .map(u8::from),
94
                                );
95
                                (code, heap_pages)
96
2
                            })
97
2
                            .await;
98
99
2
                        let runtime = match (code, heap_pages) {
100
2
                            (Ok(Some((code, _))), Ok(heap_pages)) => {
101
2
                                match executor::storage_heap_pages_to_value(
102
2
                                    heap_pages.as_ref().map(|(h, _)| &h[..]),
103
2
                                ) {
104
2
                                    Ok(heap_pages) => executor::host::HostVmPrototype::new(
105
2
                                        executor::host::Config {
106
2
                                            module: &code,
107
2
                                            heap_pages,
108
2
                                            exec_hint: executor::vm::ExecHint::ValidateAndCompile,
109
2
                                            allow_unresolved_imports: true, // TODO: configurable? or if not, document
110
2
                                        },
111
2
                                    )
112
2
                                    .map_err(GetError::InvalidRuntime),
113
0
                                    Err(_) => Err(GetError::InvalidHeapPages),
114
                                }
115
                            }
116
0
                            (Ok(None), Ok(_)) => Err(GetError::NoCode),
117
                            (Err(database_thread::StorageAccessError::UnknownBlock), _)
118
                            | (_, Err(database_thread::StorageAccessError::UnknownBlock)) => {
119
                                // Note that we don't put the `CorruptedError` in the cache, in
120
                                // case the database somehow recovers.
121
0
                                let _ = result_tx.send(Err(GetError::UnknownBlock));
122
0
                                continue;
123
                            }
124
                            (Err(database_thread::StorageAccessError::IncompleteStorage), _)
125
                            | (_, Err(database_thread::StorageAccessError::IncompleteStorage)) => {
126
                                // Note that we don't put the `CorruptedError` in the cache, in
127
                                // case the database somehow recovers.
128
0
                                let _ = result_tx.send(Err(GetError::Pruned));
129
0
                                continue;
130
                            }
131
                            (Err(database_thread::StorageAccessError::Corrupted(_)), _)
132
                            | (_, Err(database_thread::StorageAccessError::Corrupted(_))) => {
133
                                // Note that we don't put the `CorruptedError` in the cache, in
134
                                // case the database somehow recovers.
135
0
                                let _ = result_tx.send(Err(GetError::CorruptedDatabase));
136
0
                                continue;
137
                            }
138
                        };
139
140
2
                        let runtime = runtime.map(Arc::new);
141
2
                        cache.put(block_hash, runtime.clone());
142
2
                        let _ = result_tx.send(runtime);
143
                    }
144
                    None => {
145
                        // Stop the service.
146
21
                        return;
147
21
                    }
148
21
                }
149
21
            }
150
21
        }));
Unexecuted instantiation: _RNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3new0B8_
151
21
152
21
        RuntimeCachesService {
153
21
            to_background: Mutex::new(to_background),
154
21
        }
155
21
    }
_RNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3new
Line
Count
Source
61
21
    pub fn new(config: Config) -> Self {
62
21
        let (to_background, from_foreground) = async_channel::bounded(16);
63
21
64
21
        (config.tasks_executor)(Box::pin(async move {
65
            let mut from_foreground = pin::pin!(from_foreground);
66
            let mut cache =
67
                lru::LruCache::<[u8; 32], Result<_, GetError>>::new(config.num_cache_entries);
68
69
            loop {
70
                match from_foreground.next().await {
71
                    Some(Message::Get {
72
                        block_hash,
73
                        result_tx,
74
                    }) => {
75
                        // Look in the cache.
76
                        if let Some(cache_entry) = cache.get(&block_hash) {
77
                            let _ = result_tx.send(cache_entry.clone());
78
                            continue;
79
                        }
80
81
                        let (code, heap_pages) = config
82
                            .database
83
                            .with_database(move |database| {
84
                                let code = database.block_storage_get(
85
                                    &block_hash,
86
                                    iter::empty::<iter::Empty<_>>(),
87
                                    trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from),
88
                                );
89
                                let heap_pages = database.block_storage_get(
90
                                    &block_hash,
91
                                    iter::empty::<iter::Empty<_>>(),
92
                                    trie::bytes_to_nibbles(b":heappages".iter().copied())
93
                                        .map(u8::from),
94
                                );
95
                                (code, heap_pages)
96
                            })
97
                            .await;
98
99
                        let runtime = match (code, heap_pages) {
100
                            (Ok(Some((code, _))), Ok(heap_pages)) => {
101
                                match executor::storage_heap_pages_to_value(
102
                                    heap_pages.as_ref().map(|(h, _)| &h[..]),
103
                                ) {
104
                                    Ok(heap_pages) => executor::host::HostVmPrototype::new(
105
                                        executor::host::Config {
106
                                            module: &code,
107
                                            heap_pages,
108
                                            exec_hint: executor::vm::ExecHint::ValidateAndCompile,
109
                                            allow_unresolved_imports: true, // TODO: configurable? or if not, document
110
                                        },
111
                                    )
112
                                    .map_err(GetError::InvalidRuntime),
113
                                    Err(_) => Err(GetError::InvalidHeapPages),
114
                                }
115
                            }
116
                            (Ok(None), Ok(_)) => Err(GetError::NoCode),
117
                            (Err(database_thread::StorageAccessError::UnknownBlock), _)
118
                            | (_, Err(database_thread::StorageAccessError::UnknownBlock)) => {
119
                                // Note that we don't put the `CorruptedError` in the cache, in
120
                                // case the database somehow recovers.
121
                                let _ = result_tx.send(Err(GetError::UnknownBlock));
122
                                continue;
123
                            }
124
                            (Err(database_thread::StorageAccessError::IncompleteStorage), _)
125
                            | (_, Err(database_thread::StorageAccessError::IncompleteStorage)) => {
126
                                // Note that we don't put the `CorruptedError` in the cache, in
127
                                // case the database somehow recovers.
128
                                let _ = result_tx.send(Err(GetError::Pruned));
129
                                continue;
130
                            }
131
                            (Err(database_thread::StorageAccessError::Corrupted(_)), _)
132
                            | (_, Err(database_thread::StorageAccessError::Corrupted(_))) => {
133
                                // Note that we don't put the `CorruptedError` in the cache, in
134
                                // case the database somehow recovers.
135
                                let _ = result_tx.send(Err(GetError::CorruptedDatabase));
136
                                continue;
137
                            }
138
                        };
139
140
                        let runtime = runtime.map(Arc::new);
141
                        cache.put(block_hash, runtime.clone());
142
                        let _ = result_tx.send(runtime);
143
                    }
144
                    None => {
145
                        // Stop the service.
146
                        return;
147
                    }
148
                }
149
            }
150
21
        }));
151
21
152
21
        RuntimeCachesService {
153
21
            to_background: Mutex::new(to_background),
154
21
        }
155
21
    }
Unexecuted instantiation: _RNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3new
156
157
    /// Obtains the runtime corresponding to a certain block.
158
2
    pub async fn get(
159
2
        &self,
160
2
        block_hash: [u8; 32],
161
2
    ) -> Result<Arc<executor::host::HostVmPrototype>, GetError> {
_RNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3get
Line
Count
Source
158
2
    pub async fn get(
159
2
        &self,
160
2
        block_hash: [u8; 32],
161
2
    ) -> Result<Arc<executor::host::HostVmPrototype>, GetError> {
Unexecuted instantiation: _RNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3get
162
2
        let (result_tx, result_rx) = oneshot::channel();
163
2
        let _ = self
164
2
            .to_background
165
2
            .lock()
166
0
            .await
167
2
            .send(Message::Get {
168
2
                block_hash,
169
2
                result_tx,
170
2
            })
171
0
            .await;
172
2
        result_rx.await.unwrap()
173
2
    }
_RNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3get0B8_
Line
Count
Source
161
2
    ) -> Result<Arc<executor::host::HostVmPrototype>, GetError> {
162
2
        let (result_tx, result_rx) = oneshot::channel();
163
2
        let _ = self
164
2
            .to_background
165
2
            .lock()
166
0
            .await
167
2
            .send(Message::Get {
168
2
                block_hash,
169
2
                result_tx,
170
2
            })
171
0
            .await;
172
2
        result_rx.await.unwrap()
173
2
    }
Unexecuted instantiation: _RNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3get0B8_
174
}
175
176
/// Error potentially returned by [`RuntimeCachesService::get`].
177
0
#[derive(Debug, Clone, derive_more::Display)]
Unexecuted instantiation: _RNvXs1_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB5_8GetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs1_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB5_8GetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
178
pub enum GetError {
179
    /// Requested block couldn't be found in the database.
180
    UnknownBlock,
181
    /// Storage of requested block is no longer in the database.
182
    Pruned,
183
    /// Block doesn't have any storage entry at the key `:code`.
184
    NoCode,
185
    /// Invalid storage entry at `:heappages`.
186
    InvalidHeapPages,
187
    /// Database is corrupted.
188
    CorruptedDatabase,
189
    /// Impossible to compile the runtime.
190
    InvalidRuntime(executor::host::NewErr),
191
}