Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/light-base/src/transactions_service.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
//! Background transactions service.
19
//!
20
//! The role of the [`TransactionsService`] is to manage the transactions that the user wants to
21
//! send out, and report about their status.
22
//!
23
//! The [`TransactionsService`] is most of the time idle. When the user wants to emit a
24
//! transaction on the network, it gets reported to the service, which then tries to send it to
25
//! the peers the node is currently connected to. Afterwards, the service will inspect the stream
26
//! of best and finalized blocks to find out whether the transaction has been included or not.
27
//!
28
//! # How watching transactions works
29
//!
30
//! Calling [`TransactionsService::submit_transaction`] returns a channel receiver that will contain
31
//! status updates about this transaction.
32
//!
33
//! In order to implement this, the [`TransactionsService`] will follow all the blocks that are
34
//! verified locally by the [`runtime_service::RuntimeService`] (see
35
//! [`runtime_service::RuntimeService::subscribe_all`]) and download from the network the body of
36
//! all the blocks in the best chain.
37
//!
38
//! When a block body download fails, it is ignored, in the hopes that the block will not be part
39
//! of the finalized chain. If the block body download of a finalized block fails, we enter "panic
40
//! mode" (not an actual Rust panic, just a way to describe the logic) and all watched
41
//! transactions are dropped.
42
//!
43
//! The same "panic mode" happens if there's an accidental gap in the chain, which will typically
44
//! happen if the [`runtime_service::RuntimeService`] is overwhelmed.
45
//!
46
//! If the channel returned by [`TransactionsService::submit_transaction`] is full, it will
47
//! automatically be closed so as to not block the transactions service if the receive is too slow
48
//! to be processed.
49
//!
50
//! # About duplicate unsigned transactions
51
//!
52
//! The Substrate and Polkadot runtimes support nonce-less unsigned transactions. In other words,
53
//! a user can submit the same transaction (the exact same bytes every time) as many time as they
54
//! want.
55
//!
56
//! While the chain can accept the same transaction multiple times over time, a Substrate node
57
//! will only allow submitting it *once at a time*. In other words, any given unsigned transaction
58
//! will never be included more than once in any given block. If you try to submit an unsigned
59
//! transaction while the same transaction is already pending, the Substrate node will ignore it
60
//! or return an error.
61
//!
62
//! Contrary to Substrate, the smoldot Wasm client can be used by multiple UIs at the same time.
63
//! When a UI submits an unsigned transaction, we don't want to do the same as Substrate and
64
//! refuse it if it is already pending, as it would make it possible for a UI to determine
65
//! whether another UI has already submitted this transaction, and thus allow communications
66
//! between UIs. Instead, the smoldot Wasm client return another sender to the same already-pending
67
//! transaction.
68
//!
69
70
use crate::{log, network_service, platform::PlatformRef, runtime_service, sync_service};
71
72
use alloc::{
73
    borrow::ToOwned as _,
74
    boxed::Box,
75
    format,
76
    string::{String, ToString as _},
77
    sync::Arc,
78
    vec::Vec,
79
};
80
use core::{
81
    cmp, iter,
82
    num::{NonZeroU32, NonZeroUsize},
83
    pin,
84
    time::Duration,
85
};
86
use futures_channel::oneshot;
87
use futures_lite::FutureExt as _;
88
use futures_util::stream::FuturesUnordered;
89
use futures_util::{future, FutureExt as _, StreamExt as _};
90
use itertools::Itertools as _;
91
use smoldot::{
92
    header,
93
    informant::HashDisplay,
94
    libp2p::peer_id::PeerId,
95
    network::codec,
96
    transactions::{light_pool, validate},
97
};
98
99
/// Configuration for a [`TransactionsService`].
100
pub struct Config<TPlat: PlatformRef> {
101
    /// Name of the chain, for logging purposes.
102
    ///
103
    /// > **Note**: This name will be directly printed out. Any special character should already
104
    /// >           have been filtered out from this name.
105
    pub log_name: String,
106
107
    /// Access to the platform's capabilities.
108
    pub platform: TPlat,
109
110
    /// Service responsible for synchronizing the chain.
111
    pub sync_service: Arc<sync_service::SyncService<TPlat>>,
112
113
    /// Service responsible for synchronizing the chain.
114
    pub runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
115
116
    /// Access to the network, and identifier of the chain to use to gossip transactions from the
117
    /// point of view of the network service.
118
    pub network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
119
120
    /// Maximum number of pending transactions allowed in the service.
121
    ///
122
    /// Any extra transaction will lead to [`DropReason::MaxPendingTransactionsReached`].
123
    pub max_pending_transactions: NonZeroU32,
124
125
    /// Maximum number of block body downloads that can be performed in parallel.
126
    ///
127
    /// > **Note**: This is the maximum number of *blocks* whose body is being download, not the
128
    /// >           number of block requests emitted on the network.
129
    pub max_concurrent_downloads: NonZeroU32,
130
131
    /// Maximum number of transaction validations that can be performed in parallel.
132
    pub max_concurrent_validations: NonZeroU32,
133
}
134
135
/// See [the module-level documentation](..).
136
pub struct TransactionsService<TPlat: PlatformRef> {
137
    /// Sending messages to the background task.
138
    to_background: async_lock::Mutex<async_channel::Sender<ToBackground>>,
139
140
    /// Configuration of the background task. Used in order to restart it if necessary.
141
    background_task_config: BackgroundTaskConfig<TPlat>,
142
}
143
144
impl<TPlat: PlatformRef> TransactionsService<TPlat> {
145
    /// Builds a new service.
146
0
    pub fn new(config: Config<TPlat>) -> Self {
147
0
        let log_target = format!("tx-service-{}", config.log_name);
148
0
        let (to_background, from_foreground) = async_channel::bounded(8);
149
0
150
0
        let background_task_config = BackgroundTaskConfig {
151
0
            log_target: log_target.clone(),
152
0
            platform: config.platform.clone(),
153
0
            sync_service: config.sync_service,
154
0
            runtime_service: config.runtime_service,
155
0
            network_service: config.network_service,
156
0
            max_concurrent_downloads: usize::try_from(config.max_concurrent_downloads.get())
157
0
                .unwrap_or(usize::MAX),
158
0
            max_pending_transactions: usize::try_from(config.max_pending_transactions.get())
159
0
                .unwrap_or(usize::MAX),
160
0
            max_concurrent_validations: usize::try_from(config.max_concurrent_validations.get())
161
0
                .unwrap_or(usize::MAX),
162
0
        };
163
0
164
0
        let task = Box::pin(background_task::<TPlat>(
165
0
            background_task_config.clone(),
166
0
            from_foreground,
167
0
        ));
168
0
169
0
        config.platform.spawn_task(log_target.clone().into(), {
170
0
            let platform = config.platform.clone();
171
0
            async move {
172
0
                task.await;
173
0
                log!(&platform, Debug, &log_target, "shutdown");
174
0
            }
Unexecuted instantiation: _RNCNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE3new0B6_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE3new0B1q_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE3new0B6_
175
0
        });
176
0
177
0
        TransactionsService {
178
0
            to_background: async_lock::Mutex::new(to_background),
179
0
            background_task_config,
180
0
        }
181
0
    }
Unexecuted instantiation: _RNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE3newB4_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE3newB1o_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE3newB4_
182
183
    /// Adds a transaction to the service. The service will try to send it out as soon as
184
    /// possible.
185
    ///
186
    /// Must pass as parameter the SCALE-encoded transaction.
187
    ///
188
    /// The return value of this method is an object receives updates on the state of the
189
    /// transaction.
190
    ///
191
    /// If `detached` is `true`, then dropping the value returned does not cancel sending out
192
    /// the transaction. If `detached` is `false`, then it does.
193
    ///
194
    /// If this exact same transaction has already been submitted before, the transaction isn't
195
    /// added a second time. Instead, a second channel is created pointing to the already-existing
196
    /// transaction.
197
0
    pub async fn submit_and_watch_transaction(
198
0
        &self,
199
0
        transaction_bytes: Vec<u8>,
200
0
        channel_size: usize,
201
0
        detached: bool,
202
0
    ) -> TransactionWatcher {
Unexecuted instantiation: _RNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE28submit_and_watch_transactionB4_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE28submit_and_watch_transactionB1o_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE28submit_and_watch_transactionB4_
203
0
        let (updates_report, rx) = async_channel::bounded(channel_size);
204
0
205
0
        self.send_to_background(ToBackground::SubmitTransaction {
206
0
            transaction_bytes,
207
0
            updates_report: Some((updates_report, detached)),
208
0
        })
209
0
        .await;
210
211
        TransactionWatcher {
212
0
            rx,
213
0
            has_yielded_drop_reason: false,
214
0
            _dummy_keep_alive: self.to_background.lock().await.clone(),
215
0
        }
216
0
    }
Unexecuted instantiation: _RNCNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE28submit_and_watch_transaction0B6_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE28submit_and_watch_transaction0B1q_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE28submit_and_watch_transaction0B6_
217
218
    /// Similar to [`TransactionsService::submit_and_watch_transaction`], but doesn't return any
219
    /// channel.
220
0
    pub async fn submit_transaction(&self, transaction_bytes: Vec<u8>) {
Unexecuted instantiation: _RNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE18submit_transactionB4_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE18submit_transactionB1o_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE18submit_transactionB4_
221
0
        self.send_to_background(ToBackground::SubmitTransaction {
222
0
            transaction_bytes,
223
0
            updates_report: None,
224
0
        })
225
0
        .await;
226
0
    }
Unexecuted instantiation: _RNCNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE18submit_transaction0B6_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE18submit_transaction0B1q_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE18submit_transaction0B6_
227
228
0
    async fn send_to_background(&self, message: ToBackground) {
Unexecuted instantiation: _RNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE18send_to_backgroundB4_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE18send_to_backgroundB1o_
Unexecuted instantiation: _RNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB2_19TransactionsServicepE18send_to_backgroundB4_
229
0
        let mut lock = self.to_background.lock().await;
230
231
0
        if lock.is_closed() {
232
0
            let log_target = self.background_task_config.log_target.clone();
233
0
            let (tx, rx) = async_channel::bounded(8);
234
0
            let platform = self.background_task_config.platform.clone();
235
0
            let task = background_task(self.background_task_config.clone(), rx);
236
0
            self.background_task_config.platform.spawn_task(
237
0
                log_target.clone().into(),
238
0
                async move {
239
0
                    // Sleep for a bit in order to avoid potential infinite loops
240
0
                    // of repeated crashing.
241
0
                    platform.sleep(Duration::from_secs(2)).await;
242
0
                    log!(&platform, Debug, &log_target, "restart");
243
0
                    task.await;
244
0
                    log!(&platform, Debug, &log_target, "shutdown");
245
0
                },
Unexecuted instantiation: _RNCNCNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB6_19TransactionsServicepE18send_to_background00B8_
Unexecuted instantiation: _RNCNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB6_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE18send_to_background00B1s_
Unexecuted instantiation: _RNCNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB6_19TransactionsServicepE18send_to_background00B8_
246
0
            );
247
0
            *lock = tx;
248
0
        }
249
250
        // Note that the background task might have crashed already at this point, so errors can
251
        // be expected.
252
0
        let _ = lock.send(message).await;
253
0
    }
Unexecuted instantiation: _RNCNvMNtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE18send_to_background0B6_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServiceNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE18send_to_background0B1q_
Unexecuted instantiation: _RNCNvMNtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB4_19TransactionsServicepE18send_to_background0B6_
254
}
255
256
/// Returned by [`TransactionsService::submit_and_watch_transaction`].
257
0
#[pin_project::pin_project]
Unexecuted instantiation: _RNvMNvNtCsiGub1lfKphe_13smoldot_light20transactions_service1__NtB4_18TransactionWatcher7project
Unexecuted instantiation: _RNvMNvNtCsiGub1lfKphe_13smoldot_light20transactions_service1__NtB4_18TransactionWatcher11project_ref
Unexecuted instantiation: _RNvNvNtCsiGub1lfKphe_13smoldot_light20transactions_service1__24___assert_not_repr_packed
Unexecuted instantiation: _RNvXs3_NvNtCsiGub1lfKphe_13smoldot_light20transactions_service1__NtB7_18TransactionWatcherNtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4drop
Unexecuted instantiation: _RNvMNvNtCsih6EgvAwZF2_13smoldot_light20transactions_service1__NtB4_18TransactionWatcher7project
Unexecuted instantiation: _RNvMNvNtCsih6EgvAwZF2_13smoldot_light20transactions_service1__NtB4_18TransactionWatcher11project_ref
Unexecuted instantiation: _RNvNvNtCsih6EgvAwZF2_13smoldot_light20transactions_service1__24___assert_not_repr_packed
Unexecuted instantiation: _RNvXs3_NvNtCsih6EgvAwZF2_13smoldot_light20transactions_service1__NtB7_18TransactionWatcherNtNtCs5f0qqrr6ZYa_11pin_project9___private10PinnedDrop4drop
258
pub struct TransactionWatcher {
259
    /// Channel connected to the background task.
260
    #[pin]
261
    rx: async_channel::Receiver<TransactionStatus>,
262
    /// `true` if a [`TransactionStatus::Dropped`] has already been yielded.
263
    has_yielded_drop_reason: bool,
264
    /// Dummy copy of [`TransactionsService::to_background`] that guarantees that the background
265
    /// stays alive.
266
    _dummy_keep_alive: async_channel::Sender<ToBackground>,
267
}
268
269
impl TransactionWatcher {
270
    /// Returns the next status update of the transaction.
271
    ///
272
    /// The last event is always a [`TransactionStatus::Dropped`], and then `None` is yielded
273
    /// repeatedly forever.
274
0
    pub async fn next(self: pin::Pin<&mut Self>) -> Option<TransactionStatus> {
Unexecuted instantiation: _RNvMs_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceNtB4_18TransactionWatcher4next
Unexecuted instantiation: _RNvMs_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceNtB4_18TransactionWatcher4next
275
0
        let mut this = self.project();
276
0
        if *this.has_yielded_drop_reason {
277
0
            debug_assert!(this.rx.is_closed() || this.rx.next().await.is_none());
278
0
            return None;
279
0
        }
280
0
281
0
        match this.rx.next().await {
282
0
            Some(update) => {
283
0
                if matches!(update, TransactionStatus::Dropped(_)) {
284
0
                    debug_assert!(!*this.has_yielded_drop_reason);
285
0
                    *this.has_yielded_drop_reason = true;
286
0
                }
287
0
                Some(update)
288
            }
289
            None => {
290
0
                *this.has_yielded_drop_reason = true;
291
0
                Some(TransactionStatus::Dropped(DropReason::Crashed))
292
            }
293
        }
294
0
    }
Unexecuted instantiation: _RNCNvMs_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceNtB6_18TransactionWatcher4next0B8_
Unexecuted instantiation: _RNCNvMs_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceNtB6_18TransactionWatcher4next0CsDDUKWWCHAU_18smoldot_light_wasm
Unexecuted instantiation: _RNCNvMs_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceNtB6_18TransactionWatcher4next0B8_
295
}
296
297
/// Update on the state of a transaction in the service.
298
///
299
/// > **Note**: Because this code isn't an *actual* transactions pool that leverages the runtime,
300
/// >           some variants (e.g. `Invalid`) are missing compared to the ones that can be found
301
/// >           in Substrate, as they can't possibly be generated by this implementation.
302
/// >           Additionally, an equivalent to the `Ready` state in Substrate is missing as it
303
/// >           is the default state.
304
#[derive(Debug, Clone)]
305
pub enum TransactionStatus {
306
    /// Transaction has been broadcasted to the given peers.
307
    Broadcast(Vec<PeerId>),
308
309
    /// Transaction is now known to be valid. If it ever becomes invalid in the future, a
310
    /// [`TransactionStatus::Dropped`] will be generated.
311
    Validated,
312
313
    /// The block in which a block is included has changed.
314
    IncludedBlockUpdate {
315
        /// If `Some`, the transaction is included in the block of the best chain with the given
316
        /// hash and at the given index. If `None`, the transaction isn't present in the best
317
        /// chain.
318
        block_hash: Option<([u8; 32], u32)>,
319
    },
320
321
    /// Transaction has been removed from the pool.
322
    ///
323
    /// This is always the last message sent back by the channel reporting the status.
324
    Dropped(DropReason),
325
}
326
327
/// See [`TransactionStatus::Dropped`].
328
#[derive(Debug, Clone)]
329
pub enum DropReason {
330
    /// Transaction has been included in a finalized block.
331
    ///
332
    /// This is a success path.
333
    Finalized { block_hash: [u8; 32], index: u32 },
334
335
    /// Transaction has been dropped because there was a gap in the chain of blocks. It is
336
    /// impossible to know.
337
    GapInChain,
338
339
    /// Transaction has been dropped because the maximum number of transactions in the pool has
340
    /// been reached.
341
    MaxPendingTransactionsReached,
342
343
    /// Transaction has been dropped because it is invalid.
344
    Invalid(validate::TransactionValidityError),
345
346
    /// Transaction has been dropped because we have failed to validate it.
347
    ValidateError(ValidateTransactionError),
348
349
    /// Transaction service background task has crashed.
350
    Crashed,
351
}
352
353
/// Failed to check the validity of a transaction.
354
0
#[derive(Debug, derive_more::Display, Clone)]
Unexecuted instantiation: _RNvXs7_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceNtB5_24ValidateTransactionErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
Unexecuted instantiation: _RNvXs7_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceNtB5_24ValidateTransactionErrorNtNtCsaYZPK01V26L_4core3fmt7Display3fmt
355
pub enum ValidateTransactionError {
356
    /// The runtime of the requested block is invalid.
357
    InvalidRuntime(runtime_service::RuntimeError),
358
359
    /// The runtime doesn't implement the API required to validate transactions.
360
    ApiVersionRequirementUnfulfilled,
361
362
    /// Runtime service has crashed while the call was in progress.
363
    Crash,
364
365
    /// Error during the execution of the runtime.
366
    ///
367
    /// There is no point in trying to validate the transaction call again, as it would result
368
    /// in the same error.
369
    #[display(fmt = "Error during the execution of the runtime: {_0}")]
370
    Execution(runtime_service::RuntimeCallExecutionError),
371
372
    /// Error trying to access the storage required for the runtime call.
373
    ///
374
    /// Because these errors are non-fatal, the operation is attempted multiple times, and as such
375
    /// there can be multiple errors.
376
    ///
377
    /// Trying the same transaction again might succeed.
378
    #[display(fmt = "Error trying to access the storage required for the runtime call")]
379
    // TODO: better display?
380
    Inaccessible(Vec<runtime_service::RuntimeCallInaccessibleError>),
381
382
    /// Error while decoding the output of the runtime.
383
    OutputDecodeError(validate::DecodeError),
384
}
385
386
#[derive(Debug, Clone)]
387
enum InvalidOrError {
388
    Invalid(validate::TransactionValidityError),
389
    ValidateError(ValidateTransactionError),
390
}
391
392
#[derive(Debug, Clone)]
393
enum ValidationError {
394
    InvalidOrError(InvalidOrError),
395
    ObsoleteSubscription,
396
}
397
398
/// Message sent from the foreground service to the background.
399
enum ToBackground {
400
    SubmitTransaction {
401
        transaction_bytes: Vec<u8>,
402
        updates_report: Option<(async_channel::Sender<TransactionStatus>, bool)>,
403
    },
404
}
405
406
/// Configuration for [`background_task`].
407
#[derive(Clone)]
408
struct BackgroundTaskConfig<TPlat: PlatformRef> {
409
    log_target: String,
410
    platform: TPlat,
411
    sync_service: Arc<sync_service::SyncService<TPlat>>,
412
    runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
413
    network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
414
    max_concurrent_downloads: usize,
415
    max_pending_transactions: usize,
416
    max_concurrent_validations: usize,
417
}
418
419
/// Background task running in parallel of the front service.
420
0
async fn background_task<TPlat: PlatformRef>(
421
0
    config: BackgroundTaskConfig<TPlat>,
422
0
    from_foreground: async_channel::Receiver<ToBackground>,
423
0
) {
Unexecuted instantiation: _RINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpEB4_
Unexecuted instantiation: _RINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefEB1e_
Unexecuted instantiation: _RINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpEB4_
424
0
    let transactions_capacity = cmp::min(8, config.max_pending_transactions);
425
0
    let blocks_capacity = 32;
426
0
    let mut from_foreground = pin::pin!(from_foreground);
427
0
428
0
    let mut worker = Worker {
429
0
        platform: config.platform,
430
0
        sync_service: config.sync_service,
431
0
        runtime_service: config.runtime_service,
432
0
        network_service: config.network_service,
433
0
        pending_transactions: light_pool::LightPool::new(light_pool::Config {
434
0
            transactions_capacity,
435
0
            blocks_capacity,
436
0
            finalized_block_hash: [0; 32], // Dummy value. Pool is re-initialized below.
437
0
        }),
438
0
        block_downloads: FuturesUnordered::new(),
439
0
        validations_in_progress: FuturesUnordered::new(),
440
0
        next_reannounce: FuturesUnordered::new(),
441
0
        max_concurrent_downloads: config.max_concurrent_downloads,
442
0
        max_pending_transactions: config.max_pending_transactions,
443
0
    };
444
445
    // TODO: must periodically re-send transactions that aren't included in block yet
446
447
    'channels_rebuild: loop {
448
        // This loop is entered when it is necessary to rebuild the subscriptions with the runtime
449
        // service. This happens when there is a gap in the blocks, either intentionally (e.g.
450
        // after a Grandpa warp sync) or because the transactions service was too busy to process
451
        // the new blocks.
452
0
        let mut subscribe_all = {
453
0
            let sub_future = async {
454
0
                Some(
455
0
                    // The buffer size should be large enough so that, if the CPU is busy, it
456
0
                    // doesn't become full before the execution of the transactions service resumes.
457
0
                    // The maximum number of pinned block is ignored, as this maximum is a way to
458
0
                    // avoid malicious behaviors. This code is by definition not considered
459
0
                    // malicious.
460
0
                    worker
461
0
                        .runtime_service
462
0
                        .subscribe_all(32, NonZeroUsize::new(usize::MAX).unwrap())
463
0
                        .await,
464
                )
465
0
            };
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE00B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE00B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE00B8_
466
467
            // Because `runtime_service.subscribe_all()` might take a long time (potentially
468
            // forever), we need to process messages coming from the foreground in parallel.
469
0
            let from_foreground = &mut from_foreground;
470
0
            let messages_process = async move {
471
                loop {
472
0
                    match from_foreground.next().await {
473
                        Some(ToBackground::SubmitTransaction {
474
0
                            updates_report: Some(updates_report),
475
0
                            ..
476
0
                        }) => {
477
0
                            let _ = updates_report
478
0
                                .0
479
0
                                .send(TransactionStatus::Dropped(DropReason::GapInChain))
480
0
                                .await;
481
                        }
482
0
                        Some(ToBackground::SubmitTransaction { .. }) => {}
483
0
                        None => break None,
484
0
                    }
485
0
                }
486
0
            };
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s_0B8_
487
488
0
            match sub_future.or(messages_process).await {
489
0
                Some(s) => s,
490
0
                None => return,
491
            }
492
        };
493
494
0
        let initial_finalized_block_hash = header::hash_from_scale_encoded_header(
495
0
            &subscribe_all.finalized_block_scale_encoded_header,
496
0
        );
497
498
        // Drop all pending transactions of the pool.
499
0
        for (_, pending) in worker.pending_transactions.transactions_iter_mut() {
500
0
            // TODO: only do this if transaction hasn't been validated yet
501
0
            pending.update_status(TransactionStatus::Dropped(DropReason::GapInChain));
502
0
        }
503
504
        // Reset the blocks tracking state machine.
505
0
        let dropped_transactions = worker
506
0
            .pending_transactions
507
0
            .transactions_iter()
508
0
            .map(|(tx_id, _)| {
509
0
                HashDisplay(worker.pending_transactions.scale_encoding(tx_id).unwrap())
510
0
            })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s0_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s0_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s0_0B8_
511
0
            .join(",");
512
0
        worker.pending_transactions = light_pool::LightPool::new(light_pool::Config {
513
0
            transactions_capacity,
514
0
            blocks_capacity,
515
0
            finalized_block_hash: initial_finalized_block_hash,
516
0
        });
517
518
0
        for block in subscribe_all.non_finalized_blocks_ancestry_order {
519
0
            let hash = header::hash_from_scale_encoded_header(&block.scale_encoded_header);
520
0
            worker.pending_transactions.add_block(
521
0
                hash,
522
0
                &block.parent_hash,
523
0
                Block {
524
0
                    scale_encoded_header: block.scale_encoded_header,
525
0
                    failed_downloads: 0,
526
0
                    downloading: false,
527
0
                },
528
0
            );
529
0
            if block.is_new_best {
530
0
                worker.set_best_block(&config.log_target, &hash);
531
0
            }
532
        }
533
534
        // Reset the other fields.
535
0
        worker.block_downloads.clear();
536
0
        worker.validations_in_progress.clear();
537
0
        worker.next_reannounce.clear();
538
0
539
0
        log!(
540
0
            &worker.platform,
541
0
            Debug,
542
0
            &config.log_target,
543
0
            "reset",
544
0
            new_finalized = HashDisplay(&initial_finalized_block_hash),
545
0
            dropped_transactions
546
0
        );
547
548
0
        loop {
549
0
            // If the finalized block moved in such a way that there would be blocks in the
550
0
            // pool whose height is inferior to `latest_finalized - 32`, then jump to
551
0
            // "catastrophic mode" and reset everything. This is to avoid the possibility of an
552
0
            // unreasonable memory consumption.
553
0
            if worker.pending_transactions.oldest_block_finality_lag() >= 32 {
554
0
                continue 'channels_rebuild;
555
0
            }
556
557
            // Try to find transactions whose status update channels have all been closed.
558
0
            while let Some(tx_id) = {
559
0
                let id = worker
560
0
                    .pending_transactions
561
0
                    .transactions_iter()
562
0
                    .find(|(_, tx)| {
563
0
                        !tx.status_update.iter().any(|s| !s.is_closed()) && !tx.detached
Unexecuted instantiation: _RNCNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s1_00Ba_
Unexecuted instantiation: _RNCNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s1_00B1k_
Unexecuted instantiation: _RNCNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s1_00Ba_
564
0
                    })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s1_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s1_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s1_0B8_
565
0
                    .map(|(id, _)| id);
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s2_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s2_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s2_0B8_
566
0
                id
567
0
            } {
568
0
                worker.pending_transactions.remove_transaction(tx_id);
569
0
            }
570
571
            // Start the validation process of transactions that need to be validated.
572
0
            while worker.validations_in_progress.len() < config.max_concurrent_validations {
573
                // Find a transaction that needs to be validated.
574
                //
575
                // While this is an `O(n)` process, in practice we pick the first transaction not
576
                // currently being validated, and only `max_concurrent_validations` transactions
577
                // in the list don't match that criteria. Since `max_concurrent_validations`
578
                // should be pretty low, this search should complete very quickly.
579
0
                let to_start_validate = worker
580
0
                    .pending_transactions
581
0
                    .unvalidated_transactions()
582
0
                    .find(|(_, tx)| tx.validation_in_progress.is_none())
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s3_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s3_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s3_0B8_
583
0
                    .map(|(tx_id, ..)| tx_id);
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s4_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s4_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s4_0B8_
584
0
                let to_start_validate = match to_start_validate {
585
0
                    Some(tx_id) => tx_id,
586
0
                    None => break,
587
                };
588
589
                // Create the `Future` of the validation process.
590
0
                let validation_future = {
591
                    // Find which block to validate the transaction against.
592
0
                    let block_hash = *worker.pending_transactions.best_block_hash();
593
594
                    // It is possible for the current best block to be equal to the finalized
595
                    // block, in which case it will not be in the data structure and will already
596
                    // be unpinned in the runtime service.
597
                    // In that situation, we simply don't start any validation.
598
                    // TODO: is this problem worth solving? ^
599
0
                    let scale_encoded_header =
600
0
                        match worker.pending_transactions.block_user_data(&block_hash) {
601
0
                            Some(b) => b.scale_encoded_header.clone(),
602
0
                            None => break,
603
                        };
604
605
                    // Make copies of everything in order to move the values into the future.
606
0
                    let runtime_service = worker.runtime_service.clone();
607
0
                    let platform = worker.platform.clone();
608
0
                    let log_target = config.log_target.clone();
609
0
                    let relay_chain_sync_subscription_id = subscribe_all.new_blocks.id();
610
0
                    let scale_encoded_transaction = worker
611
0
                        .pending_transactions
612
0
                        .scale_encoding(to_start_validate)
613
0
                        .unwrap()
614
0
                        .to_owned();
615
                    // TODO: race condition /!\ the block could be pruned and unpinned before this future starts executing
616
0
                    async move {
617
0
                        let result = validate_transaction(
618
0
                            &platform,
619
0
                            &log_target,
620
0
                            &runtime_service,
621
0
                            relay_chain_sync_subscription_id,
622
0
                            block_hash,
623
0
                            &scale_encoded_header,
624
0
                            scale_encoded_transaction,
625
0
                            validate::TransactionSource::External,
626
0
                        )
627
0
                        .await;
628
0
                        (block_hash, result)
629
0
                    }
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s5_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s5_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s5_0B8_
630
                };
631
632
                // The future that will receive the validation result is stored in the
633
                // `PendingTransaction`, while the future that executes the validation (and
634
                // yields `()`) is stored in `validations_in_progress`.
635
0
                let (result_tx, result_rx) = oneshot::channel();
636
0
                worker
637
0
                    .validations_in_progress
638
0
                    .push(Box::pin(validation_future.map(move |result| {
639
0
                        let _ = result_tx.send(result);
640
0
                        to_start_validate
641
0
                    })));
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s6_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s6_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s6_0B8_
642
0
                let tx = worker
643
0
                    .pending_transactions
644
0
                    .transaction_user_data_mut(to_start_validate)
645
0
                    .unwrap();
646
0
                debug_assert!(tx.validation_in_progress.is_none());
647
0
                tx.validation_in_progress = Some(result_rx);
648
            }
649
650
            // Remove transactions that have been determined to be invalid.
651
            loop {
652
                // Note that we really would like to use a `while let` loop, but the Rust borrow
653
                // checker doesn't permit it.
654
0
                let (tx_id, _, error) = match worker
655
0
                    .pending_transactions
656
0
                    .invalid_transactions_finalized_block()
657
0
                    .next()
658
                {
659
0
                    Some(v) => v,
660
0
                    None => break,
661
                };
662
663
                // Clone the error because we need to unborrow `worker.pending_transactions`.
664
0
                let error = error.clone();
665
0
666
0
                let (tx_body, mut transaction) =
667
0
                    worker.pending_transactions.remove_transaction(tx_id);
668
0
669
0
                log!(
670
0
                    &worker.platform,
671
0
                    Debug,
672
0
                    &config.log_target,
673
0
                    "discarded",
674
0
                    tx_hash = HashDisplay(&blake2_hash(&tx_body)),
675
0
                    ?error
676
0
                );
677
0
678
0
                transaction.update_status(TransactionStatus::Dropped(match error {
679
0
                    InvalidOrError::Invalid(err) => DropReason::Invalid(err),
680
0
                    InvalidOrError::ValidateError(err) => DropReason::ValidateError(err),
681
                }));
682
            }
683
684
            // Start block bodies downloads that need to be started.
685
0
            while worker.block_downloads.len() < worker.max_concurrent_downloads {
686
                // TODO: prioritize best chain?
687
0
                let block_hash_number = worker
688
0
                    .pending_transactions
689
0
                    .missing_block_bodies()
690
0
                    .find(|(_, block)| {
691
0
                        // The transaction pool isn't aware of the fact that we're currently
692
0
                        // downloading a block's body. Skip when that is the case.
693
0
                        if block.downloading {
694
0
                            return false;
695
0
                        }
696
0
697
0
                        // Don't try again block downloads that have failed before.
698
0
                        if block.failed_downloads >= 1 {
699
                            // TODO: try downloading again if finalized or best chain
700
0
                            return false;
701
0
                        }
702
0
703
0
                        true
704
0
                    })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s7_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s7_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s7_0B8_
705
0
                    .map(|(hash, block)| {
706
0
                        // TODO: unwrap?! should only insert valid blocks in the worker
707
0
                        let decoded = header::decode(
708
0
                            &block.scale_encoded_header,
709
0
                            worker.sync_service.block_number_bytes(),
710
0
                        )
711
0
                        .unwrap();
712
0
                        (*hash, decoded.number)
713
0
                    });
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s8_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s8_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s8_0B8_
714
0
                let (block_hash, block_number) = match block_hash_number {
715
0
                    Some(b) => b,
716
0
                    None => break,
717
                };
718
719
                // Actual download start.
720
0
                worker.block_downloads.push({
721
0
                    let download_future = worker.sync_service.clone().block_query(
722
0
                        block_number,
723
0
                        block_hash,
724
0
                        codec::BlocksRequestFields {
725
0
                            body: true,
726
0
                            header: true, // TODO: must be true in order to avoid an error being generated, fix this in sync service
727
0
                            justifications: false,
728
0
                        },
729
0
                        3,
730
0
                        Duration::from_secs(8),
731
0
                        NonZeroU32::new(3).unwrap(),
732
0
                    );
733
0
734
0
                    Box::pin(async move {
735
0
                        (
736
0
                            block_hash,
737
0
                            download_future.await.and_then(|b| b.body.ok_or(())),
Unexecuted instantiation: _RNCNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s9_00Ba_
Unexecuted instantiation: _RNCNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s9_00B1k_
Unexecuted instantiation: _RNCNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s9_00Ba_
738
0
                        )
739
0
                    })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0s9_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0s9_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0s9_0B8_
740
0
                });
741
0
742
0
                worker
743
0
                    .pending_transactions
744
0
                    .block_user_data_mut(&block_hash)
745
0
                    .unwrap()
746
0
                    .downloading = true;
747
0
748
0
                log!(
749
0
                    &worker.platform,
750
0
                    Debug,
751
0
                    &config.log_target,
752
0
                    "blocks-download-started",
753
0
                    block = HashDisplay(&block_hash)
754
0
                );
755
            }
756
757
            // Remove finalized blocks from the pool when possible.
758
0
            for block in worker.pending_transactions.prune_finalized_with_body() {
759
                // All blocks in `pending_transactions` are pinned within the runtime service.
760
                // Unpin them when they're removed.
761
0
                subscribe_all.new_blocks.unpin_block(block.block_hash).await;
762
763
0
                log!(
764
0
                    &worker.platform,
765
0
                    Debug,
766
0
                    &config.log_target,
767
0
                    "finalized",
768
0
                    block = HashDisplay(&block.block_hash),
769
0
                    body_transactions = block
770
0
                        .included_transactions
771
0
                        .iter()
772
0
                        .map(|tx| HashDisplay(&blake2_hash(&tx.scale_encoding)).to_string())
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sj_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sj_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sj_0B8_
773
0
                        .join(", ")
774
0
                );
775
0
776
0
                debug_assert!(!block.user_data.downloading);
777
0
                for mut tx in block.included_transactions {
778
0
                    // We assume that there's no more than 2<<32 transactions per block.
779
0
                    let body_index = u32::try_from(tx.index_in_block).unwrap();
780
0
                    tx.user_data
781
0
                        .update_status(TransactionStatus::Dropped(DropReason::Finalized {
782
0
                            block_hash: block.block_hash,
783
0
                            index: body_index,
784
0
                        }));
785
0
                    // `tx` is no longer in the pool.
786
0
                }
787
            }
788
789
            // Yield at every loop in order to provide better tasks granularity.
790
0
            futures_lite::future::yield_now().await;
791
792
            enum WakeUpReason {
793
                Notification(Option<runtime_service::Notification>),
794
                BlockDownloadFinished([u8; 32], Result<Vec<Vec<u8>>, ()>),
795
                MustMaybeReannounce(light_pool::TransactionId),
796
                MaybeValidated(light_pool::TransactionId),
797
                ForegroundMessage(Option<ToBackground>),
798
            }
799
800
0
            let wake_up_reason: WakeUpReason = {
801
0
                async { WakeUpReason::Notification(subscribe_all.new_blocks.next().await) }
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sa_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sa_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sa_0B8_
802
0
                    .or(async {
803
0
                        if !worker.block_downloads.is_empty() {
804
0
                            let (block_hash, result) =
805
0
                                worker.block_downloads.select_next_some().await;
806
0
                            WakeUpReason::BlockDownloadFinished(block_hash, result)
807
                        } else {
808
0
                            future::pending().await
809
                        }
810
0
                    })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sb_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sb_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sb_0B8_
811
0
                    .or(async {
812
0
                        if !worker.next_reannounce.is_empty() {
813
                            WakeUpReason::MustMaybeReannounce(
814
0
                                worker.next_reannounce.select_next_some().await,
815
                            )
816
                        } else {
817
0
                            future::pending().await
818
                        }
819
0
                    })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sc_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sc_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sc_0B8_
820
0
                    .or(async {
821
0
                        if !worker.validations_in_progress.is_empty() {
822
                            WakeUpReason::MaybeValidated(
823
0
                                worker.validations_in_progress.select_next_some().await,
824
                            )
825
                        } else {
826
0
                            future::pending().await
827
                        }
828
0
                    })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sd_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sd_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sd_0B8_
829
0
                    .or(async { WakeUpReason::ForegroundMessage(from_foreground.next().await) })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0se_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0se_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0se_0B8_
830
0
                    .await
831
            };
832
833
0
            match wake_up_reason {
834
                WakeUpReason::Notification(Some(runtime_service::Notification::Block(
835
0
                    new_block,
836
0
                ))) => {
837
0
                    let hash =
838
0
                        header::hash_from_scale_encoded_header(&new_block.scale_encoded_header);
839
0
                    worker.pending_transactions.add_block(
840
0
                        header::hash_from_scale_encoded_header(&new_block.scale_encoded_header),
841
0
                        &new_block.parent_hash,
842
0
                        Block {
843
0
                            scale_encoded_header: new_block.scale_encoded_header,
844
0
                            failed_downloads: 0,
845
0
                            downloading: false,
846
0
                        },
847
0
                    );
848
0
                    if new_block.is_new_best {
849
0
                        worker.set_best_block(&config.log_target, &hash);
850
0
                    }
851
                }
852
                WakeUpReason::Notification(Some(runtime_service::Notification::Finalized {
853
0
                    hash,
854
0
                    best_block_hash_if_changed,
855
                    ..
856
                })) => {
857
0
                    if let Some(best_block_hash_if_changed) = best_block_hash_if_changed {
858
0
                        worker.set_best_block(&config.log_target, &best_block_hash_if_changed);
859
0
                    }
860
0
                    for pruned in worker.pending_transactions.set_finalized_block(&hash) {
861
                        // All blocks in `pending_transactions` are pinned within the
862
                        // runtime service. Unpin them when they're removed.
863
0
                        subscribe_all.new_blocks.unpin_block(pruned.0).await;
864
865
                        // Note that we could in principle interrupt any on-going
866
                        // download of that block, but it is not worth the effort.
867
                    }
868
                }
869
                WakeUpReason::Notification(Some(
870
0
                    runtime_service::Notification::BestBlockChanged { hash },
871
0
                )) => {
872
0
                    worker.set_best_block(&config.log_target, &hash);
873
0
                }
874
0
                WakeUpReason::Notification(None) => continue 'channels_rebuild,
875
876
0
                WakeUpReason::BlockDownloadFinished(block_hash, mut block_body) => {
877
                    // A block body download has finished, successfully or not.
878
0
                    let block = match worker.pending_transactions.block_user_data_mut(&block_hash) {
879
0
                        Some(b) => b,
880
                        None => {
881
                            // It is possible that this block has been discarded because a sibling
882
                            // or uncle has been finalized. This is a normal situation.
883
0
                            continue;
884
                        }
885
                    };
886
887
0
                    debug_assert!(block.downloading);
888
0
                    block.downloading = false;
889
890
                    // Make sure that the downloaded body is the one of this block, otherwise
891
                    // we consider the download as failed.
892
0
                    if let Ok(body) = &block_body {
893
                        // TODO: unwrap the decoding?! is that correct?
894
0
                        if header::extrinsics_root(body)
895
0
                            != *header::decode(
896
0
                                &block.scale_encoded_header,
897
0
                                worker.sync_service.block_number_bytes(),
898
0
                            )
899
0
                            .unwrap()
900
0
                            .extrinsics_root
901
0
                        {
902
0
                            block_body = Err(());
903
0
                        }
904
0
                    }
905
906
0
                    if let Ok(block_body) = block_body {
907
0
                        let block_body_size = block_body.len();
908
0
                        let included_transactions = worker
909
0
                            .pending_transactions
910
0
                            .set_block_body(&block_hash, block_body.into_iter())
911
0
                            .collect::<Vec<_>>();
912
0
913
0
                        log!(
914
0
                            &worker.platform,
915
0
                            Debug,
916
0
                            &config.log_target,
917
0
                            "blocks-download-success",
918
0
                            block = HashDisplay(&block_hash),
919
0
                            included_transactions = included_transactions
920
0
                                .iter()
921
0
                                .map(|(id, _)| HashDisplay(&blake2_hash(
922
0
                                    worker.pending_transactions.scale_encoding(*id).unwrap()
923
0
                                ))
924
0
                                .to_string())
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sk_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sk_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sk_0B8_
925
0
                                .join(", ")
926
0
                        );
927
928
0
                        for (tx_id, body_index) in included_transactions {
929
0
                            debug_assert!(body_index < block_body_size);
930
0
                            let tx = worker
931
0
                                .pending_transactions
932
0
                                .transaction_user_data_mut(tx_id)
933
0
                                .unwrap();
934
0
                            // We assume that there's no more than 2<<32 transactions per block.
935
0
                            let body_index = u32::try_from(body_index).unwrap();
936
0
                            tx.update_status(TransactionStatus::IncludedBlockUpdate {
937
0
                                block_hash: Some((block_hash, body_index)),
938
0
                            });
939
                        }
940
0
                    } else {
941
0
                        block.failed_downloads = block.failed_downloads.saturating_add(1);
942
0
                        log!(
943
0
                            &worker.platform,
944
0
                            Debug,
945
0
                            &config.log_target,
946
0
                            "blocks-download-failure",
947
0
                            block = HashDisplay(&block_hash)
948
0
                        );
949
0
                    }
950
                }
951
952
0
                WakeUpReason::MustMaybeReannounce(maybe_reannounce_tx_id) => {
953
0
                    // A transaction reannounce future has finished. This doesn't necessarily mean
954
0
                    // that a validation actually needs to be reannounced. The provided
955
0
                    // `maybe_reannounce_tx_id` is a hint as to which transaction might need to be
956
0
                    // reannounced, but without a strong guarantee.
957
0
958
0
                    // `continue` if transaction doesn't exist. False positive.
959
0
                    if worker
960
0
                        .pending_transactions
961
0
                        .transaction_user_data(maybe_reannounce_tx_id)
962
0
                        .is_none()
963
                    {
964
0
                        continue;
965
0
                    }
966
0
967
0
                    // Don't gossip the transaction if it hasn't been validated or is already
968
0
                    // included.
969
0
                    // TODO: if best block changes, we would need to reset all the re-announce period of all transactions, awkward!
970
0
                    // TODO: also, if this is false, then the transaction might never be re-announced ever again
971
0
                    if worker
972
0
                        .pending_transactions
973
0
                        .is_included_best_chain(maybe_reannounce_tx_id)
974
0
                        || !worker
975
0
                            .pending_transactions
976
0
                            .is_valid_against_best_block(maybe_reannounce_tx_id)
977
                    {
978
0
                        continue;
979
0
                    }
980
0
981
0
                    let now = worker.platform.now();
982
0
                    let tx = worker
983
0
                        .pending_transactions
984
0
                        .transaction_user_data_mut(maybe_reannounce_tx_id)
985
0
                        .unwrap();
986
0
                    if tx.when_reannounce > now {
987
0
                        continue;
988
0
                    }
989
0
990
0
                    // TODO: only announce if propagate is true
991
0
992
0
                    // Update transaction state for the next re-announce.
993
0
                    tx.when_reannounce = now + Duration::from_secs(5);
994
0
                    worker.next_reannounce.push({
995
0
                        let platform = worker.platform.clone();
996
0
                        Box::pin(async move {
997
0
                            platform.sleep(Duration::from_secs(5)).await;
998
0
                            maybe_reannounce_tx_id
999
0
                        })
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sf_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sf_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sf_0B8_
1000
0
                    });
1001
1002
                    // Perform the announce.
1003
0
                    let peers_sent = worker
1004
0
                        .network_service
1005
0
                        .clone()
1006
0
                        .announce_transaction(
1007
0
                            worker
1008
0
                                .pending_transactions
1009
0
                                .scale_encoding(maybe_reannounce_tx_id)
1010
0
                                .unwrap(),
1011
0
                        )
1012
0
                        .await;
1013
0
                    log!(
1014
0
                        &worker.platform,
1015
0
                        Debug,
1016
0
                        &config.log_target,
1017
0
                        "announced-to-network",
1018
0
                        transaction = HashDisplay(&blake2_hash(
1019
0
                            worker
1020
0
                                .pending_transactions
1021
0
                                .scale_encoding(maybe_reannounce_tx_id)
1022
0
                                .unwrap()
1023
0
                        )),
1024
0
                        peers = peers_sent.iter().join(", ")
1025
0
                    );
1026
0
1027
0
                    // TODO: is this correct? and what should we do if announcing the same transaction multiple times? is it cumulative? `Broadcast` isn't super well documented
1028
0
                    if !peers_sent.is_empty() {
1029
0
                        worker
1030
0
                            .pending_transactions
1031
0
                            .transaction_user_data_mut(maybe_reannounce_tx_id)
1032
0
                            .unwrap()
1033
0
                            .update_status(TransactionStatus::Broadcast(peers_sent));
1034
0
                    }
1035
                }
1036
1037
0
                WakeUpReason::MaybeValidated(maybe_validated_tx_id) => {
1038
                    // A transaction validation future has finished. This doesn't necessarily mean
1039
                    // that a validation has actually finished. The provided
1040
                    // `maybe_validated_tx_id` is a hint as to which transaction might have
1041
                    // finished being validated, but without a strong guarantee.
1042
1043
                    // Try extract the validation result of this transaction, or `continue` if it
1044
                    // is a false positive.
1045
0
                    let (block_hash, validation_result) = match worker
1046
0
                        .pending_transactions
1047
0
                        .transaction_user_data_mut(maybe_validated_tx_id)
1048
                    {
1049
0
                        None => continue, // Normal. `maybe_validated_tx_id` is just a hint.
1050
0
                        Some(tx) => match tx
1051
0
                            .validation_in_progress
1052
0
                            .as_mut()
1053
0
                            .and_then(|f| f.now_or_never())
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sg_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sg_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sg_0B8_
1054
                        {
1055
0
                            None => continue,               // Normal. `maybe_validated_tx_id` is just a hint.
1056
0
                            Some(Err(_)) => unreachable!(), // Validations are never interrupted.
1057
0
                            Some(Ok(result)) => {
1058
0
                                tx.validation_in_progress = None;
1059
0
                                result
1060
0
                            }
1061
0
                        },
1062
0
                    };
1063
0
1064
0
                    let tx_hash = blake2_hash(
1065
0
                        worker
1066
0
                            .pending_transactions
1067
0
                            .scale_encoding(maybe_validated_tx_id)
1068
0
                            .unwrap(),
1069
0
                    );
1070
0
1071
0
                    // The validation is made using the runtime service, while the state
1072
0
                    // of the chain is tracked using the sync service. As such, it is
1073
0
                    // possible for the validation to have been performed against a block
1074
0
                    // that has already been finalized and removed from the pool.
1075
0
                    if !worker.pending_transactions.has_block(&block_hash) {
1076
0
                        log!(
1077
0
                            &worker.platform,
1078
0
                            Debug,
1079
0
                            &config.log_target,
1080
0
                            "transaction-validation-obsolete-block",
1081
0
                            transaction = HashDisplay(&tx_hash),
1082
0
                            block = HashDisplay(&block_hash)
1083
0
                        );
1084
0
                        continue;
1085
0
                    }
1086
1087
0
                    let validation_result = match validation_result {
1088
0
                        Ok(result) => {
1089
0
                            log!(
1090
0
                                &worker.platform,
1091
0
                                Debug,
1092
0
                                &config.log_target,
1093
0
                                "transaction-validation-success",
1094
0
                                transaction = HashDisplay(&tx_hash),
1095
0
                                block = HashDisplay(&block_hash),
1096
0
                                priority = result.priority,
1097
0
                                longevity = result.longevity,
1098
0
                                propagate = ?result.propagate,
1099
0
                            );
1100
0
1101
0
                            log!(
1102
0
                                &worker.platform,
1103
0
                                Info,
1104
0
                                &config.log_target,
1105
0
                                format!(
1106
0
                                    "Successfully validated transaction {}",
1107
0
                                    HashDisplay(&tx_hash)
1108
0
                                )
1109
0
                            );
1110
0
1111
0
                            worker
1112
0
                                .pending_transactions
1113
0
                                .transaction_user_data_mut(maybe_validated_tx_id)
1114
0
                                .unwrap_or_else(|| unreachable!())
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0sh_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0sh_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0sh_0B8_
1115
0
                                .update_status(TransactionStatus::Validated);
1116
0
1117
0
                            // Schedule this transaction for announcement.
1118
0
                            worker
1119
0
                                .next_reannounce
1120
0
                                .push(Box::pin(async move { maybe_validated_tx_id }));
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0si_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0si_0B1i_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0si_0B8_
1121
0
1122
0
                            Ok(result)
1123
                        }
1124
                        Err(ValidationError::ObsoleteSubscription) => {
1125
                            // Runtime service subscription is obsolete. Throw away everything and
1126
                            // rebuild it.
1127
0
                            continue 'channels_rebuild;
1128
                        }
1129
0
                        Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(error))) => {
1130
0
                            log!(
1131
0
                                &worker.platform,
1132
0
                                Debug,
1133
0
                                &config.log_target,
1134
0
                                "transaction-validation-invalid-tx",
1135
0
                                transaction = HashDisplay(&tx_hash),
1136
0
                                block = HashDisplay(&block_hash),
1137
0
                                ?error,
1138
0
                            );
1139
0
1140
0
                            log!(
1141
0
                                &worker.platform,
1142
0
                                Warn,
1143
0
                                &config.log_target,
1144
0
                                format!(
1145
0
                                    "Transaction {} invalid against block {}: {}",
1146
0
                                    HashDisplay(&tx_hash),
1147
0
                                    HashDisplay(&block_hash),
1148
0
                                    error,
1149
0
                                )
1150
0
                            );
1151
0
1152
0
                            Err(InvalidOrError::Invalid(error))
1153
                        }
1154
                        Err(ValidationError::InvalidOrError(InvalidOrError::ValidateError(
1155
0
                            error,
1156
0
                        ))) => {
1157
0
                            log!(
1158
0
                                &worker.platform,
1159
0
                                Debug,
1160
0
                                &config.log_target,
1161
0
                                "transaction-validation-error",
1162
0
                                transaction = HashDisplay(&tx_hash),
1163
0
                                block = HashDisplay(&block_hash),
1164
0
                                ?error,
1165
0
                            );
1166
0
1167
0
                            log!(
1168
0
                                &worker.platform,
1169
0
                                Warn,
1170
0
                                &config.log_target,
1171
0
                                format!(
1172
0
                                    "Failed to validate transaction {}: {}",
1173
0
                                    HashDisplay(&tx_hash),
1174
0
                                    error
1175
0
                                )
1176
0
                            );
1177
0
1178
0
                            Err(InvalidOrError::ValidateError(error))
1179
                        }
1180
                    };
1181
1182
                    // No matter whether the validation is successful, we store the result in
1183
                    // the transactions pool. This will later be picked up by the code that removes
1184
                    // invalid transactions from the pool.
1185
                    // TODO: shouldn't mark a transaction as invalid if it failed due to network errors
1186
0
                    worker.pending_transactions.set_validation_result(
1187
0
                        maybe_validated_tx_id,
1188
0
                        &block_hash,
1189
0
                        validation_result,
1190
0
                    );
1191
                }
1192
1193
0
                WakeUpReason::ForegroundMessage(None) => return,
1194
1195
                WakeUpReason::ForegroundMessage(Some(ToBackground::SubmitTransaction {
1196
0
                    transaction_bytes,
1197
0
                    updates_report,
1198
0
                })) => {
1199
0
                    // Handle the situation where the same transaction has already been
1200
0
                    // submitted in the pool before.
1201
0
                    let existing_tx_id = worker
1202
0
                        .pending_transactions
1203
0
                        .find_transaction(&transaction_bytes)
1204
0
                        .next();
1205
0
                    if let Some(existing_tx_id) = existing_tx_id {
1206
0
                        let existing_tx = worker
1207
0
                            .pending_transactions
1208
0
                            .transaction_user_data_mut(existing_tx_id)
1209
0
                            .unwrap();
1210
0
                        if let Some((channel, detached)) = updates_report {
1211
0
                            existing_tx.add_status_update(channel);
1212
0
                            if detached {
1213
0
                                existing_tx.detached = true;
1214
0
                            }
1215
0
                        }
1216
0
                        continue;
1217
0
                    }
1218
0
1219
0
                    // We intentionally limit the number of transactions in the pool,
1220
0
                    // and immediately drop new transactions of this limit is reached.
1221
0
                    if worker.pending_transactions.num_transactions()
1222
0
                        >= worker.max_pending_transactions
1223
                    {
1224
0
                        if let Some((updates_report, _)) = updates_report {
1225
0
                            let _ = updates_report.try_send(TransactionStatus::Dropped(
1226
0
                                DropReason::MaxPendingTransactionsReached,
1227
0
                            ));
1228
0
                        }
1229
0
                        continue;
1230
0
                    }
1231
0
1232
0
                    // Success path. Inserting in pool.
1233
0
                    worker.pending_transactions.add_unvalidated(
1234
0
                        transaction_bytes,
1235
0
                        PendingTransaction {
1236
0
                            when_reannounce: worker.platform.now(),
1237
0
                            detached: match &updates_report {
1238
0
                                Some((_, true)) | None => true,
1239
0
                                Some((_, false)) => false,
1240
                            },
1241
                            status_update: {
1242
0
                                let mut vec = Vec::with_capacity(1);
1243
0
                                if let Some((channel, _)) = updates_report {
1244
0
                                    vec.push(channel);
1245
0
                                }
1246
0
                                vec
1247
0
                            },
1248
0
                            latest_status: None,
1249
0
                            validation_in_progress: None,
1250
                        },
1251
                    );
1252
                }
1253
            }
