Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/wasm-node/rust/src/lib.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! Contains a light client implementation usable from a browser environment.
19
20
#![cfg_attr(not(feature = "std"), no_std)]
21
#![deny(rustdoc::broken_intra_doc_links)]
22
#![deny(unused_crate_dependencies)]
23
24
extern crate alloc;
25
26
use alloc::{
27
    boxed::Box,
28
    format,
29
    string::{String, ToString as _},
30
    sync::Arc,
31
    vec::Vec,
32
};
33
use async_lock::Mutex;
34
use core::{num::NonZeroU32, pin::Pin, str, task};
35
use futures_util::{stream, Stream as _, StreamExt as _};
36
use smoldot_light::{platform::PlatformRef, HandleRpcError};
37
38
pub mod bindings;
39
40
mod allocator;
41
mod init;
42
mod platform;
43
mod timers;
44
45
static CLIENT: Mutex<init::Client<platform::PlatformRef, ()>> = Mutex::new(init::Client {
46
    smoldot: smoldot_light::Client::new(platform::PLATFORM_REF),
47
    chains: slab::Slab::new(),
48
});
49
50
0
fn init(max_log_level: u32) {
51
0
    init::init(max_log_level);
52
0
}
53
54
0
fn add_chain(
55
0
    chain_spec: Vec<u8>,
56
0
    database_content: Vec<u8>,
57
0
    json_rpc_max_pending_requests: u32,
58
0
    json_rpc_max_subscriptions: u32,
59
0
    potential_relay_chains: Vec<u8>,
60
0
) -> u32 {
61
0
    let mut client_lock = CLIENT.try_lock().unwrap();
62
63
    // Retrieve the potential relay chains parameter passed through the FFI layer.
64
    // TODO: this is kind of racy, as the API user could remove the relay chain while adding a parachain; it would be stupid to do that so this issue is low priority, and this code will likely change again in the future so it's not worth solving immediately
65
0
    let potential_relay_chains: Vec<_> = {
66
0
        assert_eq!(potential_relay_chains.len() % 4, 0);
67
0
        potential_relay_chains
68
0
            .chunks(4)
69
0
            .map(|c| u32::from_le_bytes(<[u8; 4]>::try_from(c).unwrap()))
70
0
            .filter_map(|c| {
71
                if let Some(init::Chain::Created {
72
0
                    smoldot_chain_id, ..
73
0
                }) = client_lock.chains.get(usize::try_from(c).ok()?)
74
                {
75
0
                    Some(*smoldot_chain_id)
76
                } else {
77
0
                    None
78
                }
79
0
            })
80
0
            .collect()
81
0
    };
82
0
83
0
    // This function only allocates a "chain id", then spawns a task that performs the actual
84
0
    // chain creation in the background.
85
0
    // This makes it possible to measure the CPU usage of chain creation the same way as the CPU
86
0
    // is measured for all other background tasks.
87
0
    // It also makes it possible in the future to make chain creation asynchronous in the
88
0
    // `light-base` crate, which will make it possible to periodically yield and avoid using too
89
0
    // much CPU at once.
90
0
    // TODO: act on that last sentence ^
91
0
    let outer_chain_id = client_lock.chains.insert(init::Chain::Initializing);
92
0
    let outer_chain_id_u32 = u32::try_from(outer_chain_id).unwrap();
93
0
94
0
    platform::PLATFORM_REF.spawn_task(
95
0
        format!("add-chain-{outer_chain_id_u32}").into(),
96
0
        async move {
97
0
            let mut client_lock = CLIENT.try_lock().unwrap();
98
0
99
0
            // Fail any new chain initialization if we're running low on memory space, which can
100
0
            // realistically happen as Wasm is a 32 bits platform. This avoids potentially running
101
0
            // into OOM errors. The threshold is completely empirical and should probably be
102
0
            // updated regularly to account for changes in the implementation.
103
0
            if allocator::total_alloc_bytes() >= usize::MAX - 400 * 1024 * 1024 {
104
0
                client_lock.chains.remove(outer_chain_id);
105
0
                unsafe {
106
0
                    let error = "Wasm node is running low on memory and will prevent any new chain from being added";
107
0
                    bindings::chain_initialized(
108
0
                        outer_chain_id_u32,
109
0
                        u32::try_from(error.as_bytes().as_ptr() as usize).unwrap(),
110
0
                        u32::try_from(error.as_bytes().len()).unwrap(),
111
0
                    );
112
0
                }
113
0
                return;
114
0
            }
115
116
            // Insert the chain in the client.
117
            let smoldot_light::AddChainSuccess {
118
0
                chain_id: smoldot_chain_id,
119
0
                json_rpc_responses,
120
0
            } = match client_lock
121
0
                .smoldot
122
0
                .add_chain(smoldot_light::AddChainConfig {
123
0
                    user_data: (),
124
0
                    specification: str::from_utf8(&chain_spec)
125
0
                        .unwrap_or_else(|_| panic!("non-utf8 chain spec")),
126
0
                    database_content: str::from_utf8(&database_content)
127
0
                        .unwrap_or_else(|_| panic!("non-utf8 database content")),
128
0
                    json_rpc: if let Some(json_rpc_max_pending_requests) =
129
0
                        NonZeroU32::new(json_rpc_max_pending_requests)
130
                    {
131
0
                        smoldot_light::AddChainConfigJsonRpc::Enabled {
132
0
                            max_pending_requests: json_rpc_max_pending_requests,
133
0
                            // Note: the PolkadotJS UI is very heavy in terms of subscriptions.
134
0
                            max_subscriptions: json_rpc_max_subscriptions,
135
0
                        }
136
                    } else {
137
0
                        smoldot_light::AddChainConfigJsonRpc::Disabled
138
                    },
139
0
                    potential_relay_chains: potential_relay_chains.into_iter(),
140
                }) {
141
0
                Ok(c) => c,
142
0
                Err(error) => {
143
0
                    client_lock.chains.remove(outer_chain_id);
144
0
                    unsafe {
145
0
                        let error = error.to_string();
146
0
                        bindings::chain_initialized(
147
0
                            outer_chain_id_u32,
148
0
                            u32::try_from(error.as_bytes().as_ptr() as usize).unwrap(),
149
0
                            u32::try_from(error.as_bytes().len()).unwrap(),
150
0
                        );
151
0
                    }
152
0
                    return;
153
                }
154
            };
155
156
0
            client_lock.chains[outer_chain_id] = init::Chain::Created {
157
0
                smoldot_chain_id,
158
0
                json_rpc_response: None,
159
0
                json_rpc_response_info: Box::new(bindings::JsonRpcResponseInfo { ptr: 0, len: 0 }),
160
0
                json_rpc_responses_rx: None,
161
0
            };
162
0
163
0
            // We wrap the JSON-RPC responses stream into a proper stream in order to be able to
164
0
            // guarantee that `poll_next()` always operates on the same future.
165
0
            let json_rpc_responses = json_rpc_responses.map(|json_rpc_responses| {
166
0
                stream::unfold(json_rpc_responses, |mut json_rpc_responses| async {
167
                    // The stream ends when we remove the chain. Once the chain is removed, the user
168
                    // cannot poll the stream anymore. Therefore it is safe to unwrap the result here.
169
0
                    let msg = json_rpc_responses.next().await.unwrap();
170
0
                    Some((msg, json_rpc_responses))
171
0
                })
172
0
                .boxed()
173
0
            });
174
175
            if let init::Chain::Created {
176
0
                json_rpc_responses_rx,
177
                ..
178
0
            } = client_lock.chains.get_mut(outer_chain_id).unwrap()
179
0
            {
180
0
                *json_rpc_responses_rx = json_rpc_responses;
181
0
            }
182
183
0
            unsafe {
184
0
                bindings::chain_initialized(outer_chain_id_u32, 0, 0);
185
0
            }
186
187
0
        },
188
0
    );
189
0
190
0
    outer_chain_id_u32
191
0
}
192
193
0
fn remove_chain(chain_id: u32) {
194
0
    let mut client_lock = CLIENT.try_lock().unwrap();
195
0
196
0
    match client_lock
197
0
        .chains
198
0
        .remove(usize::try_from(chain_id).unwrap())
199
    {
200
        init::Chain::Created {
201
0
            smoldot_chain_id,
202
0
            json_rpc_responses_rx,
203
            ..
204
        } => {
205
            // We've polled the JSON-RPC receiver with a waker that calls
206
            // `json_rpc_responses_non_empty`. Once the sender is destroyed, this waker will be
207
            // called in order to inform of the destruction. We don't want that to happen.
208
            // Therefore, we poll the receiver again with a dummy "no-op" waker for the sole
209
            // purpose of erasing the previously-registered waker.
210
0
            if let Some(mut json_rpc_responses_rx) = json_rpc_responses_rx {
211
0
                let _ = Pin::new(&mut json_rpc_responses_rx).poll_next(
212
0
                    &mut task::Context::from_waker(futures_util::task::noop_waker_ref()),
213
0
                );
214
0
            }
215
216
0
            let () = client_lock.smoldot.remove_chain(smoldot_chain_id);
217
        }
218
0
        init::Chain::Initializing => {} // TODO: /!\
219
    }
220
0
}
221
222
0
fn json_rpc_send(json_rpc_request: Vec<u8>, chain_id: u32) -> u32 {
223
0
    // As mentioned in the documentation, the bytes *must* be valid UTF-8.
224
0
    let json_rpc_request: String = String::from_utf8(json_rpc_request)
225
0
        .unwrap_or_else(|_| panic!("non-UTF-8 JSON-RPC request"));
226
0
227
0
    let mut client_lock = CLIENT.try_lock().unwrap();
228
0
    let client_chain_id = match client_lock
229
0
        .chains
230
0
        .get(usize::try_from(chain_id).unwrap())
231
0
        .unwrap()
232
    {
233
        init::Chain::Created {
234
0
            smoldot_chain_id, ..
235
0
        } => *smoldot_chain_id,
236
0
        init::Chain::Initializing => panic!(), // Forbidden.
237
    };
238
239
0
    match client_lock
240
0
        .smoldot
241
0
        .json_rpc_request(json_rpc_request, client_chain_id)
242
    {
243
0
        Ok(()) => 0,
244
0
        Err(HandleRpcError::TooManyPendingRequests { .. }) => 1,
245
    }
246
0
}
247
248
0
fn json_rpc_responses_peek(chain_id: u32) -> u32 {
249
0
    let mut client_lock = CLIENT.try_lock().unwrap();
250
0
    match client_lock
251
0
        .chains
252
0
        .get_mut(usize::try_from(chain_id).unwrap())
253
0
        .unwrap()
254
    {
255
        init::Chain::Created {
256
0
            json_rpc_response,
257
0
            json_rpc_responses_rx,
258
0
            json_rpc_response_info,
259
0
            ..
260
0
        } => {
261
0
            if json_rpc_response.is_none() {
262
0
                if let Some(json_rpc_responses_rx) = json_rpc_responses_rx.as_mut() {
263
                    loop {
264
0
                        match Pin::new(&mut *json_rpc_responses_rx).poll_next(
265
0
                            &mut task::Context::from_waker(
266
0
                                &Arc::new(JsonRpcResponsesNonEmptyWaker { chain_id }).into(),
267
0
                            ),
268
0
                        ) {
269
0
                            task::Poll::Ready(Some(response)) if response.is_empty() => {
270
0
                                // The API of `json_rpc_responses_peek` says that a length of 0
271
0
                                // indicates that the queue is empty. For this reason, we skip
272
0
                                // this response.
273
0
                                // This is a pretty niche situation, but at least we handle it
274
0
                                // properly.
275
0
                            }
276
0
                            task::Poll::Ready(Some(response)) => {
277
0
                                debug_assert!(!response.is_empty());
278
0
                                *json_rpc_response = Some(response);
279
0
                                break;
280
                            }
281
0
                            task::Poll::Ready(None) => unreachable!(),
282
0
                            task::Poll::Pending => break,
283
                        }
284
                    }
285
0
                }
286
0
            }
287
288
            // Note that we might be returning the last item in the queue. In principle, this means
289
            // that the next time an entry is added to the queue, `json_rpc_responses_non_empty`
290
            // should be called. Due to the way the implementation works, this will not happen
291
            // until the user calls `json_rpc_responses_peek`. However, this is not a problem:
292
            // it is impossible for the user to observe that the queue is empty, and as such there
293
            // is simply not correct implementation of the API that can't work because of this
294
            // property.
295
296
0
            match &json_rpc_response {
297
0
                Some(rp) => {
298
0
                    debug_assert!(!rp.is_empty());
299
0
                    json_rpc_response_info.ptr = rp.as_bytes().as_ptr() as u32;
300
0
                    json_rpc_response_info.len = rp.as_bytes().len() as u32;
301
                }
302
0
                None => {
303
0
                    json_rpc_response_info.ptr = 0;
304
0
                    json_rpc_response_info.len = 0;
305
0
                }
306
            }
307
308
0
            (&**json_rpc_response_info) as *const bindings::JsonRpcResponseInfo as usize as u32
309
        }
310
0
        _ => panic!(),
311
    }
312
0
}
313
314
0
fn json_rpc_responses_pop(chain_id: u32) {
315
0
    let mut client_lock = CLIENT.try_lock().unwrap();
316
0
    match client_lock
317
0
        .chains
318
0
        .get_mut(usize::try_from(chain_id).unwrap())
319
0
        .unwrap()
320
    {
321
        init::Chain::Created {
322
0
            json_rpc_response, ..
323
0
        } => *json_rpc_response = None,
324
0
        init::Chain::Initializing => panic!(), // Forbidden.
325
    }
326
0
}
327
328
struct JsonRpcResponsesNonEmptyWaker {
329
    chain_id: u32,
330
}
331
332
impl alloc::task::Wake for JsonRpcResponsesNonEmptyWaker {
333
0
    fn wake(self: Arc<Self>) {
334
0
        unsafe { bindings::json_rpc_responses_non_empty(self.chain_id) }
335
0
    }
336
}
337
338
/// List of light tasks waiting to be executed.
339
static TASKS_QUEUE: crossbeam_queue::SegQueue<async_task::Runnable> =
340
    crossbeam_queue::SegQueue::new();
341
342
0
fn advance_execution() {
343
    // This function executes one task then returns. This ensures that the Wasm doesn't use up
344
    // all the available CPU of the host.
345
346
0
    let Some(runnable) = TASKS_QUEUE.pop() else {
347
0
        return;
348
    };
349
350
0
    runnable.run();
351
0
352
0
    if !TASKS_QUEUE.is_empty() {
353
0
        unsafe {
354
0
            bindings::advance_execution_ready();
355
0
        }
356
0
    }
357
0
}