/__w/smoldot/smoldot/repo/full-node/src/json_rpc_service/runtime_caches_service.rs
Line | Count | Source (jump to first uncovered line) |
1 | | // Smoldot |
2 | | // Copyright (C) 2023 Pierre Krieger |
3 | | // SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 |
4 | | |
5 | | // This program is free software: you can redistribute it and/or modify |
6 | | // it under the terms of the GNU General Public License as published by |
7 | | // the Free Software Foundation, either version 3 of the License, or |
8 | | // (at your option) any later version. |
9 | | |
10 | | // This program is distributed in the hope that it will be useful, |
11 | | // but WITHOUT ANY WARRANTY; without even the implied warranty of |
12 | | // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the |
13 | | // GNU General Public License for more details. |
14 | | |
15 | | // You should have received a copy of the GNU General Public License |
16 | | // along with this program. If not, see <http://www.gnu.org/licenses/>. |
17 | | |
18 | | use crate::{database_thread, LogCallback}; |
19 | | |
20 | | use futures_channel::oneshot; |
21 | | use futures_lite::{Future, StreamExt as _}; |
22 | | use smol::lock::Mutex; |
23 | | use smoldot::{executor, trie}; |
24 | | use std::{ |
25 | | iter, |
26 | | num::NonZeroUsize, |
27 | | pin::{self, Pin}, |
28 | | sync::Arc, |
29 | | }; |
30 | | |
31 | | /// Configuration of the service. |
32 | | pub struct Config { |
33 | | /// Closure that spawns background tasks. |
34 | | pub tasks_executor: Arc<dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Send + Sync>, |
35 | | |
36 | | /// Function called in order to notify of something. |
37 | | pub log_callback: Arc<dyn LogCallback + Send + Sync>, |
38 | | |
39 | | /// Database to access blocks. |
40 | | pub database: Arc<database_thread::DatabaseThread>, |
41 | | |
42 | | /// Number of entries in the cache of runtimes. |
43 | | pub num_cache_entries: NonZeroUsize, |
44 | | } |
45 | | |
46 | | /// A running runtime caches service. |
47 | | pub struct RuntimeCachesService { |
48 | | to_background: Mutex<async_channel::Sender<Message>>, |
49 | | } |
50 | | |
51 | | /// Message sent from the frontend to the background task. |
52 | | enum Message { |
53 | | Get { |
54 | | block_hash: [u8; 32], |
55 | | result_tx: oneshot::Sender<Result<Arc<executor::host::HostVmPrototype>, GetError>>, |
56 | | }, |
57 | | } |
58 | | |
59 | | impl RuntimeCachesService { |
60 | | /// Start a new service. |
61 | 21 | pub fn new(config: Config) -> Self { |
62 | 21 | let (to_background, from_foreground) = async_channel::bounded(16); |
63 | 21 | |
64 | 21 | (config.tasks_executor)(Box::pin(async move { |
65 | 21 | let mut from_foreground = pin::pin!(from_foreground); |
66 | 21 | let mut cache = |
67 | 21 | lru::LruCache::<[u8; 32], Result<_, GetError>>::new(config.num_cache_entries); |
68 | | |
69 | | loop { |
70 | 23 | match from_foreground.next().await { |
71 | | Some(Message::Get { |
72 | 2 | block_hash, |
73 | 2 | result_tx, |
74 | | }) => { |
75 | | // Look in the cache. |
76 | 2 | if let Some(cache_entry0 ) = cache.get(&block_hash) { |
77 | 0 | let _ = result_tx.send(cache_entry.clone()); |
78 | 0 | continue; |
79 | 2 | } |
80 | | |
81 | 2 | let (code, heap_pages) = config |
82 | 2 | .database |
83 | 2 | .with_database(move |database| { |
84 | 2 | let code = database.block_storage_get( |
85 | 2 | &block_hash, |
86 | 2 | iter::empty::<iter::Empty<_>>(), |
87 | 2 | trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from), |
88 | 2 | ); |
89 | 2 | let heap_pages = database.block_storage_get( |
90 | 2 | &block_hash, |
91 | 2 | iter::empty::<iter::Empty<_>>(), |
92 | 2 | trie::bytes_to_nibbles(b":heappages".iter().copied()) |
93 | 2 | .map(u8::from), |
94 | 2 | ); |
95 | 2 | (code, heap_pages) |
96 | 2 | }) _RNCNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new00Ba_ Line | Count | Source | 83 | 2 | .with_database(move |database| { | 84 | 2 | let code = database.block_storage_get( | 85 | 2 | &block_hash, | 86 | 2 | iter::empty::<iter::Empty<_>>(), | 87 | 2 | trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from), | 88 | 2 | ); | 89 | 2 | let heap_pages = database.block_storage_get( | 90 | 2 | &block_hash, | 91 | 2 | iter::empty::<iter::Empty<_>>(), | 92 | 2 | trie::bytes_to_nibbles(b":heappages".iter().copied()) | 93 | 2 | .map(u8::from), | 94 | 2 | ); | 95 | 2 | (code, heap_pages) | 96 | 2 | }) |
Unexecuted instantiation: _RNCNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new00Ba_ |
97 | 2 | .await; |
98 | | |
99 | 2 | let runtime = match (code, heap_pages) { |
100 | 2 | (Ok(Some((code, _))), Ok(heap_pages)) => { |
101 | 2 | match executor::storage_heap_pages_to_value( |
102 | 2 | heap_pages.as_ref().map(|(h, _)| &h[..]0 ), Unexecuted instantiation: _RNCNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new0s_0Ba_ Unexecuted instantiation: _RNCNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB6_20RuntimeCachesService3new0s_0Ba_ |
103 | 2 | ) { |
104 | 2 | Ok(heap_pages) => executor::host::HostVmPrototype::new( |
105 | 2 | executor::host::Config { |
106 | 2 | module: &code, |
107 | 2 | heap_pages, |
108 | 2 | exec_hint: executor::vm::ExecHint::ValidateAndCompile, |
109 | 2 | allow_unresolved_imports: true, // TODO: configurable? or if not, document |
110 | 2 | }, |
111 | 2 | ) |
112 | 2 | .map_err(GetError::InvalidRuntime), |
113 | 0 | Err(_) => Err(GetError::InvalidHeapPages), |
114 | | } |
115 | | } |
116 | 0 | (Ok(None), Ok(_)) => Err(GetError::NoCode), |
117 | | (Err(database_thread::StorageAccessError::UnknownBlock), _) |
118 | | | (_, Err(database_thread::StorageAccessError::UnknownBlock)) => { |
119 | | // Note that we don't put the `CorruptedError` in the cache, in |
120 | | // case the database somehow recovers. |
121 | 0 | let _ = result_tx.send(Err(GetError::UnknownBlock)); |
122 | 0 | continue; |
123 | | } |
124 | | (Err(database_thread::StorageAccessError::IncompleteStorage), _) |
125 | | | (_, Err(database_thread::StorageAccessError::IncompleteStorage)) => { |
126 | | // Note that we don't put the `CorruptedError` in the cache, in |
127 | | // case the database somehow recovers. |
128 | 0 | let _ = result_tx.send(Err(GetError::Pruned)); |
129 | 0 | continue; |
130 | | } |
131 | | (Err(database_thread::StorageAccessError::Corrupted(_)), _) |
132 | | | (_, Err(database_thread::StorageAccessError::Corrupted(_))) => { |
133 | | // Note that we don't put the `CorruptedError` in the cache, in |
134 | | // case the database somehow recovers. |
135 | 0 | let _ = result_tx.send(Err(GetError::CorruptedDatabase)); |
136 | 0 | continue; |
137 | | } |
138 | | }; |
139 | | |
140 | 2 | let runtime = runtime.map(Arc::new); |
141 | 2 | cache.put(block_hash, runtime.clone()); |
142 | 2 | let _ = result_tx.send(runtime); |
143 | | } |
144 | | None => { |
145 | | // Stop the service. |
146 | 21 | return; |
147 | 21 | } |
148 | 21 | } |
149 | 21 | } |
150 | 21 | })); _RNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3new0B8_ Line | Count | Source | 64 | 21 | (config.tasks_executor)(Box::pin(async move { | 65 | 21 | let mut from_foreground = pin::pin!(from_foreground); | 66 | 21 | let mut cache = | 67 | 21 | lru::LruCache::<[u8; 32], Result<_, GetError>>::new(config.num_cache_entries); | 68 | | | 69 | | loop { | 70 | 23 | match from_foreground.next().await { | 71 | | Some(Message::Get { | 72 | 2 | block_hash, | 73 | 2 | result_tx, | 74 | | }) => { | 75 | | // Look in the cache. | 76 | 2 | if let Some(cache_entry0 ) = cache.get(&block_hash) { | 77 | 0 | let _ = result_tx.send(cache_entry.clone()); | 78 | 0 | continue; | 79 | 2 | } | 80 | | | 81 | 2 | let (code, heap_pages) = config | 82 | 2 | .database | 83 | 2 | .with_database(move |database| { | 84 | | let code = database.block_storage_get( | 85 | | &block_hash, | 86 | | iter::empty::<iter::Empty<_>>(), | 87 | | trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from), | 88 | | ); | 89 | | let heap_pages = database.block_storage_get( | 90 | | &block_hash, | 91 | | iter::empty::<iter::Empty<_>>(), | 92 | | trie::bytes_to_nibbles(b":heappages".iter().copied()) | 93 | | .map(u8::from), | 94 | | ); | 95 | | (code, heap_pages) | 96 | 2 | }) | 97 | 2 | .await; | 98 | | | 99 | 2 | let runtime = match (code, heap_pages) { | 100 | 2 | (Ok(Some((code, _))), Ok(heap_pages)) => { | 101 | 2 | match executor::storage_heap_pages_to_value( | 102 | 2 | heap_pages.as_ref().map(|(h, _)| &h[..]), | 103 | 2 | ) { | 104 | 2 | Ok(heap_pages) => executor::host::HostVmPrototype::new( | 105 | 2 | executor::host::Config { | 106 | 2 | module: &code, | 107 | 2 | heap_pages, | 108 | 2 | exec_hint: executor::vm::ExecHint::ValidateAndCompile, | 109 | 2 | allow_unresolved_imports: true, // TODO: configurable? or if not, document | 110 | 2 | }, | 111 | 2 | ) | 112 | 2 | .map_err(GetError::InvalidRuntime), | 113 | 0 | Err(_) => Err(GetError::InvalidHeapPages), | 114 | | } | 115 | | } | 116 | 0 | (Ok(None), Ok(_)) => Err(GetError::NoCode), | 117 | | (Err(database_thread::StorageAccessError::UnknownBlock), _) | 118 | | | (_, Err(database_thread::StorageAccessError::UnknownBlock)) => { | 119 | | // Note that we don't put the `CorruptedError` in the cache, in | 120 | | // case the database somehow recovers. | 121 | 0 | let _ = result_tx.send(Err(GetError::UnknownBlock)); | 122 | 0 | continue; | 123 | | } | 124 | | (Err(database_thread::StorageAccessError::IncompleteStorage), _) | 125 | | | (_, Err(database_thread::StorageAccessError::IncompleteStorage)) => { | 126 | | // Note that we don't put the `CorruptedError` in the cache, in | 127 | | // case the database somehow recovers. | 128 | 0 | let _ = result_tx.send(Err(GetError::Pruned)); | 129 | 0 | continue; | 130 | | } | 131 | | (Err(database_thread::StorageAccessError::Corrupted(_)), _) | 132 | | | (_, Err(database_thread::StorageAccessError::Corrupted(_))) => { | 133 | | // Note that we don't put the `CorruptedError` in the cache, in | 134 | | // case the database somehow recovers. | 135 | 0 | let _ = result_tx.send(Err(GetError::CorruptedDatabase)); | 136 | 0 | continue; | 137 | | } | 138 | | }; | 139 | | | 140 | 2 | let runtime = runtime.map(Arc::new); | 141 | 2 | cache.put(block_hash, runtime.clone()); | 142 | 2 | let _ = result_tx.send(runtime); | 143 | | } | 144 | | None => { | 145 | | // Stop the service. | 146 | 21 | return; | 147 | 21 | } | 148 | 21 | } | 149 | 21 | } | 150 | 21 | })); |
Unexecuted instantiation: _RNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3new0B8_ |
151 | 21 | |
152 | 21 | RuntimeCachesService { |
153 | 21 | to_background: Mutex::new(to_background), |
154 | 21 | } |
155 | 21 | } _RNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3new Line | Count | Source | 61 | 21 | pub fn new(config: Config) -> Self { | 62 | 21 | let (to_background, from_foreground) = async_channel::bounded(16); | 63 | 21 | | 64 | 21 | (config.tasks_executor)(Box::pin(async move { | 65 | | let mut from_foreground = pin::pin!(from_foreground); | 66 | | let mut cache = | 67 | | lru::LruCache::<[u8; 32], Result<_, GetError>>::new(config.num_cache_entries); | 68 | | | 69 | | loop { | 70 | | match from_foreground.next().await { | 71 | | Some(Message::Get { | 72 | | block_hash, | 73 | | result_tx, | 74 | | }) => { | 75 | | // Look in the cache. | 76 | | if let Some(cache_entry) = cache.get(&block_hash) { | 77 | | let _ = result_tx.send(cache_entry.clone()); | 78 | | continue; | 79 | | } | 80 | | | 81 | | let (code, heap_pages) = config | 82 | | .database | 83 | | .with_database(move |database| { | 84 | | let code = database.block_storage_get( | 85 | | &block_hash, | 86 | | iter::empty::<iter::Empty<_>>(), | 87 | | trie::bytes_to_nibbles(b":code".iter().copied()).map(u8::from), | 88 | | ); | 89 | | let heap_pages = database.block_storage_get( | 90 | | &block_hash, | 91 | | iter::empty::<iter::Empty<_>>(), | 92 | | trie::bytes_to_nibbles(b":heappages".iter().copied()) | 93 | | .map(u8::from), | 94 | | ); | 95 | | (code, heap_pages) | 96 | | }) | 97 | | .await; | 98 | | | 99 | | let runtime = match (code, heap_pages) { | 100 | | (Ok(Some((code, _))), Ok(heap_pages)) => { | 101 | | match executor::storage_heap_pages_to_value( | 102 | | heap_pages.as_ref().map(|(h, _)| &h[..]), | 103 | | ) { | 104 | | Ok(heap_pages) => executor::host::HostVmPrototype::new( | 105 | | executor::host::Config { | 106 | | module: &code, | 107 | | heap_pages, | 108 | | exec_hint: executor::vm::ExecHint::ValidateAndCompile, | 109 | | allow_unresolved_imports: true, // TODO: configurable? or if not, document | 110 | | }, | 111 | | ) | 112 | | .map_err(GetError::InvalidRuntime), | 113 | | Err(_) => Err(GetError::InvalidHeapPages), | 114 | | } | 115 | | } | 116 | | (Ok(None), Ok(_)) => Err(GetError::NoCode), | 117 | | (Err(database_thread::StorageAccessError::UnknownBlock), _) | 118 | | | (_, Err(database_thread::StorageAccessError::UnknownBlock)) => { | 119 | | // Note that we don't put the `CorruptedError` in the cache, in | 120 | | // case the database somehow recovers. | 121 | | let _ = result_tx.send(Err(GetError::UnknownBlock)); | 122 | | continue; | 123 | | } | 124 | | (Err(database_thread::StorageAccessError::IncompleteStorage), _) | 125 | | | (_, Err(database_thread::StorageAccessError::IncompleteStorage)) => { | 126 | | // Note that we don't put the `CorruptedError` in the cache, in | 127 | | // case the database somehow recovers. | 128 | | let _ = result_tx.send(Err(GetError::Pruned)); | 129 | | continue; | 130 | | } | 131 | | (Err(database_thread::StorageAccessError::Corrupted(_)), _) | 132 | | | (_, Err(database_thread::StorageAccessError::Corrupted(_))) => { | 133 | | // Note that we don't put the `CorruptedError` in the cache, in | 134 | | // case the database somehow recovers. | 135 | | let _ = result_tx.send(Err(GetError::CorruptedDatabase)); | 136 | | continue; | 137 | | } | 138 | | }; | 139 | | | 140 | | let runtime = runtime.map(Arc::new); | 141 | | cache.put(block_hash, runtime.clone()); | 142 | | let _ = result_tx.send(runtime); | 143 | | } | 144 | | None => { | 145 | | // Stop the service. | 146 | | return; | 147 | | } | 148 | | } | 149 | | } | 150 | 21 | })); | 151 | 21 | | 152 | 21 | RuntimeCachesService { | 153 | 21 | to_background: Mutex::new(to_background), | 154 | 21 | } | 155 | 21 | } |
Unexecuted instantiation: _RNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3new |
156 | | |
157 | | /// Obtains the runtime corresponding to a certain block. |
158 | 2 | pub async fn get( |
159 | 2 | &self, |
160 | 2 | block_hash: [u8; 32], |
161 | 2 | ) -> Result<Arc<executor::host::HostVmPrototype>, GetError> { _RNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3get Line | Count | Source | 158 | 2 | pub async fn get( | 159 | 2 | &self, | 160 | 2 | block_hash: [u8; 32], | 161 | 2 | ) -> Result<Arc<executor::host::HostVmPrototype>, GetError> { |
Unexecuted instantiation: _RNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB2_20RuntimeCachesService3get |
162 | 2 | let (result_tx, result_rx) = oneshot::channel(); |
163 | 2 | let _ = self |
164 | 2 | .to_background |
165 | 2 | .lock() |
166 | 0 | .await |
167 | 2 | .send(Message::Get { |
168 | 2 | block_hash, |
169 | 2 | result_tx, |
170 | 2 | }) |
171 | 0 | .await; |
172 | 2 | result_rx.await.unwrap() |
173 | 2 | } _RNCNvMNtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3get0B8_ Line | Count | Source | 161 | 2 | ) -> Result<Arc<executor::host::HostVmPrototype>, GetError> { | 162 | 2 | let (result_tx, result_rx) = oneshot::channel(); | 163 | 2 | let _ = self | 164 | 2 | .to_background | 165 | 2 | .lock() | 166 | 0 | .await | 167 | 2 | .send(Message::Get { | 168 | 2 | block_hash, | 169 | 2 | result_tx, | 170 | 2 | }) | 171 | 0 | .await; | 172 | 2 | result_rx.await.unwrap() | 173 | 2 | } |
Unexecuted instantiation: _RNCNvMNtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB4_20RuntimeCachesService3get0B8_ |
174 | | } |
175 | | |
176 | | /// Error potentially returned by [`RuntimeCachesService::get`]. |
177 | 0 | #[derive(Debug, Clone, derive_more::Display)] Unexecuted instantiation: _RNvXs1_NtNtCsiUjFBJteJ7x_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB5_8GetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt Unexecuted instantiation: _RNvXs1_NtNtCshBwayKnNXDT_17smoldot_full_node16json_rpc_service22runtime_caches_serviceNtB5_8GetErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt |
178 | | pub enum GetError { |
179 | | /// Requested block couldn't be found in the database. |
180 | | UnknownBlock, |
181 | | /// Storage of requested block is no longer in the database. |
182 | | Pruned, |
183 | | /// Block doesn't have any storage entry at the key `:code`. |
184 | | NoCode, |
185 | | /// Invalid storage entry at `:heappages`. |
186 | | InvalidHeapPages, |
187 | | /// Database is corrupted. |
188 | | CorruptedDatabase, |
189 | | /// Impossible to compile the runtime. |
190 | | InvalidRuntime(executor::host::NewErr), |
191 | | } |