1254
        }
1255
    }
1256
0
}
Unexecuted instantiation: _RNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service15background_taskpE0B6_
Unexecuted instantiation: _RNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE0B1g_
Unexecuted instantiation: _RNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service15background_taskpE0B6_
1257
1258
/// Background worker running in parallel of the front service.
1259
struct Worker<TPlat: PlatformRef> {
1260
    /// Access to the platform's capabilities.
1261
    platform: TPlat,
1262
1263
    // How to download the bodies of blocks and synchronize the chain.
1264
    sync_service: Arc<sync_service::SyncService<TPlat>>,
1265
1266
    /// How to validate transactions.
1267
    runtime_service: Arc<runtime_service::RuntimeService<TPlat>>,
1268
1269
    /// How to gossip transactions.
1270
    network_service: Arc<network_service::NetworkServiceChain<TPlat>>,
1271
1272
    /// List of pending transactions.
1273
    ///
1274
    /// Contains all transactions that were submitted with
1275
    /// [`TransactionsService::submit_transaction`] and their channel to send back their status.
1276
    ///
1277
    /// All the entries in this map represent transactions that we're trying to include on the
1278
    /// network. It is normal to find entries where the status report channels are closed, as they
1279
    /// still represent transactions that we're trying to include but whose status isn't
1280
    /// interesting the frontend.
1281
    ///
1282
    /// All the blocks within this data structure are also pinned within the runtime service. They
1283
    /// must be unpinned when they leave the data structure.
1284
    pending_transactions: light_pool::LightPool<PendingTransaction<TPlat>, Block, InvalidOrError>,
1285
1286
    /// See [`Config::max_pending_transactions`].
1287
    max_pending_transactions: usize,
1288
1289
    /// List of ongoing block body downloads.
1290
    /// The output of the future is a block hash and a block body.
1291
    block_downloads:
1292
        FuturesUnordered<future::BoxFuture<'static, ([u8; 32], Result<Vec<Vec<u8>>, ()>)>>,
1293
1294
    /// List of transactions currently being validated.
1295
    /// Returns the [`light_pool::TransactionId`] of the transaction that has finished being
1296
    /// validated. The result can then be read from [`PendingTransaction::validation_in_progress`].
1297
    /// Since transaction IDs can be reused, the returned ID might not correspond to a transaction
1298
    /// or might correspond to the wrong transaction. This ID being returned is just a hint as to
1299
    /// which transaction to check, and not an authoritative value.
1300
    validations_in_progress:
1301
        FuturesUnordered<future::BoxFuture<'static, light_pool::TransactionId>>,
1302
1303
    /// List of transactions that need to be reannounced.
1304
    /// Returns the [`light_pool::TransactionId]` of the transaction that needs to be re-announced.
1305
    /// Since transaction IDs can be reused, the returned ID might not correspond to a transaction
1306
    /// or might correspond to the wrong transaction. This ID being returned is just a hint as to
1307
    /// which transaction to check, not an authoritative value, and
1308
    /// [`PendingTransaction::when_reannounce`] should be checked.
1309
    next_reannounce: FuturesUnordered<future::BoxFuture<'static, light_pool::TransactionId>>,
1310
1311
    /// See [`Config::max_concurrent_downloads`]. Maximum number of elements in
1312
    /// [`Worker::block_downloads`].
1313
    max_concurrent_downloads: usize,
1314
}
1315
1316
impl<TPlat: PlatformRef> Worker<TPlat> {
1317
    /// Update the best block. Must have been previously inserted with
1318
    /// [`light_pool::LightPool::add_block`].
1319
0
    fn set_best_block(&mut self, log_target: &str, new_best_block_hash: &[u8; 32]) {
1320
0
        let updates = self
1321
0
            .pending_transactions
1322
0
            .set_best_block(new_best_block_hash);
1323
0
1324
0
        // There might be entries in common between `retracted_transactions` and
1325
0
        // `included_transactions`, in the case of a re-org where a transaction is part of both
1326
0
        // the old and new best chain.
1327
0
        // In that situation we need to first signal `Retracted`, then only `InBlock`.
1328
0
        // Consequently, process `retracted_transactions` first.
1329
0
1330
0
        log!(
1331
0
            &self.platform,
1332
0
            Debug,
1333
0
            &log_target,
1334
0
            "best-chain-update",
1335
0
            new_best_block = HashDisplay(new_best_block_hash),
1336
0
            included_transactions = updates
1337
0
                .included_transactions
1338
0
                .iter()
1339
0
                .map(|(id, _, _)| HashDisplay(&blake2_hash(
1340
0
                    self.pending_transactions.scale_encoding(*id).unwrap()
1341
0
                ))
1342
0
                .to_string())
Unexecuted instantiation: _RNCNvMs0_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB7_6WorkerpE14set_best_block0B9_
Unexecuted instantiation: _RNCNvMs0_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB7_6WorkerNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE14set_best_block0B1f_
Unexecuted instantiation: _RNCNvMs0_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB7_6WorkerpE14set_best_block0B9_
1343
0
                .join(", "),
1344
0
            retracted_transactions = updates
1345
0
                .retracted_transactions
1346
0
                .iter()
1347
0
                .map(|(id, _, _)| HashDisplay(&blake2_hash(
1348
0
                    self.pending_transactions.scale_encoding(*id).unwrap()
1349
0
                ))
1350
0
                .to_string())
Unexecuted instantiation: _RNCNvMs0_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB7_6WorkerpE14set_best_blocks_0B9_
Unexecuted instantiation: _RNCNvMs0_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB7_6WorkerNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE14set_best_blocks_0B1f_
Unexecuted instantiation: _RNCNvMs0_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB7_6WorkerpE14set_best_blocks_0B9_
1351
0
                .join(", ")
1352
0
        );
1353
1354
0
        for (tx_id, _, _) in updates.retracted_transactions {
1355
0
            let tx = self
1356
0
                .pending_transactions
1357
0
                .transaction_user_data_mut(tx_id)
1358
0
                .unwrap();
1359
0
            tx.update_status(TransactionStatus::IncludedBlockUpdate { block_hash: None });
1360
0
        }
1361
1362
0
        for (tx_id, block_hash, block_body_index) in updates.included_transactions {
1363
0
            let tx = self
1364
0
                .pending_transactions
1365
0
                .transaction_user_data_mut(tx_id)
1366
0
                .unwrap();
1367
0
            // We assume that there's no more than 2<<32 transactions per block.
1368
0
            let block_body_index = u32::try_from(block_body_index).unwrap();
1369
0
            tx.update_status(TransactionStatus::IncludedBlockUpdate {
1370
0
                block_hash: Some((block_hash, block_body_index)),
1371
0
            });
1372
0
        }
1373
0
    }
Unexecuted instantiation: _RNvMs0_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB5_6WorkerpE14set_best_blockB7_
Unexecuted instantiation: _RNvMs0_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB5_6WorkerNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE14set_best_blockB1d_
Unexecuted instantiation: _RNvMs0_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB5_6WorkerpE14set_best_blockB7_
1374
}
1375
1376
struct Block {
1377
    /// Header of the block, in SCALE encoding. Necessary in order to be able to validate blocks.
1378
    scale_encoded_header: Vec<u8>,
1379
1380
    /// Number of previous downloads that have failed.
1381
    failed_downloads: u8,
1382
1383
    /// `True` if the body of this block is currently being downloaded.
1384
    downloading: bool,
1385
}
1386
1387
struct PendingTransaction<TPlat: PlatformRef> {
1388
    /// Earliest moment when to gossip the transaction on the network again.
1389
    ///
1390
    /// This should be interpreted as the moment before which to not reannounce, rather than the
1391
    /// moment when to announce.
1392
    ///
1393
    /// In particular, this value might be long in the past, in case for example of a transaction
1394
    /// that is not validated.
1395
    when_reannounce: TPlat::Instant,
1396
1397
    /// List of channels that should receive changes to the transaction status.
1398
    status_update: Vec<async_channel::Sender<TransactionStatus>>,
1399
1400
    /// If `false`, then dropping all the [`PendingTransaction::status_update`] channels will
1401
    /// remove the transaction from the pool.
1402
    detached: bool,
1403
1404
    /// Latest known status of the transaction. Used when a new sender is added to
1405
    /// [`PendingTransaction::status_update`].
1406
    latest_status: Option<TransactionStatus>,
1407
1408
    /// If `Some`, will receive the result of the validation of the transaction.
1409
    validation_in_progress: Option<
1410
        oneshot::Receiver<(
1411
            [u8; 32],
1412
            Result<validate::ValidTransaction, ValidationError>,
1413
        )>,
1414
    >,
1415
}
1416
1417
impl<TPlat: PlatformRef> PendingTransaction<TPlat> {
1418
0
    fn add_status_update(&mut self, channel: async_channel::Sender<TransactionStatus>) {
1419
0
        if let Some(latest_status) = &self.latest_status {
1420
0
            if channel.try_send(latest_status.clone()).is_err() {
1421
0
                return;
1422
0
            }
1423
0
        }
1424
1425
0
        self.status_update.push(channel);
1426
0
    }
Unexecuted instantiation: _RNvMs1_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB5_18PendingTransactionpE17add_status_updateB7_
Unexecuted instantiation: _RNvMs1_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB5_18PendingTransactionNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE17add_status_updateB1q_
Unexecuted instantiation: _RNvMs1_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB5_18PendingTransactionpE17add_status_updateB7_
1427
1428
0
    fn update_status(&mut self, status: TransactionStatus) {
1429
0
        for n in 0..self.status_update.len() {
1430
0
            let channel = self.status_update.swap_remove(n);
1431
0
            if channel.try_send(status.clone()).is_ok() {
1432
0
                self.status_update.push(channel);
1433
0
            }
1434
        }
1435
1436
0
        self.latest_status = Some(status);
1437
0
    }
Unexecuted instantiation: _RNvMs1_NtCsiGub1lfKphe_13smoldot_light20transactions_serviceINtB5_18PendingTransactionpE13update_statusB7_
Unexecuted instantiation: _RNvMs1_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB5_18PendingTransactionNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefE13update_statusB1q_
Unexecuted instantiation: _RNvMs1_NtCsih6EgvAwZF2_13smoldot_light20transactions_serviceINtB5_18PendingTransactionpE13update_statusB7_
1438
}
1439
1440
/// Actual transaction validation logic. Validates the transaction against the given block of the
1441
/// [`runtime_service::RuntimeService`].
1442
///
1443
/// Returns the result of the validation, and the hash of the block it was validated against.
1444
0
async fn validate_transaction<TPlat: PlatformRef>(
1445
0
    platform: &TPlat,
1446
0
    log_target: &str,
1447
0
    relay_chain_sync: &Arc<runtime_service::RuntimeService<TPlat>>,
1448
0
    relay_chain_sync_subscription_id: runtime_service::SubscriptionId,
1449
0
    block_hash: [u8; 32],
1450
0
    block_scale_encoded_header: &[u8],
1451
0
    scale_encoded_transaction: impl AsRef<[u8]> + Clone,
1452
0
    source: validate::TransactionSource,
1453
0
) -> Result<validate::ValidTransaction, ValidationError> {
Unexecuted instantiation: _RINvNtCsiGub1lfKphe_13smoldot_light20transactions_service20validate_transactionppEB4_
Unexecuted instantiation: _RINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefINtNtCsdZExvAaxgia_5alloc3vec3VechEEB1j_
Unexecuted instantiation: _RINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionppEB4_
1454
0
    // TODO: move somewhere else?
1455
0
    log!(
1456
0
        platform,
1457
0
        Debug,
1458
0
        &log_target,
1459
0
        "transaction-validation-started",
1460
0
        transaction = HashDisplay(&blake2_hash(scale_encoded_transaction.as_ref())),
1461
0
        block = HashDisplay(&block_hash),
1462
0
        block_height = header::decode(
1463
0
            block_scale_encoded_header,
1464
0
            relay_chain_sync.block_number_bytes()
1465
0
        )
1466
0
        .ok()
1467
0
        .map(|h| format!("#{}", h.number))
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service20validate_transactionppE0s_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefINtNtCsdZExvAaxgia_5alloc3vec3VechEE0s_0B1n_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionppE0s_0B8_
1468
0
        .unwrap_or_else(|| "unknown".to_owned())
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service20validate_transactionppE0s0_0B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefINtNtCsdZExvAaxgia_5alloc3vec3VechEE0s0_0B1n_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionppE0s0_0B8_
1469
0
    );
1470
1471
0
    let (pinned_runtime, block_state_root_hash, block_number) = match relay_chain_sync
1472
0
        .pin_pinned_block_runtime(relay_chain_sync_subscription_id, block_hash)
1473
0
        .await
1474
    {
1475
0
        Ok(r) => r,
1476
        Err(runtime_service::PinPinnedBlockRuntimeError::ObsoleteSubscription) => {
1477
0
            return Err(ValidationError::ObsoleteSubscription)
1478
        }
1479
0
        Err(runtime_service::PinPinnedBlockRuntimeError::BlockNotPinned) => unreachable!(),
1480
    };
1481
1482
0
    let runtime_call_future = relay_chain_sync.runtime_call(
1483
0
        pinned_runtime,
1484
0
        block_hash,
1485
0
        block_number,
1486
0
        block_state_root_hash,
1487
0
        validate::VALIDATION_FUNCTION_NAME.to_owned(),
1488
0
        Some(("TaggedTransactionQueue".to_owned(), 3..=3)),
1489
0
        validate::validate_transaction_runtime_parameters_v3(
1490
0
            iter::once(scale_encoded_transaction.as_ref()),
1491
0
            source,
1492
0
            &block_hash,
1493
0
        )
1494
0
        .fold(Vec::new(), |mut a, b| {
1495
0
            a.extend_from_slice(b.as_ref());
1496
0
            a
1497
0
        }),
Unexecuted instantiation: _RNCNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service20validate_transactionppE00B8_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefINtNtCsdZExvAaxgia_5alloc3vec3VechEE00B1n_
Unexecuted instantiation: _RNCNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionppE00B8_
1498
0
        3,
1499
0
        Duration::from_secs(8),
1500
0
        NonZeroU32::new(1).unwrap(),
1501
0
    );
1502
1503
0
    let success = match runtime_call_future.await {
1504
0
        Ok(output) => output,
1505
0
        Err(runtime_service::RuntimeCallError::Execution(error)) => {
1506
0
            return Err(ValidationError::InvalidOrError(
1507
0
                InvalidOrError::ValidateError(ValidateTransactionError::Execution(error)),
1508
0
            ))
1509
        }
1510
        Err(runtime_service::RuntimeCallError::Crash) => {
1511
0
            return Err(ValidationError::InvalidOrError(
1512
0
                InvalidOrError::ValidateError(ValidateTransactionError::Crash),
1513
0
            ))
1514
        }
1515
0
        Err(runtime_service::RuntimeCallError::Inaccessible(errors)) => {
1516
0
            return Err(ValidationError::InvalidOrError(
1517
0
                InvalidOrError::ValidateError(ValidateTransactionError::Inaccessible(errors)),
1518
0
            ))
1519
        }
1520
0
        Err(runtime_service::RuntimeCallError::InvalidRuntime(error)) => {
1521
0
            return Err(ValidationError::InvalidOrError(
1522
0
                InvalidOrError::ValidateError(ValidateTransactionError::InvalidRuntime(error)),
1523
0
            ))
1524
        }
1525
        Err(runtime_service::RuntimeCallError::ApiVersionRequirementUnfulfilled) => {
1526
0
            return Err(ValidationError::InvalidOrError(
1527
0
                InvalidOrError::ValidateError(
1528
0
                    ValidateTransactionError::ApiVersionRequirementUnfulfilled,
1529
0
                ),
1530
0
            ))
1531
        }
1532
    };
1533
1534
0
    match validate::decode_validate_transaction_return_value(&success.output) {
1535
0
        Ok(Ok(decoded)) => Ok(decoded),
1536
0
        Ok(Err(err)) => Err(ValidationError::InvalidOrError(InvalidOrError::Invalid(
1537
0
            err,
1538
0
        ))),
1539
0
        Err(err) => Err(ValidationError::InvalidOrError(
1540
0
            InvalidOrError::ValidateError(ValidateTransactionError::OutputDecodeError(err)),
1541
0
        )),
1542
    }
1543
0
}
Unexecuted instantiation: _RNCINvNtCsiGub1lfKphe_13smoldot_light20transactions_service20validate_transactionppE0B6_
Unexecuted instantiation: _RNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionNtNtCsDDUKWWCHAU_18smoldot_light_wasm8platform11PlatformRefINtNtCsdZExvAaxgia_5alloc3vec3VechEE0B1l_
Unexecuted instantiation: _RNCINvNtCsih6EgvAwZF2_13smoldot_light20transactions_service20validate_transactionppE0B6_
1544
1545
/// Utility. Calculates the BLAKE2 hash of the given bytes.
1546
0
fn blake2_hash(bytes: &[u8]) -> [u8; 32] {
1547
0
    <[u8; 32]>::try_from(blake2_rfc::blake2b::blake2b(32, &[], bytes).as_bytes()).unwrap()
1548
0
}
Unexecuted instantiation: _RNvNtCsiGub1lfKphe_13smoldot_light20transactions_service11blake2_hash
Unexecuted instantiation: _RNvNtCsih6EgvAwZF2_13smoldot_light20transactions_service11blake2_hash