smoldot_full_node/
lib.rs

1// Smoldot
2// Copyright (C) 2019-2022  Parity Technologies (UK) Ltd.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// This program is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// This program is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18#![deny(rustdoc::broken_intra_doc_links)]
19// TODO: #![deny(unused_crate_dependencies)] doesn't work because some deps are used only by the binary, figure if this can be fixed?
20
21use futures_util::{StreamExt as _, future};
22use rand::RngCore as _;
23use smol::lock::Mutex;
24use smoldot::{
25    chain, chain_spec,
26    database::full_sqlite,
27    executor, header,
28    identity::keystore,
29    informant::HashDisplay,
30    libp2p::{
31        connection, multiaddr,
32        peer_id::{self, PeerId},
33    },
34    trie,
35};
36use std::{array, borrow::Cow, io, iter, mem, net::SocketAddr, path::PathBuf, sync::Arc};
37
38mod consensus_service;
39mod database_thread;
40mod jaeger_service;
41mod json_rpc_service;
42mod network_service;
43mod util;
44
45pub struct Config<'a> {
46    /// Chain to connect to.
47    pub chain: ChainConfig<'a>,
48    /// If [`Config::chain`] contains a parachain, this field contains the configuration of the
49    /// relay chain.
50    pub relay_chain: Option<ChainConfig<'a>>,
51    /// Ed25519 private key of network identity.
52    pub libp2p_key: Box<[u8; 32]>,
53    /// List of addresses to listen on.
54    pub listen_addresses: Vec<multiaddr::Multiaddr>,
55    /// Function that can be used to spawn background tasks.
56    ///
57    /// The tasks passed as parameter must be executed until they shut down.
58    pub tasks_executor: Arc<dyn Fn(future::BoxFuture<'static, ()>) + Send + Sync>,
59    /// Function called whenever a part of the node wants to notify of something.
60    pub log_callback: Arc<dyn LogCallback + Send + Sync>,
61    /// Address of a Jaeger agent to send traces to. If `None`, do not send Jaeger traces.
62    pub jaeger_agent: Option<SocketAddr>,
63}
64
/// Configuration of the TCP listener of a JSON-RPC server.
///
/// Used by [`ChainConfig::json_rpc_listen`].
#[derive(Clone, Debug)]
pub struct JsonRpcListenConfig {
    /// Socket address that the JSON-RPC server binds to.
    pub address: SocketAddr,
    /// Upper bound on the number of JSON-RPC clients connected simultaneously.
    pub max_json_rpc_clients: u32,
}
73
74/// Allow generating logs.
75///
76/// Implemented on closures.
77///
78/// > **Note**: The `log` crate isn't used because dependencies complete pollute the logs.
79pub trait LogCallback {
80    /// Add a log entry.
81    fn log(&self, log_level: LogLevel, message: String);
82}
83
84impl<T: ?Sized + Fn(LogLevel, String)> LogCallback for T {
85    fn log(&self, log_level: LogLevel, message: String) {
86        (*self)(log_level, message)
87    }
88}
89
/// Log level of a log entry.
///
/// The derived ordering follows the explicit discriminants, meaning that
/// [`LogLevel::Error`] compares as the smallest value and [`LogLevel::Trace`] as the largest.
/// This makes it possible to filter entries with a plain comparison, for example
/// `level <= LogLevel::Info`.
///
/// The type is `Copy`, so a level can be stored and passed to several callbacks without
/// giving up ownership.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LogLevel {
    /// Smallest level in the ordering.
    Error = 1,
    Warn = 2,
    Info = 3,
    Debug = 4,
    /// Largest level in the ordering.
    Trace = 5,
}
99
100#[derive(Debug)]
101pub struct ChainConfig<'a> {
102    /// Specification of the chain.
103    pub chain_spec: Cow<'a, [u8]>,
104    /// Identity and address of nodes to try to connect to on startup.
105    pub additional_bootnodes: Vec<(peer_id::PeerId, multiaddr::Multiaddr)>,
106    /// List of secret phrases to insert in the keystore of the node. Used to author blocks.
107    // TODO: also automatically add the same keys through ed25519?
108    pub keystore_memory: Vec<Box<[u8; 64]>>,
109    /// Path to the SQLite database. If `None`, the database is opened in memory.
110    pub sqlite_database_path: Option<PathBuf>,
111    /// Maximum size, in bytes, of the cache SQLite uses.
112    pub sqlite_cache_size: usize,
113    /// Path to the directory where cryptographic keys are stored on disk.
114    ///
115    /// If `None`, no keys are stored in disk.
116    pub keystore_path: Option<PathBuf>,
117    /// Configuration of the JSON-RPC server. If `None`, no TCP server is started.
118    pub json_rpc_listen: Option<JsonRpcListenConfig>,
119}
120
121/// Running client. As long as this object is alive, the client reads/writes the database and has
122/// a JSON-RPC server open.
123pub struct Client {
124    json_rpc_service: json_rpc_service::JsonRpcService,
125    relay_chain_json_rpc_service: Option<json_rpc_service::JsonRpcService>,
126    consensus_service: Arc<consensus_service::ConsensusService>,
127    relay_chain_consensus_service: Option<Arc<consensus_service::ConsensusService>>,
128    network_service: Arc<network_service::NetworkService>,
129    network_known_best: Arc<Mutex<Option<u64>>>,
130}
131
132impl Client {
133    /// Returns the address the JSON-RPC server is listening on.
134    ///
135    /// Returns `None` if and only if [`ChainConfig::json_rpc_listen`] was `None`
136    /// in [`Config::chain`].
137    pub fn json_rpc_server_addr(&self) -> Option<SocketAddr> {
138        self.json_rpc_service.listen_addr()
139    }
140
141    /// Returns the address the relay chain JSON-RPC server is listening on.
142    ///
143    /// Returns `None` if and only if [`Config::relay_chain`] was `None` or if
144    /// [`ChainConfig::json_rpc_listen`] was `None` in [`Config::relay_chain`].
145    pub fn relay_chain_json_rpc_server_addr(&self) -> Option<SocketAddr> {
146        self.relay_chain_json_rpc_service
147            .as_ref()
148            .and_then(|j| j.listen_addr())
149    }
150
151    /// Returns the best block according to the networking.
152    pub async fn network_known_best(&self) -> Option<u64> {
153        *self.network_known_best.lock().await
154    }
155
156    /// Returns the current total number of peers of the client.
157    // TODO: weird API
158    pub async fn num_peers(&self) -> u64 {
159        u64::try_from(self.network_service.num_total_peers().await).unwrap_or(u64::MAX)
160    }
161
162    /// Returns the current total number of network connections of the client.
163    // TODO: weird API
164    pub async fn num_network_connections(&self) -> u64 {
165        u64::try_from(self.network_service.num_connections().await).unwrap_or(u64::MAX)
166    }
167
168    // TODO: not the best API
169    pub async fn sync_state(&self) -> consensus_service::SyncState {
170        self.consensus_service.sync_state().await
171    }
172
173    // TODO: not the best API
174    pub async fn relay_chain_sync_state(&self) -> Option<consensus_service::SyncState> {
175        if let Some(s) = &self.relay_chain_consensus_service {
176            Some(s.sync_state().await)
177        } else {
178            None
179        }
180    }
181
182    /// Adds a JSON-RPC request to the queue of requests of the virtual endpoint of the chain.
183    ///
184    /// The virtual endpoint doesn't have any limit.
185    pub fn send_json_rpc_request(&self, request: String) {
186        self.json_rpc_service.send_request(request)
187    }
188
189    /// Returns the new JSON-RPC response or notification for requests sent using
190    /// [`Client::send_json_rpc_request`].
191    ///
192    /// If this function is called multiple times simultaneously, only one invocation will receive
193    /// each response. Which one is unspecified.
194    pub async fn next_json_rpc_response(&self) -> String {
195        self.json_rpc_service.next_response().await
196    }
197
198    /// Adds a JSON-RPC request to the queue of requests of the virtual endpoint of the
199    /// relay chain.
200    ///
201    /// The virtual endpoint doesn't have any limit.
202    pub fn relay_chain_send_json_rpc_request(
203        &self,
204        request: String,
205    ) -> Result<(), RelayChainSendJsonRpcRequestError> {
206        let Some(relay_chain_json_rpc_service) = &self.relay_chain_json_rpc_service else {
207            return Err(RelayChainSendJsonRpcRequestError::NoRelayChain);
208        };
209
210        relay_chain_json_rpc_service.send_request(request);
211        Ok(())
212    }
213
214    /// Returns the new JSON-RPC response or notification for requests sent using
215    /// [`Client::relay_chain_send_json_rpc_request`].
216    ///
217    /// If this function is called multiple times simultaneously, only one invocation will receive
218    /// each response. Which one is unspecified.
219    ///
220    /// If [`Config::relay_chain`] was `None`, this function waits indefinitely.
221    pub async fn relay_chain_next_json_rpc_response(&self) -> String {
222        if let Some(relay_chain_json_rpc_service) = &self.relay_chain_json_rpc_service {
223            relay_chain_json_rpc_service.next_response().await
224        } else {
225            future::pending().await
226        }
227    }
228}
229
230/// Error potentially returned by [`start`].
231#[derive(Debug, derive_more::Display, derive_more::Error)]
232pub enum StartError {
233    /// Failed to parse the chain specification.
234    ChainSpecParse(chain_spec::ParseError),
235    /// Error building the chain information of the genesis block.
236    InvalidGenesisInformation(chain_spec::FromGenesisStorageError),
237    /// Failed to parse the chain specification of the relay chain.
238    RelayChainSpecParse(chain_spec::ParseError),
239    /// Error building the chain information of the genesis block of the relay chain.
240    InvalidRelayGenesisInformation(chain_spec::FromGenesisStorageError),
241    /// Error initializing the networking service.
242    NetworkInit(network_service::InitError),
243    /// Error initializing the JSON-RPC service.
244    JsonRpcServiceInit(json_rpc_service::InitError),
245    /// Error initializing the JSON-RPC service of the relay chain.
246    RelayChainJsonRpcServiceInit(json_rpc_service::InitError),
247    ConsensusServiceInit(consensus_service::InitError),
248    RelayChainConsensusServiceInit(consensus_service::InitError),
249    /// Error initializing the keystore of the chain.
250    KeystoreInit(io::Error),
251    /// Error initializing the keystore of the relay chain.
252    RelayChainKeystoreInit(io::Error),
253    /// Error initializing the Jaeger service.
254    JaegerInit(io::Error),
255}
256
257/// Error potentially returned by [`Client::relay_chain_send_json_rpc_request`].
258#[derive(Debug, derive_more::Display, derive_more::Error)]
259pub enum RelayChainSendJsonRpcRequestError {
260    /// There is no relay chain to send the JSON-RPC request to.
261    NoRelayChain,
262}
263
264/// Runs the node using the given configuration.
265// TODO: this function has several code paths that panic instead of returning an error; it is especially unclear what to do in case of database corruption, given that a database corruption would crash the node later on anyway
266pub async fn start(mut config: Config<'_>) -> Result<Client, StartError> {
267    let chain_spec = {
268        chain_spec::ChainSpec::from_json_bytes(&config.chain.chain_spec)
269            .map_err(StartError::ChainSpecParse)?
270    };
271
272    // TODO: don't just throw away the runtime
273    // TODO: building the genesis chain information is pretty expensive and we throw away most of the information
274    let genesis_chain_information = chain_spec
275        .to_chain_information()
276        .map_err(StartError::InvalidGenesisInformation)?
277        .0;
278
279    let relay_chain_spec = match &config.relay_chain {
280        Some(cfg) => Some(
281            chain_spec::ChainSpec::from_json_bytes(&cfg.chain_spec)
282                .map_err(StartError::RelayChainSpecParse)?,
283        ),
284        None => None,
285    };
286
287    // TODO: don't just throw away the runtime
288    // TODO: building the genesis chain information is pretty expensive and we throw away most of the information
289    let relay_genesis_chain_information = match &relay_chain_spec {
290        Some(r) => Some(
291            r.to_chain_information()
292                .map_err(StartError::InvalidRelayGenesisInformation)?
293                .0,
294        ),
295        None => None,
296    };
297
298    // The `protocolId` field of chain specifications is deprecated. Print a warning.
299    if chain_spec.protocol_id().is_some() {
300        config.log_callback.log(
301            LogLevel::Warn,
302            format!("chain-spec-has-protocol-id; chain={}", chain_spec.id()),
303        );
304    }
305    if let Some(relay_chain_spec) = &relay_chain_spec {
306        if relay_chain_spec.protocol_id().is_some() {
307            config.log_callback.log(
308                LogLevel::Warn,
309                format!(
310                    "chain-spec-has-protocol-id; chain={}",
311                    relay_chain_spec.id()
312                ),
313            );
314        }
315    }
316
317    // The `telemetryEndpoints` field of chain specifications isn't supported.
318    if chain_spec.telemetry_endpoints().count() != 0 {
319        config.log_callback.log(
320            LogLevel::Warn,
321            format!(
322                "chain-spec-has-telemetry-endpoints; chain={}",
323                chain_spec.id()
324            ),
325        );
326    }
327    if let Some(relay_chain_spec) = &relay_chain_spec {
328        if relay_chain_spec.telemetry_endpoints().count() != 0 {
329            config.log_callback.log(
330                LogLevel::Warn,
331                format!(
332                    "chain-spec-has-telemetry-endpoints; chain={}",
333                    relay_chain_spec.id()
334                ),
335            );
336        }
337    }
338
339    // Printing the SQLite version number can be useful for debugging purposes for example in case
340    // a query fails.
341    config.log_callback.log(
342        LogLevel::Debug,
343        format!("sqlite-version; version={}", full_sqlite::sqlite_version()),
344    );
345
346    let (database, database_existed) = {
347        let (db, existed) = open_database(
348            &chain_spec,
349            genesis_chain_information.as_ref(),
350            config.chain.sqlite_database_path,
351            config.chain.sqlite_cache_size,
352        )
353        .await;
354
355        (Arc::new(database_thread::DatabaseThread::from(db)), existed)
356    };
357
358    let relay_chain_database = if let Some(relay_chain) = &config.relay_chain {
359        Some(Arc::new(database_thread::DatabaseThread::from(
360            open_database(
361                relay_chain_spec.as_ref().unwrap(),
362                relay_genesis_chain_information.as_ref().unwrap().as_ref(),
363                relay_chain.sqlite_database_path.clone(),
364                relay_chain.sqlite_cache_size,
365            )
366            .await
367            .0,
368        )))
369    } else {
370        None
371    };
372
373    let database_finalized_block_hash = database
374        .with_database(|db| db.finalized_block_hash().unwrap())
375        .await;
376    let database_finalized_block_number = header::decode(
377        &database
378            .with_database(move |db| {
379                db.block_scale_encoded_header(&database_finalized_block_hash)
380                    .unwrap()
381                    .unwrap()
382            })
383            .await,
384        chain_spec.block_number_bytes().into(),
385    )
386    .unwrap()
387    .number;
388
389    let noise_key = {
390        let mut noise_static_key = zeroize::Zeroizing::new([0u8; 32]);
391        rand::thread_rng().fill_bytes(&mut *noise_static_key);
392        connection::NoiseKey::new(&config.libp2p_key, &noise_static_key)
393    };
394    zeroize::Zeroize::zeroize(&mut *config.libp2p_key);
395    let local_peer_id =
396        peer_id::PublicKey::Ed25519(*noise_key.libp2p_public_ed25519_key()).into_peer_id();
397
398    let genesis_block_hash = genesis_chain_information
399        .as_ref()
400        .finalized_block_header
401        .hash(chain_spec.block_number_bytes().into());
402
403    let jaeger_service = jaeger_service::JaegerService::new(jaeger_service::Config {
404        tasks_executor: &mut |task| (config.tasks_executor)(task),
405        service_name: local_peer_id.to_string(),
406        jaeger_agent: config.jaeger_agent,
407    })
408    .await
409    .map_err(StartError::JaegerInit)?;
410
411    let (network_service, network_service_chain_ids, network_events_receivers) =
412        network_service::NetworkService::new(network_service::Config {
413            listen_addresses: config.listen_addresses,
414            num_events_receivers: 2 + if relay_chain_database.is_some() { 1 } else { 0 },
415            chains: iter::once(network_service::ChainConfig {
416                log_name: chain_spec.id().to_owned(),
417                fork_id: chain_spec.fork_id().map(|n| n.to_owned()),
418                block_number_bytes: usize::from(chain_spec.block_number_bytes()),
419                database: database.clone(),
420                grandpa_protocol_finalized_block_height: if matches!(
421                    genesis_chain_information.as_ref().finality,
422                    chain::chain_information::ChainInformationFinalityRef::Grandpa { .. }
423                ) {
424                    Some({
425                        let block_number_bytes = chain_spec.block_number_bytes();
426                        database
427                            .with_database(move |database| {
428                                let hash = database.finalized_block_hash().unwrap();
429                                let header = database.block_scale_encoded_header(&hash).unwrap().unwrap();
430                                header::decode(&header, block_number_bytes.into(),).unwrap().number
431                            })
432                            .await
433                    })
434                } else {
435                    None
436                },
437                genesis_block_hash,
438                best_block: {
439                    let block_number_bytes = chain_spec.block_number_bytes();
440                    database
441                        .with_database(move |database| {
442                            let hash = database.finalized_block_hash().unwrap();
443                            let header = database.block_scale_encoded_header(&hash).unwrap().unwrap();
444                            let number = header::decode(&header, block_number_bytes.into(),).unwrap().number;
445                            (number, hash)
446                        })
447                        .await
448                },
449                max_in_peers: 25,
450                max_slots: 15,
451                bootstrap_nodes: {
452                    let mut list = Vec::with_capacity(
453                        chain_spec.boot_nodes().len() + config.chain.additional_bootnodes.len(),
454                    );
455
456                    for node in chain_spec.boot_nodes() {
457                        match node {
458                            chain_spec::Bootnode::UnrecognizedFormat(raw) => {
459                                config.log_callback.log(
460                                    LogLevel::Warn,
461                                    format!("bootnode-unrecognized-addr; value={:?}", raw),
462                                );
463                            }
464                            chain_spec::Bootnode::Parsed { multiaddr, peer_id } => {
465                                let multiaddr: multiaddr::Multiaddr = match multiaddr.parse() {
466                                    Ok(a) => a,
467                                    Err(_) => {
468                                        config.log_callback.log(
469                                            LogLevel::Warn,
470                                            format!("bootnode-unrecognized-addr; value={:?}", multiaddr),
471                                        );
472                                        continue;
473                                    },
474                                };
475                                let peer_id = PeerId::from_bytes(peer_id.to_vec()).unwrap();
476                                list.push((peer_id, multiaddr));
477                            }
478                        }
479                    }
480
481                    list.extend(config.chain.additional_bootnodes);
482                    list
483                },
484            })
485            .chain(
486                if let Some(relay_chains_specs) = &relay_chain_spec {
487                    Some(network_service::ChainConfig {
488                        log_name: relay_chains_specs.id().to_owned(),
489                        fork_id: relay_chains_specs.fork_id().map(|n| n.to_owned()),
490                        block_number_bytes: usize::from(relay_chains_specs.block_number_bytes()),
491                        database: relay_chain_database.clone().unwrap(),
492                        grandpa_protocol_finalized_block_height: if matches!(
493                            genesis_chain_information.as_ref().finality,
494                            chain::chain_information::ChainInformationFinalityRef::Grandpa { .. }
495                        ) {
496                            Some(relay_chain_database
497                                .as_ref()
498                                .unwrap()
499                                .with_database({
500                                    let block_number_bytes = chain_spec.block_number_bytes();
501                                    move |db| {
502                                        let hash = db.finalized_block_hash().unwrap();
503                                        let header = db.block_scale_encoded_header(&hash).unwrap().unwrap();
504                                        header::decode(&header, block_number_bytes.into()).unwrap().number
505                                    }
506                                })
507                                .await)
508                        } else {
509                            None
510                        },
511                        genesis_block_hash: relay_genesis_chain_information
512                            .as_ref()
513                            .unwrap()
514                            .as_ref().finalized_block_header
515                            .hash(chain_spec.block_number_bytes().into(),),
516                        best_block: relay_chain_database
517                            .as_ref()
518                            .unwrap()
519                            .with_database({
520                                let block_number_bytes = chain_spec.block_number_bytes();
521                                move |db| {
522                                    let hash = db.finalized_block_hash().unwrap();
523                                    let header = db.block_scale_encoded_header(&hash).unwrap().unwrap();
524                                    let number = header::decode(&header, block_number_bytes.into()).unwrap().number;
525                                    (number, hash)
526                                }
527                            })
528                            .await,
529                        max_in_peers: 25,
530                        max_slots: 15,
531                        bootstrap_nodes: {
532                            let mut list =
533                                Vec::with_capacity(relay_chains_specs.boot_nodes().len());
534                            for node in relay_chains_specs.boot_nodes() {
535                                match node {
536                                    chain_spec::Bootnode::UnrecognizedFormat(raw) => {
537                                        config.log_callback.log(
538                                            LogLevel::Warn,
539                                            format!("relay-chain-bootnode-unrecognized-addr; value={:?}", raw),
540                                        );
541                                    }
542                                    chain_spec::Bootnode::Parsed { multiaddr, peer_id } => {
543                                        let multiaddr: multiaddr::Multiaddr = match multiaddr.parse() {
544                                            Ok(a) => a,
545                                            Err(_) => {
546                                                config.log_callback.log(
547                                                    LogLevel::Warn,
548                                                    format!("relay-chain-bootnode-unrecognized-addr; value={:?}", multiaddr),
549                                                );
550                                                continue;
551                                            }
552                                        };
553                                        let peer_id = PeerId::from_bytes(peer_id.to_vec()).unwrap();
554                                        list.push((peer_id, multiaddr));
555                                    }
556                                }
557                            }
558                            list
559                        },
560                    })
561                } else {
562                    None
563                }
564                .into_iter(),
565            )
566            .collect(),
567            identify_agent_version: concat!(env!("CARGO_PKG_NAME"), " ", env!("CARGO_PKG_VERSION")).to_owned(),
568            noise_key,
569            tasks_executor: {
570                let executor = config.tasks_executor.clone();
571                Box::new(move |task| executor(task))
572            },
573            log_callback: config.log_callback.clone(),
574            jaeger_service: jaeger_service.clone(),
575        })
576        .await
577        .map_err(StartError::NetworkInit)?;
578
579    let mut network_events_receivers = network_events_receivers.into_iter();
580
581    let keystore = Arc::new({
582        let mut keystore = keystore::Keystore::new(config.chain.keystore_path, rand::random())
583            .await
584            .map_err(StartError::KeystoreInit)?;
585        for mut private_key in config.chain.keystore_memory {
586            keystore.insert_sr25519_memory(keystore::KeyNamespace::all(), &private_key);
587            zeroize::Zeroize::zeroize(&mut *private_key);
588        }
589        keystore
590    });
591
592    let consensus_service = consensus_service::ConsensusService::new(consensus_service::Config {
593        tasks_executor: {
594            let executor = config.tasks_executor.clone();
595            Box::new(move |task| executor(task))
596        },
597        log_callback: config.log_callback.clone(),
598        genesis_block_hash,
599        network_events_receiver: network_events_receivers.next().unwrap(),
600        network_service: (network_service.clone(), network_service_chain_ids[0]),
601        database: database.clone(),
602        block_number_bytes: usize::from(chain_spec.block_number_bytes()),
603        keystore,
604        jaeger_service: jaeger_service.clone(),
605        slot_duration_author_ratio: 43691_u16,
606    })
607    .await
608    .map_err(StartError::ConsensusServiceInit)?;
609
610    let relay_chain_consensus_service = if let Some(relay_chain_database) = &relay_chain_database {
611        Some(
612            consensus_service::ConsensusService::new(consensus_service::Config {
613                tasks_executor: {
614                    let executor = config.tasks_executor.clone();
615                    Box::new(move |task| executor(task))
616                },
617                log_callback: config.log_callback.clone(),
618                genesis_block_hash: relay_genesis_chain_information
619                    .as_ref()
620                    .unwrap()
621                    .as_ref()
622                    .finalized_block_header
623                    .hash(usize::from(
624                        relay_chain_spec.as_ref().unwrap().block_number_bytes(),
625                    )),
626                network_events_receiver: network_events_receivers.next().unwrap(),
627                network_service: (network_service.clone(), network_service_chain_ids[1]),
628                database: relay_chain_database.clone(),
629                block_number_bytes: usize::from(
630                    relay_chain_spec.as_ref().unwrap().block_number_bytes(),
631                ),
632                keystore: Arc::new({
633                    let mut keystore = keystore::Keystore::new(
634                        config.relay_chain.as_ref().unwrap().keystore_path.clone(),
635                        rand::random(),
636                    )
637                    .await
638                    .map_err(StartError::RelayChainKeystoreInit)?;
639                    for mut private_key in
640                        mem::take(&mut config.relay_chain.as_mut().unwrap().keystore_memory)
641                    {
642                        keystore.insert_sr25519_memory(keystore::KeyNamespace::all(), &private_key);
643                        zeroize::Zeroize::zeroize(&mut *private_key);
644                    }
645                    keystore
646                }),
647                jaeger_service, // TODO: consider passing a different jaeger service with a different service name
648                slot_duration_author_ratio: 43691_u16,
649            })
650            .await
651            .map_err(StartError::RelayChainConsensusServiceInit)?,
652        )
653    } else {
654        None
655    };
656
657    // Start the JSON-RPC service.
658    // It only needs to be kept alive in order to function.
659    //
660    // Note that initialization can fail if, for example, the port is already occupied. It is
661    // preferable to fail to start the node altogether rather than make the user believe that they
662    // are connected to the JSON-RPC endpoint of the node while they are in reality connected to
663    // something else.
664    let json_rpc_service = json_rpc_service::JsonRpcService::new(json_rpc_service::Config {
665        tasks_executor: config.tasks_executor.clone(),
666        log_callback: config.log_callback.clone(),
667        database,
668        consensus_service: consensus_service.clone(),
669        network_service: (network_service.clone(), network_service_chain_ids[0]),
670        bind_address: config.chain.json_rpc_listen.as_ref().map(|cfg| cfg.address),
671        max_parallel_requests: 32,
672        max_json_rpc_clients: config
673            .chain
674            .json_rpc_listen
675            .map_or(0, |cfg| cfg.max_json_rpc_clients),
676        chain_name: chain_spec.name().to_owned(),
677        chain_type: chain_spec.chain_type().to_owned(),
678        chain_properties_json: chain_spec.properties().to_owned(),
679        chain_is_live: chain_spec.has_live_network(),
680        genesis_block_hash: genesis_chain_information
681            .as_ref()
682            .finalized_block_header
683            .hash(usize::from(chain_spec.block_number_bytes())),
684    })
685    .await
686    .map_err(StartError::JsonRpcServiceInit)?;
687
688    // Start the JSON-RPC service of the relay chain.
689    // See remarks above.
690    let relay_chain_json_rpc_service = if let Some(relay_chain_cfg) = config.relay_chain {
691        let relay_chain_spec = relay_chain_spec.as_ref().unwrap();
692        Some(
693            json_rpc_service::JsonRpcService::new(json_rpc_service::Config {
694                tasks_executor: config.tasks_executor.clone(),
695                log_callback: config.log_callback.clone(),
696                database: relay_chain_database.clone().unwrap(),
697                consensus_service: relay_chain_consensus_service.clone().unwrap(),
698                network_service: (network_service.clone(), network_service_chain_ids[1]),
699                bind_address: relay_chain_cfg
700                    .json_rpc_listen
701                    .as_ref()
702                    .map(|cfg| cfg.address),
703                max_parallel_requests: 32,
704                max_json_rpc_clients: relay_chain_cfg
705                    .json_rpc_listen
706                    .map_or(0, |cfg| cfg.max_json_rpc_clients),
707                chain_name: relay_chain_spec.name().to_owned(),
708                chain_type: relay_chain_spec.chain_type().to_owned(),
709                chain_properties_json: relay_chain_spec.properties().to_owned(),
710                chain_is_live: relay_chain_spec.has_live_network(),
711                genesis_block_hash: relay_genesis_chain_information
712                    .as_ref()
713                    .unwrap()
714                    .as_ref()
715                    .finalized_block_header
716                    .hash(usize::from(relay_chain_spec.block_number_bytes())),
717            })
718            .await
719            .map_err(StartError::JsonRpcServiceInit)?,
720        )
721    } else {
722        None
723    };
724
725    // Spawn the task printing the informant.
726    // This is not just a dummy task that just prints on the output, but is actually the main
727    // task that holds everything else alive. Without it, all the services that we have created
728    // above would be cleanly dropped and nothing would happen.
729    // For this reason, it must be spawned even if no informant is started, in which case we simply
730    // inhibit the printing.
731    let network_known_best = Arc::new(Mutex::new(None));
732    (config.tasks_executor)(Box::pin({
733        let mut main_network_events_receiver = network_events_receivers.next().unwrap();
734        let network_service_chain_id = network_service_chain_ids[0];
735        let network_known_best = network_known_best.clone();
736
737        // TODO: shut down this task if the client stops?
738        async move {
739            loop {
740                let network_event = main_network_events_receiver.next().await.unwrap();
741                let mut network_known_best = network_known_best.lock().await;
742
743                match network_event {
744                    network_service::Event::BlockAnnounce {
745                        chain_id,
746                        scale_encoded_header,
747                        ..
748                    } if chain_id == network_service_chain_id => match (
749                        *network_known_best,
750                        header::decode(
751                            &scale_encoded_header,
752                            usize::from(chain_spec.block_number_bytes()),
753                        ),
754                    ) {
755                        (Some(n), Ok(header)) if n >= header.number => {}
756                        (_, Ok(header)) => *network_known_best = Some(header.number),
757                        (_, Err(_)) => {
758                            // Do nothing if the block is invalid. This is just for the
759                            // informant and not for consensus-related purposes.
760                        }
761                    },
762                    network_service::Event::Connected {
763                        chain_id,
764                        best_block_number,
765                        ..
766                    } if chain_id == network_service_chain_id => match *network_known_best {
767                        Some(n) if n >= best_block_number => {}
768                        _ => *network_known_best = Some(best_block_number),
769                    },
770                    _ => {}
771                }
772            }
773        }
774    }));
775
776    config.log_callback.log(
777        LogLevel::Info,
778        format!(
779            "successful-initialization; local_peer_id={}; database_is_new={:?}; \
780                finalized_block_hash={}; finalized_block_number={}",
781            local_peer_id,
782            !database_existed,
783            HashDisplay(&database_finalized_block_hash),
784            database_finalized_block_number
785        ),
786    );
787
788    debug_assert!(network_events_receivers.next().is_none());
789    Ok(Client {
790        consensus_service,
791        relay_chain_consensus_service,
792        json_rpc_service,
793        relay_chain_json_rpc_service,
794        network_service,
795        network_known_best,
796    })
797}
798
799/// Opens the database from the file system, or create a new database if none is found.
800///
801/// If `db_path` is `None`, open the database in memory instead.
802///
803/// The returned boolean is `true` if the database existed before.
804///
805/// # Panic
806///
807/// Panics if the database can't be open. This function is expected to be called from the `main`
808/// function.
809///
810async fn open_database(
811    chain_spec: &chain_spec::ChainSpec,
812    genesis_chain_information: chain::chain_information::ChainInformationRef<'_>,
813    db_path: Option<PathBuf>,
814    sqlite_cache_size: usize,
815) -> (full_sqlite::SqliteFullDatabase, bool) {
816    // The `unwrap()` here can panic for example in case of access denied.
817    match full_sqlite::open(full_sqlite::Config {
818        block_number_bytes: chain_spec.block_number_bytes().into(),
819        cache_size: sqlite_cache_size,
820        ty: if let Some(path) = &db_path {
821            full_sqlite::ConfigTy::Disk {
822                path,
823                memory_map_size: 1000000000, // TODO: make configurable
824            }
825        } else {
826            full_sqlite::ConfigTy::Memory
827        },
828    })
829    .unwrap()
830    {
831        // Database already exists and contains data.
832        full_sqlite::DatabaseOpen::Open(database) => {
833            if database.block_hash_by_number(0).unwrap().next().unwrap()
834                != genesis_chain_information
835                    .finalized_block_header
836                    .hash(chain_spec.block_number_bytes().into())
837            {
838                panic!("Mismatch between database and chain specification. Shutting down node.");
839            }
840
841            (database, true)
842        }
843
844        // The database doesn't exist or is empty.
845        full_sqlite::DatabaseOpen::Empty(empty) => {
846            let genesis_storage = chain_spec.genesis_storage().into_genesis_items().unwrap(); // TODO: return error instead
847
848            // In order to determine the state_version of the genesis block, we need to compile
849            // the runtime.
850            // TODO: return errors instead of panicking
851            // TODO: consider not throwing away the runtime
852            let state_version = executor::host::HostVmPrototype::new(executor::host::Config {
853                module: genesis_storage.value(b":code").unwrap(),
854                heap_pages: executor::storage_heap_pages_to_value(
855                    genesis_storage.value(b":heappages"),
856                )
857                .unwrap(),
858                exec_hint: executor::vm::ExecHint::ValidateAndExecuteOnce,
859                allow_unresolved_imports: true,
860            })
861            .unwrap()
862            .runtime_version()
863            .decode()
864            .state_version
865            .map(u8::from)
866            .unwrap_or(0);
867
868            // The chain specification only contains trie nodes that have a storage value attached
869            // to them, while the database needs to know all trie nodes (including branch nodes).
870            // The good news is that we can determine the latter from the former, which we do
871            // here.
872            // TODO: consider moving this block to the chain spec module
873            // TODO: poorly optimized
874            let mut trie_structure = {
875                let mut trie_structure = trie::trie_structure::TrieStructure::new();
876                for (key, value) in genesis_storage.iter() {
877                    match trie_structure.node(trie::bytes_to_nibbles(key.iter().copied())) {
878                        trie::trie_structure::Entry::Vacant(e) => {
879                            e.insert_storage_value().insert(
880                                (Some(value), None::<trie::trie_node::MerkleValueOutput>),
881                                (None, None),
882                            );
883                        }
884                        trie::trie_structure::Entry::Occupied(
885                            trie::trie_structure::NodeAccess::Branch(mut e),
886                        ) => {
887                            *e.user_data() = (Some(value), None);
888                            e.insert_storage_value();
889                        }
890                        trie::trie_structure::Entry::Occupied(
891                            trie::trie_structure::NodeAccess::Storage(_),
892                        ) => {
893                            // Duplicate entry.
894                            panic!() // TODO: don't panic?
895                        }
896                    }
897                }
898
899                // Calculate the Merkle values of the nodes.
900                for node_index in trie_structure
901                    .iter_ordered()
902                    .collect::<Vec<_>>()
903                    .into_iter()
904                    .rev()
905                {
906                    let mut node_access = trie_structure.node_by_index(node_index).unwrap();
907
908                    let children = core::array::from_fn::<_, 16, _>(|n| {
909                        node_access
910                            .child(trie::Nibble::try_from(u8::try_from(n).unwrap()).unwrap())
911                            .map(|mut child| child.user_data().1.as_ref().unwrap().clone())
912                    });
913
914                    let is_root_node = node_access.is_root_node();
915                    let partial_key = node_access.partial_key().collect::<Vec<_>>().into_iter();
916
917                    // We have to hash the storage value ahead of time if necessary due to borrow
918                    // checking difficulties.
919                    let storage_value_hashed =
920                        match (node_access.user_data().0.as_ref(), state_version) {
921                            (Some(v), 1) => {
922                                if v.len() >= 33 {
923                                    Some(blake2_rfc::blake2b::blake2b(32, &[], v))
924                                } else {
925                                    None
926                                }
927                            }
928                            _ => None,
929                        };
930                    let storage_value = match (
931                        node_access.user_data().0.as_ref(),
932                        storage_value_hashed.as_ref(),
933                    ) {
934                        (_, Some(storage_value_hashed)) => trie::trie_node::StorageValue::Hashed(
935                            <&[u8; 32]>::try_from(storage_value_hashed.as_bytes()).unwrap(),
936                        ),
937                        (Some(v), None) => trie::trie_node::StorageValue::Unhashed(&v[..]),
938                        (None, _) => trie::trie_node::StorageValue::None,
939                    };
940
941                    let merkle_value = trie::trie_node::calculate_merkle_value(
942                        trie::trie_node::Decoded {
943                            children,
944                            partial_key,
945                            storage_value,
946                        },
947                        trie::HashFunction::Blake2,
948                        is_root_node,
949                    )
950                    .unwrap();
951
952                    node_access.into_user_data().1 = Some(merkle_value);
953                }
954
955                trie_structure
956            };
957
958            // Build the iterator of trie nodes.
959            let genesis_storage_full_trie = trie_structure
960                .iter_unordered()
961                .collect::<Vec<_>>()
962                .into_iter()
963                .map(|node_index| {
964                    let (storage_value, Some(merkle_value)) = &trie_structure[node_index] else {
965                        unreachable!()
966                    };
967                    // Cloning to solve borrow checker restriction. // TODO: optimize?
968                    let storage_value = if let Some(storage_value) = storage_value {
969                        // TODO: child tries support?
970                        full_sqlite::InsertTrieNodeStorageValue::Value {
971                            value: Cow::Owned(storage_value.to_vec()),
972                            references_merkle_value: false,
973                        }
974                    } else {
975                        full_sqlite::InsertTrieNodeStorageValue::NoValue
976                    };
977                    let merkle_value = merkle_value.as_ref().to_owned();
978                    let mut node_access = trie_structure.node_by_index(node_index).unwrap();
979
980                    full_sqlite::InsertTrieNode {
981                        storage_value,
982                        merkle_value: Cow::Owned(merkle_value),
983                        children_merkle_values: array::from_fn::<_, 16, _>(|n| {
984                            let child_index =
985                                trie::Nibble::try_from(u8::try_from(n).unwrap()).unwrap();
986                            node_access.child(child_index).map(|mut child| {
987                                Cow::Owned(child.user_data().1.as_ref().unwrap().as_ref().to_vec())
988                            })
989                        }),
990                        partial_key_nibbles: Cow::Owned(
991                            node_access.partial_key().map(u8::from).collect::<Vec<_>>(),
992                        ),
993                    }
994                });
995
996            // The finalized block is the genesis block. As such, it has an empty body and
997            // no justification.
998            let database = empty
999                .initialize(
1000                    &genesis_chain_information
1001                        .finalized_block_header
1002                        .scale_encoding_vec(chain_spec.block_number_bytes().into()),
1003                    iter::empty(),
1004                    None,
1005                )
1006                .unwrap();
1007            database
1008                .insert_trie_nodes(genesis_storage_full_trie, state_version)
1009                .unwrap();
1010            (database, false)
1011        }
1012    }
1013}