diff --git a/CHANGELOG.md b/CHANGELOG.md
index fbf5ee18bf0..327829f32f4 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -10,10 +10,12 @@ Description of the upcoming release here.
 
 ### Fixed
 
+- [#1821](https://github.com/FuelLabs/fuel-core/pull/1821): Handle missing tables in a snapshot gracefully.
 - [#1814](https://github.com/FuelLabs/fuel-core/pull/1814): Bugfix: the `iter_all_by_prefix` was not working for all tables. The change adds a `Rust` level filtering.
 
 ### Added
 
+- [#1821](https://github.com/FuelLabs/fuel-core/pull/1821): Propagate the shutdown signal to (re)genesis. Also add a progress bar for (re)genesis.
 - [#1813](https://github.com/FuelLabs/fuel-core/pull/1813): Added back support for `/health` endpoint.
 - [#1799](https://github.com/FuelLabs/fuel-core/pull/1799): Snapshot creation is now concurrent.

diff --git a/Cargo.lock b/Cargo.lock
index f801f28a068..45ab62acd3e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1282,6 +1282,7 @@ dependencies = [
  "encode_unicode",
  "lazy_static",
  "libc",
+ "unicode-width",
  "windows-sys 0.52.0",
 ]
 
@@ -2877,6 +2878,7 @@ dependencies = [
  "futures",
  "hex",
  "hyper",
+ "indicatif",
  "itertools 0.12.1",
  "mockall",
  "num_cpus",
@@ -4365,6 +4367,19 @@ dependencies = [
  "serde",
 ]
 
+[[package]]
+name = "indicatif"
+version = "0.17.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "763a5a8f45087d6bcea4222e7b72c291a054edf80e4ef6efd2a4979878c7bea3"
+dependencies = [
+ "console",
+ "instant",
+ "number_prefix",
+ "portable-atomic",
+ "unicode-width",
+]
+
 [[package]]
 name = "inout"
 version = "0.1.3"
@@ -5804,6 +5819,12 @@ dependencies = [
  "syn 2.0.58",
 ]
 
+[[package]]
+name = "number_prefix"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "830b246a0e5f20af87141b25c173cd1b609bd7779a4617d6ec582abaf90870f3"
+
 [[package]]
 name = "numtoa"
 version = "0.1.0"
@@ -6298,6 +6319,12 @@ dependencies = [
  "universal-hash",
 ]
 
+[[package]]
+name = "portable-atomic"
+version = "1.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7170ef9988bc169ba16dd36a7fa041e5c4cbeb6a35b76d4c03daded371eae7c0"
+
 [[package]]
 name = "postcard"
 version = "1.0.8"
@@ -8761,6 +8788,12 @@ dependencies = [
  "tinyvec",
 ]
 
+[[package]]
+name = "unicode-width"
+version = "0.1.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e51733f11c9c4f72aa0c160008246859e340b00807569a0da0e7a1079b27ba85"
+
 [[package]]
 name = "unicode-xid"
 version = "0.2.4"

diff --git a/Cargo.toml b/Cargo.toml
index 9cbedbaa96d..736476fd6eb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -127,6 +127,7 @@ pin-project-lite = "0.2"
 axum = "0.5"
 once_cell = "1.16"
 prometheus-client = "0.22.0"
+indicatif = { version = "0.17", default-features = false }
 itertools = { version = "0.12", default-features = false }
 insta = "1.8"
 tempfile = "3.4"

diff --git a/bin/fuel-core/src/cli/run.rs b/bin/fuel-core/src/cli/run.rs
index 4dc0ea9b64b..9871834123b 100644
--- a/bin/fuel-core/src/cli/run.rs
+++ b/bin/fuel-core/src/cli/run.rs
@@ -14,7 +14,10 @@ use anyhow::Context;
 use clap::Parser;
 use fuel_core::{
     chain_config::default_consensus_dev_key,
-    combined_database::CombinedDatabaseConfig,
+    combined_database::{
+        CombinedDatabase,
+        CombinedDatabaseConfig,
+    },
     producer::Config as ProducerConfig,
     service::{
         config::Trigger,
@@ -377,16 +380,30 @@ pub async fn exec(command: Command) -> anyhow::Result<()> {
     info!("Fuel Core version v{}", env!("CARGO_PKG_VERSION"));
     trace!("Initializing in TRACE mode.");
     // initialize the server
-    let server = FuelService::new_node(config).await?;
+    let combined_database = CombinedDatabase::from_config(&config.combined_db_config)?;
+
+    let service = FuelService::new(combined_database, config)?;
+
+    // Genesis could take a long time depending on the snapshot size. Startup needs
+    // to be interruptible by the shutdown signal.
+    tokio::select! {
+        result = service.start_and_await() => {
+            result?;
+        }
+        _ = shutdown_signal() => {
+            service.stop();
+        }
+    }
+
     // pause the main task while service is running
     tokio::select! {
-        result = server.await_stop() => {
+        result = service.await_stop() => {
             result?;
         }
        _ = shutdown_signal() => {}
     }
 
-    server.stop_and_await().await?;
+    service.stop_and_await().await?;
 
     Ok(())
 }
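Editor's note on the `run.rs` change above: startup is now raced against the shutdown signal, so a long (re)genesis can be interrupted before the service ever reaches its running state. A minimal, self-contained sketch of the same `tokio::select!` pattern, assuming a hypothetical `Service` type (the real `FuelService` API is richer):

```rust
use std::time::Duration;

struct Service;

impl Service {
    // Stand-in for a slow startup, e.g. replaying a large snapshot.
    async fn start_and_await(&self) -> anyhow::Result<()> {
        tokio::time::sleep(Duration::from_secs(60)).await;
        Ok(())
    }

    fn stop(&self) {}
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let service = Service;
    // Whichever future finishes first wins; the other is dropped.
    tokio::select! {
        result = service.start_and_await() => result?,
        _ = tokio::signal::ctrl_c() => service.stop(),
    }
    Ok(())
}
```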
diff --git a/bin/fuel-core/src/cli/snapshot.rs b/bin/fuel-core/src/cli/snapshot.rs
index f79fb424654..7213c856642 100644
--- a/bin/fuel-core/src/cli/snapshot.rs
+++ b/bin/fuel-core/src/cli/snapshot.rs
@@ -323,7 +323,7 @@ mod tests {
         reader
             .read::<T>()
             .unwrap()
-            .map_ok(|group| group.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()
             .unwrap()
@@ -784,11 +784,7 @@ mod tests {
         T::OwnedValue: serde::de::DeserializeOwned + core::fmt::Debug + PartialEq,
         StateConfig: AsTable<T>,
     {
-        let actual = reader
-            .read()
-            .unwrap()
-            .map(|group| group.unwrap().data)
-            .collect_vec();
+        let actual: Vec<_> = reader.read().unwrap().into_iter().try_collect().unwrap();
 
         let expected = expected_data
             .into_iter()

diff --git a/crates/chain-config/src/config/state.rs b/crates/chain-config/src/config/state.rs
index 9f20824d02a..f3233926b96 100644
--- a/crates/chain-config/src/config/state.rs
+++ b/crates/chain-config/src/config/state.rs
@@ -418,7 +418,7 @@ impl StateConfig {
 
         let coins = reader
             .read::<Coins>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
@@ -426,7 +426,7 @@ impl StateConfig {
 
         let messages = reader
             .read::<Messages>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
@@ -434,7 +434,7 @@ impl StateConfig {
 
         let contract_state = reader
             .read::<ContractsState>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
@@ -442,7 +442,7 @@ impl StateConfig {
 
         let contract_balance = reader
             .read::<ContractsAssets>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
@@ -450,7 +450,7 @@ impl StateConfig {
 
         let contract_code = reader
             .read::<ContractsRawCode>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
@@ -458,7 +458,7 @@ impl StateConfig {
 
         let contract_utxo = reader
             .read::<ContractsLatestUtxo>()?
-            .map_ok(|batch| batch.data)
+            .into_iter()
             .flatten_ok()
             .try_collect()?;
 
@@ -532,7 +532,8 @@ impl StateConfig {
 }
 
 pub use reader::{
-    IntoIter,
+    GroupIter,
+    Groups,
     SnapshotReader,
 };
 #[cfg(feature = "parquet")]
@@ -544,13 +545,6 @@ pub use writer::{
 };
 
 pub const MAX_GROUP_SIZE: usize = usize::MAX;
 
-#[derive(Debug, PartialEq, Eq, Clone)]
-pub struct Group<T> {
-    pub index: usize,
-    pub data: Vec<T>,
-}
-pub(crate) type GroupResult<T> = anyhow::Result<Group<T>>;
-
 #[cfg(test)]
 mod tests {
     use std::path::Path;
@@ -717,6 +711,25 @@ mod tests {
         pretty_assertions::assert_eq!(da_block_height, da_block_height_decoded);
     }
 
+    #[test_case::test_case(given_parquet_writer)]
+    #[test_case::test_case(given_json_writer)]
+    fn missing_tables_tolerated(writer: impl FnOnce(&Path) -> SnapshotWriter) {
+        // given
+        let temp_dir = tempfile::tempdir().unwrap();
+        let writer = writer(temp_dir.path());
+        let snapshot = writer
+            .close(13.into(), 14u64.into(), &ChainConfig::local_testnet())
+            .unwrap();
+
+        let reader = SnapshotReader::open(snapshot).unwrap();
+
+        // when
+        let coins = reader.read::<Coins>().unwrap();
+
+        // then
+        assert_eq!(coins.into_iter().count(), 0);
+    }
+
     fn assert_roundtrip(
         writer: impl FnOnce(&Path) -> SnapshotWriter,
         reader: impl FnOnce(SnapshotMetadata, usize) -> SnapshotReader,
@@ -755,7 +768,11 @@ mod tests {
             .close(10.into(), DaBlockHeight(11), &ChainConfig::local_testnet())
             .unwrap();
 
-        let actual_groups = reader(snapshot, group_size).read().unwrap().collect_vec();
+        let actual_groups = reader(snapshot, group_size)
+            .read()
+            .unwrap()
+            .into_iter()
+            .collect_vec();
 
         // then
         assert_groups_identical(&expected_groups, actual_groups, skip_n_groups);
@@ -779,7 +796,7 @@ mod tests {
         fn write_groups<T>(
             &mut self,
             encoder: &mut SnapshotWriter,
-        ) -> Vec<Group<TableEntry<T>>>
+        ) -> Vec<Vec<TableEntry<T>>>
         where
             T: TableWithBlueprint,
             T::OwnedKey: serde::Serialize,
@@ -789,12 +806,12 @@ mod tests {
         {
             let groups = self.generate_groups();
             for group in &groups {
-                encoder.write(group.data.clone()).unwrap();
+                encoder.write(group.clone()).unwrap();
             }
             groups
         }
 
-        fn generate_groups(&mut self) -> Vec<Group<T>>
+        fn generate_groups(&mut self) -> Vec<Vec<T>>
         where
             T: Randomize,
         {
@@ -802,16 +819,14 @@ mod tests {
                 .chunks(self.group_size)
                 .into_iter()
                 .map(|chunk| chunk.collect_vec())
-                .enumerate()
-                .map(|(index, data)| Group { index, data })
                 .take(self.num_groups)
                 .collect()
         }
     }
 
     fn assert_groups_identical<T>(
-        original: &[Group<T>],
-        read: impl IntoIterator<Item = Result<Group<T>, anyhow::Error>>,
+        original: &[Vec<T>],
+        read: impl IntoIterator<Item = Result<Vec<T>, anyhow::Error>>,
         skip: usize,
     ) where
         Vec<T>: PartialEq,
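The `state.rs` hunks above replace the `Group { index, data }` wrapper with plain `Vec`s of entries, leaning on itertools' `flatten_ok`/`try_collect`. A small sketch of that flattening pattern (names here are illustrative, not the crate's types):

```rust
use anyhow::Result;
use itertools::Itertools; // provides flatten_ok() and try_collect()

// An iterator of `Result<Vec<T>>` groups is flattened into a single
// `Vec<T>`, short-circuiting on the first error.
fn collect_entries(groups: Vec<Result<Vec<u32>>>) -> Result<Vec<u32>> {
    groups
        .into_iter()
        .flatten_ok() // Result<Vec<T>> -> iterator of Result<T>
        .try_collect() // stops at the first Err
}

fn main() -> Result<()> {
    let entries = collect_entries(vec![Ok(vec![1, 2]), Ok(vec![3])])?;
    assert_eq!(entries, vec![1, 2, 3]);
    Ok(())
}
```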
diff --git a/crates/chain-config/src/config/state/parquet.rs b/crates/chain-config/src/config/state/parquet.rs
index bc068147549..6624a164842 100644
--- a/crates/chain-config/src/config/state/parquet.rs
+++ b/crates/chain-config/src/config/state/parquet.rs
@@ -3,7 +3,6 @@ pub mod encode;
 
 #[cfg(test)]
 mod tests {
-    use crate::Group;
     use bytes::Bytes;
     use itertools::Itertools;
     use parquet::{
@@ -128,7 +127,7 @@ mod tests {
         let mut decoder = Decoder::new(bytes).unwrap();
 
         // when
-        let _: Group<_> = decoder.nth(1).unwrap().unwrap();
+        let _: Vec<_> = decoder.nth(1).unwrap().unwrap();
 
         // then
         let actually_read = bytes_read.load(std::sync::atomic::Ordering::SeqCst);

diff --git a/crates/chain-config/src/config/state/parquet/decode.rs b/crates/chain-config/src/config/state/parquet/decode.rs
index a0ee9925959..d410c724ecd 100644
--- a/crates/chain-config/src/config/state/parquet/decode.rs
+++ b/crates/chain-config/src/config/state/parquet/decode.rs
@@ -14,27 +14,20 @@ use parquet::{
     record::RowAccessor,
 };
 
-use crate::config::state::{
-    Group,
-    GroupResult,
-};
-
 pub struct Decoder<R> {
     data_source: SerializedFileReader<R>,
     group_index: usize,
 }
 
-pub trait Decode<T> {
-    fn decode(bytes: &[u8]) -> anyhow::Result<T>
-    where
-        Self: Sized;
-}
-
 impl<R> Decoder<R>
 where
     R: ChunkReader + 'static,
 {
-    fn current_group(&self) -> anyhow::Result<Group<Vec<u8>>> {
+    pub fn num_groups(&self) -> usize {
+        self.data_source.num_row_groups()
+    }
+
+    fn current_group(&self) -> anyhow::Result<Vec<Vec<u8>>> {
         let data = self
             .data_source
             .get_row_group(self.group_index)?
@@ -51,10 +44,7 @@ where
             })
             .collect::<Result<Vec<_>, _>>()?;
 
-        Ok(Group {
-            index: self.group_index,
-            data,
-        })
+        Ok(data)
     }
 }
 
@@ -62,7 +52,7 @@ impl<R> Iterator for Decoder<R>
 where
     R: ChunkReader + 'static,
 {
-    type Item = GroupResult<Vec<u8>>;
+    type Item = anyhow::Result<Vec<Vec<u8>>>;
 
     fn next(&mut self) -> Option<Self::Item> {
         if self.group_index >= self.data_source.metadata().num_row_groups() {
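The new `num_groups` accessor works because parquet records the row-group count in the file footer, so the group count is available without scanning any data. A sketch of the underlying `parquet` crate call, with an illustrative path:

```rust
use parquet::file::reader::{FileReader, SerializedFileReader};
use std::fs::File;

// Row-group count comes from file metadata; no row data is read.
fn count_row_groups(path: &str) -> anyhow::Result<usize> {
    let file = File::open(path)?;
    let reader = SerializedFileReader::new(file)?;
    Ok(reader.num_row_groups())
}
```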
diff --git a/crates/chain-config/src/config/state/reader.rs b/crates/chain-config/src/config/state/reader.rs
index 8c10e8f98c8..1ed2c3a8d28 100644
--- a/crates/chain-config/src/config/state/reader.rs
+++ b/crates/chain-config/src/config/state/reader.rs
@@ -1,6 +1,9 @@
 use std::fmt::Debug;
 
-use fuel_core_storage::structured_storage::TableWithBlueprint;
+use fuel_core_storage::{
+    structured_storage::TableWithBlueprint,
+    Mappable,
+};
 use fuel_core_types::{
     blockchain::primitives::DaBlockHeight,
     fuel_types::BlockHeight,
@@ -11,15 +14,50 @@ use crate::{
     config::table_entry::TableEntry,
     AsTable,
     ChainConfig,
-    Group,
-    GroupResult,
     StateConfig,
     MAX_GROUP_SIZE,
 };
 
-pub enum IntoIter<T> {
+pub struct Groups<T: Mappable> {
+    iter: GroupIter<T>,
+}
+
+impl<T> Groups<T>
+where
+    T: Mappable,
+{
+    pub fn len(&self) -> usize {
+        match &self.iter {
+            GroupIter::InMemory { groups } => groups.len(),
+            #[cfg(feature = "parquet")]
+            GroupIter::Parquet { decoder } => decoder.num_groups(),
+        }
+    }
+
+    pub fn is_empty(&self) -> bool {
+        self.len() == 0
+    }
+}
+
+impl<T> IntoIterator for Groups<T>
+where
+    T: Mappable,
+    GroupIter<T>: Iterator,
+{
+    type IntoIter = GroupIter<T>;
+    type Item = <Self::IntoIter as Iterator>::Item;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter
+    }
+}
+
+pub enum GroupIter<T>
+where
+    T: Mappable,
+{
     InMemory {
-        groups: std::vec::IntoIter<GroupResult<T>>,
+        groups: std::vec::IntoIter<anyhow::Result<Vec<TableEntry<T>>>>,
     },
     #[cfg(feature = "parquet")]
     Parquet {
@@ -28,26 +66,24 @@ pub enum IntoIter<T> {
 }
 
 #[cfg(feature = "parquet")]
-impl<T> Iterator for IntoIter<T>
+impl<T> Iterator for GroupIter<T>
 where
-    T: serde::de::DeserializeOwned,
+    T: Mappable,
+    TableEntry<T>: serde::de::DeserializeOwned,
 {
-    type Item = GroupResult<T>;
+    type Item = anyhow::Result<Vec<TableEntry<T>>>;
 
     fn next(&mut self) -> Option<Self::Item> {
         match self {
-            IntoIter::InMemory { groups } => groups.next(),
-            IntoIter::Parquet { decoder } => {
-                let group = decoder.next()?.and_then(|bytes_group| {
-                    let decoded = bytes_group
-                        .data
+            GroupIter::InMemory { groups } => groups.next(),
+            GroupIter::Parquet { decoder } => {
+                let group = decoder.next()?.and_then(|byte_group| {
+                    byte_group
                         .into_iter()
-                        .map(|bytes| postcard::from_bytes(&bytes))
-                        .try_collect()?;
-                    Ok(Group {
-                        index: bytes_group.index,
-                        data: decoded,
-                    })
+                        .map(|group| {
+                            postcard::from_bytes(&group).map_err(|e| anyhow::anyhow!(e))
+                        })
+                        .collect()
                 });
                 Some(group)
             }
@@ -56,12 +92,15 @@ where
 }
 
 #[cfg(not(feature = "parquet"))]
-impl<T> Iterator for IntoIter<T> {
-    type Item = GroupResult<T>;
+impl<T> Iterator for GroupIter<T>
+where
+    T: Mappable,
+{
+    type Item = anyhow::Result<Vec<TableEntry<T>>>;
 
     fn next(&mut self) -> Option<Self::Item> {
         match self {
-            IntoIter::InMemory { groups } => groups.next(),
+            GroupIter::InMemory { groups } => groups.next(),
         }
     }
 }
@@ -176,8 +215,7 @@ impl SnapshotReader {
         let file = std::fs::File::open(path)?;
         let group = Decoder::new(file)?
             .next()
-            .ok_or_else(|| anyhow::anyhow!("No block height found"))??
-            .data;
+            .ok_or_else(|| anyhow::anyhow!("No block height found"))??;
         let block_height = group
             .into_iter()
             .next()
@@ -220,28 +258,32 @@ impl SnapshotReader {
         }
     }
 
-    pub fn read<T>(&self) -> anyhow::Result<IntoIter<TableEntry<T>>>
+    pub fn read<T>(&self) -> anyhow::Result<Groups<T>>
     where
         T: TableWithBlueprint,
         StateConfig: AsTable<T>,
         TableEntry<T>: serde::de::DeserializeOwned,
     {
-        match &self.data_source {
+        let iter = match &self.data_source {
             #[cfg(feature = "parquet")]
             DataSource::Parquet { tables, .. } => {
                 use anyhow::Context;
                 use fuel_core_storage::kv_store::StorageColumn;
                 let name = T::column().name();
-                let path = tables.get(name).ok_or_else(|| {
-                    anyhow::anyhow!("table '{name}' not found in snapshot metadata.")
-                })?;
+                let Some(path) = tables.get(name) else {
+                    return Ok(Groups {
+                        iter: GroupIter::InMemory {
+                            groups: vec![].into_iter(),
+                        },
+                    });
+                };
                 let file = std::fs::File::open(path).with_context(|| {
                     format!("Could not open {path:?} in order to read table '{name}'")
                 })?;
-                Ok(IntoIter::Parquet {
+                GroupIter::Parquet {
                     decoder: super::parquet::decode::Decoder::new(file)?,
-                })
+                }
             }
             DataSource::InMemory { state, group_size } => {
                 let collection = state
@@ -249,19 +291,15 @@ impl SnapshotReader {
                     .into_iter()
                     .chunks(*group_size)
                     .into_iter()
-                    .enumerate()
-                    .map(|(index, vec_chunk)| {
-                        Ok(Group {
-                            data: vec_chunk.collect(),
-                            index,
-                        })
-                    })
+                    .map(|vec_chunk| Ok(vec_chunk.collect()))
                     .collect_vec();
 
-                Ok(IntoIter::InMemory {
+                GroupIter::InMemory {
                     groups: collection.into_iter(),
-                })
+                }
             }
-        }
+        };
+
+        Ok(Groups { iter })
     }
 
     pub fn chain_config(&self) -> &ChainConfig {
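Two things happen in `reader.rs`: a missing table now yields an empty group set instead of an error (the fix behind the "missing tables" changelog entry), and the new `Groups` handle exposes its length before iteration starts, which is what lets the importer size its progress bars. A minimal sketch of that API shape, with illustrative names rather than the real types:

```rust
use anyhow::Result;

// A handle that knows its group count up front and iterates lazily.
struct Groups<T> {
    groups: Vec<Result<Vec<T>>>,
}

impl<T> Groups<T> {
    fn len(&self) -> usize {
        self.groups.len()
    }
}

impl<T> IntoIterator for Groups<T> {
    type Item = Result<Vec<T>>;
    type IntoIter = std::vec::IntoIter<Result<Vec<T>>>;
    fn into_iter(self) -> Self::IntoIter {
        self.groups.into_iter()
    }
}

fn main() -> Result<()> {
    let groups = Groups { groups: vec![Ok(vec![1u8, 2]), Ok(vec![3])] };
    let total = groups.len(); // known before iteration, e.g. for a progress bar
    for group in groups {
        let _entries = group?;
    }
    assert_eq!(total, 2);
    Ok(())
}
```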
diff --git a/crates/fuel-core/Cargo.toml b/crates/fuel-core/Cargo.toml
index 5076c8bec3e..851de726ca7 100644
--- a/crates/fuel-core/Cargo.toml
+++ b/crates/fuel-core/Cargo.toml
@@ -39,6 +39,7 @@ fuel-core-upgradable-executor = { workspace = true }
 futures = { workspace = true }
 hex = { version = "0.4", features = ["serde"] }
 hyper = { workspace = true }
+indicatif = { workspace = true, default-features = true }
 itertools = { workspace = true }
 num_cpus = { version = "1.16.0", optional = true }
 rand = { workspace = true }
@@ -86,6 +87,7 @@ test-helpers = [
     "fuel-core-storage/test-helpers",
     "fuel-core-chain-config/test-helpers",
     "fuel-core-txpool/test-helpers",
+    "fuel-core-services/test-helpers",
 ]
 # features to enable in production, but increase build times
 rocksdb-production = ["rocksdb", "rocksdb/jemalloc"]

diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs
index 237229f4189..0597e45001b 100644
--- a/crates/fuel-core/src/p2p_test_helpers.rs
+++ b/crates/fuel-core/src/p2p_test_helpers.rs
@@ -380,12 +380,18 @@ pub fn make_config(name: String, mut node_config: Config) -> Config {
 pub async fn make_node(node_config: Config, test_txs: Vec<Transaction>) -> Node {
     let db = Database::in_memory();
+    let time_limit = Duration::from_secs(4);
     let node = tokio::time::timeout(
-        Duration::from_secs(2),
+        time_limit,
         FuelService::from_database(db.clone(), node_config),
     )
     .await
-    .expect("All services should start in less than 2 seconds")
+    .unwrap_or_else(|_| {
+        panic!(
+            "All services should start in less than {} seconds",
+            time_limit.as_secs()
+        )
+    })
     .expect("The `FuelService should start without error");
 
     let config = node.shared.config.clone();
diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs
index de280a6dbfe..761941766c8 100644
--- a/crates/fuel-core/src/service.rs
+++ b/crates/fuel-core/src/service.rs
@@ -218,13 +218,14 @@ impl RunnableService for Task {
 
     async fn into_task(
         self,
-        _: &StateWatcher,
+        watcher: &StateWatcher,
         _: Self::TaskParams,
     ) -> anyhow::Result<Self::Task> {
         // check if chain is initialized
         if let Err(err) = self.shared.database.on_chain().get_genesis() {
             if err.is_not_found() {
                 let result = genesis::execute_genesis_block(
+                    watcher.clone(),
                     &self.shared.config,
                     &self.shared.database,
                 )

diff --git a/crates/fuel-core/src/service/genesis.rs b/crates/fuel-core/src/service/genesis.rs
index 575efdd75e5..1f1660d4c88 100644
--- a/crates/fuel-core/src/service/genesis.rs
+++ b/crates/fuel-core/src/service/genesis.rs
@@ -10,6 +10,7 @@ use crate::{
     service::config::Config,
 };
 use fuel_core_chain_config::GenesisCommitment;
+use fuel_core_services::StateWatcher;
 use fuel_core_storage::{
     iter::IteratorOverTable,
     tables::{
@@ -54,19 +55,16 @@ mod importer;
 mod task_manager;
 
 pub use exporter::Exporter;
-use tokio_util::sync::CancellationToken;
 
 use self::importer::SnapshotImporter;
 
 /// Performs the importing of the genesis block from the snapshot.
 pub async fn execute_genesis_block(
+    watcher: StateWatcher,
     config: &Config,
     db: &CombinedDatabase,
 ) -> anyhow::Result<UncommittedImportResult<Changes>> {
-    // TODO: tie this with a SIGNAL for resumability
-    let cancel = CancellationToken::new();
-    SnapshotImporter::import(db.clone(), config.snapshot_reader.clone(), cancel.clone())
-        .await?;
+    SnapshotImporter::import(db.clone(), config.snapshot_reader.clone(), watcher).await?;
 
     let genesis_progress_on_chain: Vec<String> = db
         .on_chain()
@@ -139,7 +137,7 @@ pub async fn execute_and_commit_genesis_block(
     config: &Config,
     db: &CombinedDatabase,
 ) -> anyhow::Result<()> {
-    let result = execute_genesis_block(config, db).await?;
+    let result = execute_genesis_block(StateWatcher::default(), config, db).await?;
 
     let importer = fuel_core_importer::Importer::new(
         config
            .snapshot_reader
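The plumbing above replaces the ad-hoc `CancellationToken` with the service's own `StateWatcher`, so genesis observes the same lifecycle as every other service task. A sketch of the idea using a bare `tokio::sync::watch` channel in place of `StateWatcher` (which wraps one); the enum and function names here are illustrative:

```rust
use tokio::sync::watch;

#[derive(Clone, Copy, PartialEq, Eq)]
enum State {
    Started,
    Stopping,
}

// Long-running setup checks the channel between units of work and aborts
// early once the service begins stopping, mirroring the importer's
// `is_cancelled()` polling.
async fn run_genesis(watcher: watch::Receiver<State>) -> anyhow::Result<()> {
    for group in 0..1000u32 {
        if *watcher.borrow() == State::Stopping {
            anyhow::bail!("genesis interrupted at group {group}");
        }
        // ... write one group of snapshot data ...
        tokio::task::yield_now().await;
    }
    Ok(())
}
```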
diff --git a/crates/fuel-core/src/service/genesis/exporter.rs b/crates/fuel-core/src/service/genesis/exporter.rs
index fd382d4e894..a055156f18f 100644
--- a/crates/fuel-core/src/service/genesis/exporter.rs
+++ b/crates/fuel-core/src/service/genesis/exporter.rs
@@ -18,6 +18,7 @@ use fuel_core_chain_config::{
     StateConfigBuilder,
     TableEntry,
 };
+use fuel_core_services::State;
 use fuel_core_storage::{
     blueprint::BlueprintInspect,
     iter::IterDirection,
@@ -34,8 +35,7 @@ use fuel_core_storage::{
 };
 use fuel_core_types::fuel_types::ContractId;
 use itertools::Itertools;
-
-use tokio_util::sync::CancellationToken;
+use tokio::sync::watch;
 
 use super::task_manager::TaskManager;
 
@@ -57,12 +57,15 @@ where
         writer: Fun,
         group_size: usize,
     ) -> Self {
+        // TODO: Support graceful shutdown during the exporting of the snapshot.
+        //       https://github.com/FuelLabs/fuel-core/issues/1828
+        let (_, receiver) = watch::channel(State::Started);
         Self {
             db,
             prev_chain_config,
             writer,
             group_size,
-            task_manager: TaskManager::new(CancellationToken::new()),
+            task_manager: TaskManager::new(receiver.into()),
         }
     }
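The exporter sidesteps graceful shutdown (per the TODO) by building its watcher from a `watch` channel whose sender is dropped immediately: `borrow()` keeps serving the initial `Started` value, so the resulting token never reports cancellation. A self-contained sketch of that behaviour:

```rust
use tokio::sync::watch;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum State {
    Started,
}

fn main() {
    // Sender is dropped on the spot, but the last value stays readable.
    let (_, receiver) = watch::channel(State::Started);
    assert_eq!(*receiver.borrow(), State::Started);
}
```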
diff --git a/crates/fuel-core/src/service/genesis/importer.rs b/crates/fuel-core/src/service/genesis/importer.rs
index b17d9904405..e771d064095 100644
--- a/crates/fuel-core/src/service/genesis/importer.rs
+++ b/crates/fuel-core/src/service/genesis/importer.rs
@@ -1,13 +1,24 @@
-use self::import_task::{
-    ImportTable,
-    ImportTask,
+use self::{
+    import_task::{
+        ImportTable,
+        ImportTask,
+    },
+    progress::{
+        MultipleProgressReporter,
+        ProgressReporter,
+        Target,
+    },
 };
 use super::task_manager::TaskManager;
 
 mod import_task;
 mod off_chain;
 mod on_chain;
+mod progress;
 
-use std::marker::PhantomData;
+use std::{
+    io::IsTerminal,
+    marker::PhantomData,
+};
 
 use crate::{
     combined_database::CombinedDatabase,
@@ -31,7 +42,9 @@ use fuel_core_chain_config::{
     StateConfig,
     TableEntry,
 };
+use fuel_core_services::StateWatcher;
 use fuel_core_storage::{
+    kv_store::StorageColumn,
     structured_storage::TableWithBlueprint,
     tables::{
         Coins,
@@ -48,35 +61,37 @@ use fuel_core_types::{
     fuel_types::BlockHeight,
 };
 
-use tokio_util::sync::CancellationToken;
+use tracing::Level;
 
 pub struct SnapshotImporter {
     db: CombinedDatabase,
     task_manager: TaskManager<()>,
     snapshot_reader: SnapshotReader,
+    tracing_span: tracing::Span,
+    multi_progress_reporter: MultipleProgressReporter,
 }
 
 impl SnapshotImporter {
     fn new(
         db: CombinedDatabase,
         snapshot_reader: SnapshotReader,
-        cancel_token: CancellationToken,
+        watcher: StateWatcher,
     ) -> Self {
         Self {
             db,
-            task_manager: TaskManager::new(cancel_token),
+            task_manager: TaskManager::new(watcher),
             snapshot_reader,
+            tracing_span: tracing::info_span!("snapshot_importer"),
+            multi_progress_reporter: Self::init_multi_progress_reporter(),
         }
     }
 
     pub async fn import(
         db: CombinedDatabase,
         snapshot_reader: SnapshotReader,
-        cancel_token: CancellationToken,
+        watcher: StateWatcher,
     ) -> anyhow::Result<()> {
-        Self::new(db, snapshot_reader, cancel_token)
-            .run_workers()
-            .await
+        Self::new(db, snapshot_reader, watcher).run_workers().await
     }
 
     async fn run_workers(mut self) -> anyhow::Result<()> {
@@ -96,31 +111,36 @@ impl SnapshotImporter {
         self.spawn_worker_off_chain::<Transactions, TransactionStatuses>()?;
 
         self.task_manager.wait().await?;
+
         Ok(())
     }
 
-    pub fn spawn_worker_on_chain<T>(&mut self) -> anyhow::Result<()>
+    pub fn spawn_worker_on_chain<TableBeingWritten>(&mut self) -> anyhow::Result<()>
     where
-        T: TableWithBlueprint + 'static,
-        TableEntry<T>: serde::de::DeserializeOwned + Send,
-        StateConfig: AsTable<T>,
-        Handler<T, T>: ImportTable<TableInSnapshot = T, DbDesc = OnChain>,
+        TableBeingWritten: TableWithBlueprint + 'static + Send,
+        TableEntry<TableBeingWritten>: serde::de::DeserializeOwned + Send,
+        StateConfig: AsTable<TableBeingWritten>,
+        Handler<TableBeingWritten, TableBeingWritten>:
+            ImportTable<TableInSnapshot = TableBeingWritten, DbDesc = OnChain>,
     {
-        let groups = self.snapshot_reader.read::<T>()?;
+        let groups = self.snapshot_reader.read::<TableBeingWritten>()?;
+        let num_groups = groups.len();
         let block_height = self.snapshot_reader.block_height();
         let da_block_height = self.snapshot_reader.da_block_height();
         let db = self.db.on_chain().clone();
+
+        let progress_reporter = self.progress_reporter::<TableBeingWritten>(num_groups);
+
         self.task_manager.spawn(move |token| {
-            tokio_rayon::spawn(move || {
-                ImportTask::new(
-                    token,
-                    Handler::new(block_height, da_block_height),
-                    groups,
-                    db,
-                )
-                .run()
-            })
+            let task = ImportTask::new(
+                token,
+                Handler::new(block_height, da_block_height),
+                groups,
+                db,
+                progress_reporter,
+            );
+            tokio_rayon::spawn(move || task.run())
         });
 
         Ok(())
@@ -135,27 +155,62 @@ impl SnapshotImporter {
         TableEntry<TableInSnapshot>: serde::de::DeserializeOwned + Send,
         StateConfig: AsTable<TableInSnapshot>,
         Handler<TableBeingWritten, TableInSnapshot>:
             ImportTable<TableInSnapshot = TableInSnapshot, DbDesc = OffChain>,
-        TableBeingWritten: Send + 'static,
+        TableBeingWritten: TableWithBlueprint + Send + 'static,
     {
         let groups = self.snapshot_reader.read::<TableInSnapshot>()?;
+        let num_groups = groups.len();
         let block_height = self.snapshot_reader.block_height();
         let da_block_height = self.snapshot_reader.da_block_height();
         let db = self.db.off_chain().clone();
+
+        let progress_reporter = self.progress_reporter::<TableBeingWritten>(num_groups);
+
         self.task_manager.spawn(move |token| {
-            tokio_rayon::spawn(move || {
-                let runner = ImportTask::new(
-                    token,
-                    Handler::<TableBeingWritten, TableInSnapshot>::new(block_height, da_block_height),
-                    groups,
-                    db,
-                );
-                runner.run()
-            })
+            let task = ImportTask::new(
+                token,
+                Handler::new(block_height, da_block_height),
+                groups,
+                db,
+                progress_reporter,
+            );
+            tokio_rayon::spawn(move || task.run())
         });
 
         Ok(())
     }
+
+    fn init_multi_progress_reporter() -> MultipleProgressReporter {
+        if Self::should_display_bars() {
+            MultipleProgressReporter::new_stderr()
+        } else {
+            MultipleProgressReporter::new_hidden()
+        }
+    }
+
+    fn should_display_bars() -> bool {
+        std::io::stderr().is_terminal() && !cfg!(test)
+    }
+
+    fn progress_reporter<T>(&self, num_groups: usize) -> ProgressReporter
+    where
+        T: TableWithBlueprint,
+    {
+        let target = if Self::should_display_bars() {
+            Target::Cli(T::column().name())
+        } else {
+            let span = tracing::span!(
+                parent: &self.tracing_span,
+                Level::INFO,
+                "task",
+                table = T::column().name()
+            );
+            Target::Logs(span)
+        };
+
+        let reporter = ProgressReporter::new(target, num_groups);
+        self.multi_progress_reporter.register(reporter)
+    }
 }
 
 #[derive(Debug, Clone, Copy)]
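Each worker does blocking, CPU-bound decoding and database writes, so the importer builds the task on the async side and hands it to a rayon thread via `tokio_rayon::spawn`, which yields a future the task manager can await. A minimal sketch of that bridge, under the assumption that the `tokio_rayon` crate (used above) is available:

```rust
// Build the closure up front, then run it on a rayon thread while the
// async caller merely awaits the result.
async fn spawn_import() -> anyhow::Result<()> {
    let task = move || -> anyhow::Result<()> {
        // CPU-bound: decode one table's groups and write them in transactions.
        Ok(())
    };
    tokio_rayon::spawn(task).await
}
```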
diff --git a/crates/fuel-core/src/service/genesis/importer/import_task.rs b/crates/fuel-core/src/service/genesis/importer/import_task.rs
index 335ca15f313..2b794e1adf8 100644
--- a/crates/fuel-core/src/service/genesis/importer/import_task.rs
+++ b/crates/fuel-core/src/service/genesis/importer/import_task.rs
@@ -1,7 +1,5 @@
-use fuel_core_chain_config::{
-    Group,
-    TableEntry,
-};
+use anyhow::bail;
+use fuel_core_chain_config::TableEntry;
 use fuel_core_storage::{
     kv_store::StorageColumn,
     structured_storage::TableWithBlueprint,
@@ -14,17 +12,21 @@ use fuel_core_storage::{
     StorageInspect,
     StorageMutate,
 };
-use tokio_util::sync::CancellationToken;
 
-use crate::database::{
-    database_description::DatabaseDescription,
-    genesis_progress::{
-        GenesisMetadata,
-        GenesisProgressMutate,
+use crate::{
+    database::{
+        database_description::DatabaseDescription,
+        genesis_progress::{
+            GenesisMetadata,
+            GenesisProgressMutate,
+        },
+        Database,
     },
-    Database,
+    service::genesis::task_manager::CancellationToken,
 };
 
+use super::progress::ProgressReporter;
+
 pub struct ImportTask<Logic, Groups, DbDesc>
 where
     DbDesc: DatabaseDescription,
@@ -34,6 +36,7 @@ where
     groups: Groups,
     cancel_token: CancellationToken,
     db: Database<DbDesc>,
+    reporter: ProgressReporter,
 }
 
 pub trait ImportTable {
@@ -59,6 +62,7 @@ where
         handler: Logic,
         groups: GroupGenerator,
         db: Database<DbDesc>,
+        reporter: ProgressReporter,
     ) -> Self {
         let skip = match db
             .storage::<GenesisMetadata<DbDesc>>()
@@ -76,6 +80,7 @@ where
             groups,
             cancel_token,
             db,
+            reporter,
         }
     }
 }
@@ -85,7 +90,7 @@ where
     DbDesc: DatabaseDescription,
     Logic: ImportTable,
     GroupGenerator:
-        IntoIterator<Item = anyhow::Result<Group<TableEntry<Logic::TableInSnapshot>>>>,
+        IntoIterator<Item = anyhow::Result<Vec<TableEntry<Logic::TableInSnapshot>>>>,
     GenesisMetadata<DbDesc>: TableWithBlueprint<
         Column = DbDesc::Column,
         Key = str,
@@ -98,40 +103,37 @@ where
         StorageMutate<GenesisMetadata<DbDesc>, Error = fuel_core_storage::Error>,
 {
     pub fn run(mut self) -> anyhow::Result<()> {
-        tracing::info!(
-            "Starting genesis runner. Reading: {} writing into {}",
-            Logic::TableInSnapshot::column().name(),
-            Logic::TableBeingWritten::column().name()
-        );
         let mut db = self.db;
-        let result = self
-            .groups
+        let mut is_cancelled = self.cancel_token.is_cancelled();
+        self.groups
             .into_iter()
+            .enumerate()
             .skip(self.skip)
-            .take_while(|_| !self.cancel_token.is_cancelled())
-            .try_for_each(move |group| {
+            .take_while(|_| {
+                is_cancelled = self.cancel_token.is_cancelled();
+                !is_cancelled
+            })
+            .try_for_each(|(index, group)| {
                 let group = group?;
-                let group_num = group.index;
-
                 let mut tx = db.write_transaction();
-                self.handler.process(group.data, &mut tx)?;
+                self.handler.process(group, &mut tx)?;
 
                 GenesisProgressMutate::<DbDesc>::update_genesis_progress(
                     &mut tx,
                     Logic::TableBeingWritten::column().name(),
-                    group_num,
+                    index,
                 )?;
                 tx.commit()?;
-                Ok(())
-            });
+                self.reporter
+                    .set_progress(u64::try_from(index).unwrap_or(u64::MAX));
+                anyhow::Result::<_>::Ok(())
+            })?;
 
-        tracing::info!(
-            "Finishing genesis runner. Read: {} wrote into {}",
-            Logic::TableInSnapshot::column().name(),
-            Logic::TableBeingWritten::column().name()
-        );
+        if is_cancelled {
+            bail!("Import cancelled")
+        }
 
-        result
+        Ok(())
     }
 }
 
@@ -139,7 +141,13 @@ where
 mod tests {
     use crate::{
         database::genesis_progress::GenesisProgressInspect,
-        service::genesis::importer::import_task::ImportTask,
+        service::genesis::{
+            importer::{
+                import_task::ImportTask,
+                progress::ProgressReporter,
+            },
+            task_manager::CancellationToken,
+        },
     };
     use std::sync::{
         Arc,
@@ -151,7 +159,6 @@ mod tests {
         bail,
     };
     use fuel_core_chain_config::{
-        Group,
         Randomize,
         TableEntry,
     };
@@ -192,8 +199,6 @@ mod tests {
         SeedableRng,
     };
 
-    use tokio_util::sync::CancellationToken;
-
     use crate::{
         combined_database::CombinedDatabase,
         database::{
@@ -265,19 +270,12 @@ mod tests {
             .collect()
         }
 
-        pub fn as_indexed_groups(&self) -> Vec<Group<TableEntry<Coins>>> {
-            self.batches
-                .iter()
-                .enumerate()
-                .map(|(index, data)| Group {
-                    index,
-                    data: data.clone(),
-                })
-                .collect()
+        pub fn as_groups(&self) -> Vec<Vec<TableEntry<Coins>>> {
+            self.batches.clone()
         }
 
-        pub fn as_ok_groups(&self) -> Vec<GroupResult<TableEntry<Coins>>> {
-            self.as_indexed_groups().into_iter().map(Ok).collect()
+        pub fn as_ok_groups(&self) -> Vec<anyhow::Result<Vec<TableEntry<Coins>>>> {
+            self.as_groups().into_iter().map(Ok).collect()
         }
     }
 
@@ -288,13 +286,14 @@ mod tests {
         let mut called_with = vec![];
 
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|group, _| {
                 called_with.push(group);
                 Ok(())
             }),
             data.as_ok_groups(),
             Database::default(),
+            ProgressReporter::default(),
         );
 
         // when
@@ -317,15 +316,15 @@ mod tests {
             0,
         )
         .unwrap();
-
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|element, _| {
                 called_with.push(element);
                 Ok(())
             }),
             data.as_ok_groups(),
             db.on_chain().clone(),
+            ProgressReporter::default(),
         );
 
         // when
@@ -343,7 +342,7 @@ mod tests {
         let utxo_id = UtxoId::new(Default::default(), 0);
 
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|_, tx| {
                 insert_a_coin(tx, &utxo_id);
 
@@ -364,6 +363,7 @@ mod tests {
             }),
             groups.as_ok_groups(),
             outer_db.clone(),
+            ProgressReporter::default(),
         );
 
         // when
@@ -390,13 +390,14 @@ mod tests {
         let utxo_id = UtxoId::new(Default::default(), 0);
 
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|_, tx| {
                 insert_a_coin(tx, &utxo_id);
                 bail!("Some error")
             }),
             groups.as_ok_groups(),
             db.clone(),
+            ProgressReporter::default(),
         );
 
         // when
@@ -411,10 +412,11 @@ mod tests {
         // given
         let groups = TestData::new(1);
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|_, _| bail!("Some error")),
             groups.as_ok_groups(),
             Database::default(),
+            ProgressReporter::default(),
        );
 
         // when
@@ -429,10 +431,11 @@ mod tests {
         // given
         let groups = [Err(anyhow!("Some error"))];
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|_, _| Ok(())),
             groups,
             Database::default(),
+            ProgressReporter::default(),
         );
 
         // when
@@ -448,10 +451,11 @@ mod tests {
         let data = TestData::new(2);
         let db = Database::default();
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|_, _| Ok(())),
             data.as_ok_groups(),
             db.clone(),
+            ProgressReporter::default(),
         );
 
         // when
@@ -470,21 +474,21 @@ mod tests {
     #[tokio::test]
     async fn processing_stops_when_cancelled() {
         // given
-        let cancel_token = CancellationToken::new();
-
         let (tx, rx) = std::sync::mpsc::channel();
 
         let read_groups = Arc::new(Mutex::new(vec![]));
+        let cancel_token = tokio_util::sync::CancellationToken::new();
         let runner = {
             let read_groups = Arc::clone(&read_groups);
             ImportTask::new(
-                cancel_token.clone(),
+                cancel_token.clone().into(),
                 TestHandler::new(move |el, _| {
                     read_groups.lock().unwrap().push(el);
                     Ok(())
                 }),
                 rx,
                 Database::default(),
+                ProgressReporter::default(),
             )
         };
 
@@ -508,11 +512,10 @@ mod tests {
         // then
         // runner should finish
         drop(tx);
-        let runner_response = runner_handle.join().unwrap();
-        assert!(
-            runner_response.is_ok(),
-            "Stopping a runner should not be an error"
-        );
+        runner_handle
+            .join()
+            .unwrap()
+            .expect_err("Cancelling is an error");
 
         // group after signal is not read
         let read_entries = read_groups.lock().unwrap().clone();
@@ -572,10 +575,11 @@ mod tests {
         // given
         let groups = TestData::new(1);
         let runner = ImportTask::new(
-            CancellationToken::new(),
+            CancellationToken::default(),
             TestHandler::new(|_, _| Ok(())),
             groups.as_ok_groups(),
             Database::new(Arc::new(BrokenTransactions::new())),
+            ProgressReporter::default(),
        );
 
         // when
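The `run` loop above is also what makes imports resumable: the index of the last committed group lives in the `GenesisMetadata` progress table, and iteration skips past it on restart. A sketch of that skip logic with a stand-in for the progress lookup:

```rust
// `already_done` stands in for the `GenesisMetadata` progress entry: the
// index of the last group committed before a previous shutdown.
fn run_import<I>(groups: I, already_done: Option<usize>) -> anyhow::Result<()>
where
    I: IntoIterator<Item = anyhow::Result<Vec<u8>>>,
{
    let skip = already_done.map(|last| last.saturating_add(1)).unwrap_or(0);
    for (index, group) in groups.into_iter().enumerate().skip(skip) {
        let _data = group?;
        // process the group, then persist `index` as the new progress marker
    }
    Ok(())
}
```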
diff --git a/crates/fuel-core/src/service/genesis/importer/progress.rs b/crates/fuel-core/src/service/genesis/importer/progress.rs
new file mode 100644
index 00000000000..7faa8bc0f6a
--- /dev/null
+++ b/crates/fuel-core/src/service/genesis/importer/progress.rs
@@ -0,0 +1,90 @@
+use indicatif::{
+    HumanDuration,
+    MultiProgress,
+    ProgressBar,
+    ProgressDrawTarget,
+    ProgressStyle,
+};
+use tracing::Span;
+
+#[derive(Clone)]
+pub struct ProgressReporter {
+    bar: ProgressBar,
+    target: Target,
+}
+
+impl Default for ProgressReporter {
+    fn default() -> Self {
+        Self::new(Target::Logs(tracing::info_span!("default")), usize::MAX)
+    }
+}
+
+#[derive(Clone)]
+pub enum Target {
+    Cli(&'static str),
+    Logs(Span),
+}
+
+impl ProgressReporter {
+    pub fn new(target: Target, max: usize) -> Self {
+        let max = u64::try_from(max).unwrap_or(u64::MAX);
+        // Bars always start hidden. They are printed only if added to a
+        // `MultipleProgressReporter` that prints to stderr. This removes flicker
+        // from double rendering (once when the progress bar is constructed and
+        // again when added to the `MultipleProgressReporter`).
+        let bar = ProgressBar::with_draw_target(Some(max), ProgressDrawTarget::hidden());
+        if let Target::Cli(message) = target {
+            bar.set_message(message);
+            let style = ProgressStyle::with_template(
+                "[{elapsed_precise}] {bar:.64.on_black} {pos:>7}/{len:7} {msg} {eta}",
+            )
+            .unwrap();
+
+            bar.set_style(style);
+        }
+
+        ProgressReporter { bar, target }
+    }
+
+    pub fn set_progress(&self, group_index: u64) {
+        let group_num = group_index.saturating_add(1);
+        self.bar.set_position(group_num);
+        if let Target::Logs(span) = &self.target {
+            span.in_scope(|| {
+                if let Some(len) = self.bar.length() {
+                    let human_eta = HumanDuration(self.bar.eta());
+                    tracing::info!("Processing: {group_num}/{len}. ({human_eta})");
+                } else {
+                    tracing::info!("Processing: {}", group_num);
+                }
+            })
+        }
+    }
+}
+
+pub struct MultipleProgressReporter {
+    multi_progress: MultiProgress,
+}
+
+impl MultipleProgressReporter {
+    pub fn new_stderr() -> Self {
+        Self::new_target(ProgressDrawTarget::stderr())
+    }
+
+    pub fn new_hidden() -> Self {
+        Self::new_target(ProgressDrawTarget::hidden())
+    }
+
+    fn new_target(target: ProgressDrawTarget) -> Self {
+        Self {
+            multi_progress: MultiProgress::with_draw_target(target),
+        }
+    }
+
+    pub fn register(&self, reporter: ProgressReporter) -> ProgressReporter {
+        let bar = self.multi_progress.add(reporter.bar);
+        ProgressReporter {
+            bar,
+            target: reporter.target,
+        }
+    }
+}
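(Two small fixes applied to the new module above: `new_sterr` renamed to `new_stderr` here and at its call site in `importer.rs`, and the draw-target comment completed.) The hidden-until-registered pattern it uses is standard `indicatif`; a runnable sketch:

```rust
use indicatif::{MultiProgress, ProgressBar, ProgressDrawTarget, ProgressStyle};

fn main() {
    let multi = MultiProgress::with_draw_target(ProgressDrawTarget::stderr());
    // Created hidden, so nothing is drawn yet.
    let bar = ProgressBar::with_draw_target(Some(100), ProgressDrawTarget::hidden());
    bar.set_style(ProgressStyle::with_template("{pos:>4}/{len:4} {msg}").unwrap());
    let bar = multi.add(bar); // becomes visible here, avoiding a double render
    for i in 0..100 {
        bar.set_position(i + 1);
    }
    bar.finish_with_message("done");
}
```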
diff --git a/crates/fuel-core/src/service/genesis/task_manager.rs b/crates/fuel-core/src/service/genesis/task_manager.rs
index 5903db16429..322592d5166 100644
--- a/crates/fuel-core/src/service/genesis/task_manager.rs
+++ b/crates/fuel-core/src/service/genesis/task_manager.rs
@@ -1,26 +1,65 @@
 use std::future::Future;
 
+use fuel_core_services::StateWatcher;
 use futures::{
     StreamExt,
     TryStreamExt,
 };
 use itertools::Itertools;
 use tokio::task::JoinSet;
-use tokio_util::sync::CancellationToken;
 
 pub struct TaskManager<T> {
     set: JoinSet<anyhow::Result<T>>,
-    cancel_token: CancellationToken,
+    cancel: CancellationToken,
+}
+
+#[cfg_attr(feature = "test-helpers", derive(Default))]
+#[derive(Clone)]
+pub struct CancellationToken {
+    task_cancellator: tokio_util::sync::CancellationToken,
+    state_watcher: StateWatcher,
+}
+
+#[cfg(feature = "test-helpers")]
+impl From<tokio_util::sync::CancellationToken> for CancellationToken {
+    fn from(token: tokio_util::sync::CancellationToken) -> Self {
+        Self {
+            task_cancellator: token,
+            ..Default::default()
+        }
+    }
+}
+
+impl CancellationToken {
+    #[cfg(test)]
+    pub async fn cancelled(mut self) -> anyhow::Result<()> {
+        tokio::select! {
+            _ = self.task_cancellator.cancelled() => Ok(()),
+            result = self.state_watcher.wait_stopping_or_stopped() => result
+        }
+    }
+
+    fn cancel_tasks(&self) {
+        self.task_cancellator.cancel();
+    }
+
+    pub fn is_cancelled(&self) -> bool {
+        let state = self.state_watcher.borrow();
+        self.task_cancellator.is_cancelled() || state.stopped() || state.stopping()
+    }
 }
 
 impl<T> TaskManager<T>
 where
     T: Send + 'static,
 {
-    pub fn new(cancel_token: CancellationToken) -> Self {
+    pub fn new(watcher: StateWatcher) -> Self {
         Self {
             set: JoinSet::new(),
-            cancel_token,
+            cancel: CancellationToken {
+                task_cancellator: tokio_util::sync::CancellationToken::new(),
+                state_watcher: watcher,
+            },
         }
     }
 
@@ -29,7 +68,7 @@ where
         F: FnOnce(CancellationToken) -> Fut,
         Fut: Future<Output = anyhow::Result<T>> + Send + 'static,
     {
-        self.set.spawn(arg(self.cancel_token.clone()));
+        self.set.spawn(arg(self.cancel.clone()));
     }
 
     pub async fn wait(self) -> anyhow::Result<Vec<T>> {
@@ -38,7 +77,7 @@ where
                 Some((res, set))
             })
             .map(|result| result.map_err(Into::into).and_then(|r| r))
-            .inspect_err(|_| self.cancel_token.cancel())
+            .inspect_err(|_| self.cancel.cancel_tasks())
             .collect::<Vec<_>>()
             .await;
 
@@ -51,13 +90,15 @@ mod tests {
     use std::time::Duration;
 
     use anyhow::bail;
+    use fuel_core_services::State;
+    use tokio::sync::watch;
 
     use super::*;
 
     #[tokio::test]
     async fn task_added_and_completed() {
         // given
-        let mut workers = TaskManager::new(CancellationToken::new());
+        let mut workers = TaskManager::new(StateWatcher::default());
         workers.spawn(|_| async { Ok(8u8) });
 
         // when
@@ -70,7 +111,7 @@ mod tests {
     #[tokio::test]
     async fn returns_err_on_single_failure() {
         // given
-        let mut workers = TaskManager::new(CancellationToken::new());
+        let mut workers = TaskManager::new(StateWatcher::default());
         workers.spawn(|_| async { Ok(10u8) });
         workers.spawn(|_| async { Err(anyhow::anyhow!("I fail")) });
 
@@ -85,10 +126,12 @@ mod tests {
     #[tokio::test]
     async fn signals_cancel_to_non_finished_tasks_on_failure() {
         // given
-        let mut workers = TaskManager::new(CancellationToken::new());
+        let (_sender, recv) = watch::channel(State::Started);
+        let watcher = recv.into();
+        let mut workers = TaskManager::new(watcher);
         let (tx, rx) = tokio::sync::oneshot::channel();
         workers.spawn(move |token| async move {
-            token.cancelled().await;
+            token.cancelled().await.unwrap();
             tx.send(()).unwrap();
             Ok(())
         });
@@ -105,18 +148,18 @@ mod tests {
     }
 
     #[tokio::test]
-    async fn propagates_cancellation_from_outside() {
+    async fn reacts_when_state_changes_to_stopping() {
         // given
-        let cancel_token = CancellationToken::new();
-        let mut workers = TaskManager::new(cancel_token.clone());
+        let (sender, receiver) = watch::channel(State::Started);
+        let mut workers = TaskManager::new(receiver.into());
         workers.spawn(move |token| async move {
-            token.cancelled().await;
+            token.cancelled().await.unwrap();
             Ok(10u8)
         });
 
         // when
-        cancel_token.cancel();
+        sender.send(State::Stopping).unwrap();
 
         // then
         let result = tokio::time::timeout(Duration::from_secs(2), workers.wait())
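The composite `CancellationToken` above fires for either of two reasons: a sibling task failed (the internal `tokio_util` token) or the whole service is stopping (the `StateWatcher`). A simplified sketch of combining the two sources with `tokio::select!`, using a plain `watch::Receiver<bool>` as a stand-in for the watcher:

```rust
use tokio::sync::watch;
use tokio_util::sync::CancellationToken;

// Resolves when either cancellation source fires.
async fn cancelled(
    token: CancellationToken,
    mut stopping: watch::Receiver<bool>,
) -> anyhow::Result<()> {
    tokio::select! {
        _ = token.cancelled() => Ok(()),
        result = async {
            while !*stopping.borrow() {
                stopping.changed().await?;
            }
            anyhow::Ok(())
        } => result,
    }
}
```

Note that the loop re-reads the channel on every pass; looping over a snapshot taken once before the loop would never observe the transition.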
diff --git a/crates/services/src/state.rs b/crates/services/src/state.rs
index 26559e2c19c..fb7505a196a 100644
--- a/crates/services/src/state.rs
+++ b/crates/services/src/state.rs
@@ -39,6 +39,11 @@ impl State {
     pub fn stopped(&self) -> bool {
         matches!(self, State::Stopped | State::StoppedWithError(_))
     }
+
+    /// is stopping
+    pub fn stopping(&self) -> bool {
+        self == &State::Stopping
+    }
 }
 
 /// The wrapper around the `watch::Receiver`. It repeats the `Receiver` functionality +
@@ -99,12 +104,21 @@ impl StateWatcher {
         loop {
             let state = self.borrow().clone();
             if !state.started() {
-                return Ok(state)
+                return Ok(state);
             }
 
             self.changed().await?;
         }
     }
+
+    /// Future that resolves once the state is `State::Stopping` or `State::Stopped`.
+    pub async fn wait_stopping_or_stopped(&mut self) -> anyhow::Result<()> {
+        loop {
+            let state = self.borrow().clone();
+            if state.stopped() || state.stopping() {
+                return Ok(());
+            }
+            self.changed().await?;
+        }
+    }
 }
 
 impl From<watch::Receiver<State>> for StateWatcher {
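(Two edits applied to the new helper above: the doc comment now mentions `Stopping` as well as `Stopped`, and the body re-reads the state on every loop iteration — the submitted `while` form captured `state` once before the loop, so after the first `changed()` wake-up it would keep testing a stale value and never return `Ok`.) A hypothetical consumer of the helper, assuming this PR's `fuel-core-services`:

```rust
use fuel_core_services::StateWatcher;

// Run work until the service is asked to stop; whichever future wins the
// select ends the task. `do_work` is a placeholder.
async fn run_until_stopped(mut watcher: StateWatcher) -> anyhow::Result<()> {
    tokio::select! {
        _ = do_work() => {}
        result = watcher.wait_stopping_or_stopped() => result?,
    }
    Ok(())
}

async fn do_work() { /* ... */ }
```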
diff --git a/tests/tests/contract.rs b/tests/tests/contract.rs
index df43d99d67c..64dadf73cd7 100644
--- a/tests/tests/contract.rs
+++ b/tests/tests/contract.rs
@@ -241,10 +241,10 @@ async fn can_get_message_proof() {
         .snapshot_reader
         .read::<ContractsRawCode>()
         .unwrap()
+        .into_iter()
         .next()
         .unwrap()
-        .unwrap()
-        .data[0]
+        .unwrap()[0]
         .value
         .clone();

diff --git a/tests/tests/messages.rs b/tests/tests/messages.rs
index 157b38b0945..63d8bbaa3dd 100644
--- a/tests/tests/messages.rs
+++ b/tests/tests/messages.rs
@@ -299,10 +299,10 @@ async fn can_get_message_proof() {
         .snapshot_reader
         .read::<Messages>()
         .unwrap()
+        .into_iter()
         .next()
         .unwrap()
-        .unwrap()
-        .data[0]
+        .unwrap()[0]
         .value
         .clone();