diff --git a/CHANGELOG.md b/CHANGELOG.md index 553e0fafc4d..1fe1450a578 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ Description of the upcoming release here. ### Added +- [#1786](https://github.com/FuelLabs/fuel-core/pull/1786): Regenesis now includes off-chain tables. - [#1716](https://github.com/FuelLabs/fuel-core/pull/1716): Added support of WASM state transition along with upgradable execution that works with native(std) and WASM(non-std) executors. The `fuel-core` now requires a `wasm32-unknown-unknown` target to build. - [#1770](https://github.com/FuelLabs/fuel-core/pull/1770): Add the new L1 event type for forced transactions. - [#1767](https://github.com/FuelLabs/fuel-core/pull/1767): Added consensus parameters version and state transition version to the `ApplicationHeader` to describe what was used to produce this block. diff --git a/bin/fuel-core/Cargo.toml b/bin/fuel-core/Cargo.toml index 2318c8bde3a..bc8b178e6d4 100644 --- a/bin/fuel-core/Cargo.toml +++ b/bin/fuel-core/Cargo.toml @@ -43,6 +43,7 @@ tracing-subscriber = { workspace = true, features = [ url = { version = "2.2", optional = true } [dev-dependencies] +fuel-core = { workspace = true, features = ["test-helpers"] } fuel-core-types = { workspace = true, features = ["test-helpers"] } pretty_assertions = { workspace = true } rand = { workspace = true } diff --git a/bin/fuel-core/src/cli/snapshot.rs b/bin/fuel-core/src/cli/snapshot.rs index 3a4430a2884..d4d53facc48 100644 --- a/bin/fuel-core/src/cli/snapshot.rs +++ b/bin/fuel-core/src/cli/snapshot.rs @@ -9,11 +9,16 @@ use fuel_core::{ combined_database::CombinedDatabase, database::{ database_description::{ + off_chain::OffChain, on_chain::OnChain, DatabaseDescription, }, Database, }, + fuel_core_graphql_api::storage::transactions::{ + OwnedTransactions, + TransactionStatuses, + }, types::fuel_types::ContractId, }; use fuel_core_chain_config::{ @@ -34,6 +39,7 @@ use fuel_core_storage::{ ContractsRawCode, ContractsState, Messages, + Transactions, }, }; use itertools::Itertools; @@ -215,7 +221,7 @@ fn full_snapshot( prev_chain_config: Option, output_dir: &Path, encoding: Encoding, - db: CombinedDatabase, + combined_db: CombinedDatabase, ) -> Result<(), anyhow::Error> { std::fs::create_dir_all(output_dir)?; @@ -230,33 +236,39 @@ fn full_snapshot( let prev_chain_config = load_chain_config(prev_chain_config)?; writer.write_chain_config(&prev_chain_config)?; - fn write( - db: &CombinedDatabase, + fn write( + db: &Database, group_size: usize, writer: &mut SnapshotWriter, ) -> anyhow::Result<()> where - T: TableWithBlueprint::Column>, - T::Blueprint: BlueprintInspect, + T: TableWithBlueprint::Column>, + T::Blueprint: BlueprintInspect>, TableEntry: serde::Serialize, StateConfigBuilder: AddTable, + DbDesc: DatabaseDescription, { - db.on_chain() - .entries::(None, IterDirection::Forward) + db.entries::(None, IterDirection::Forward) .chunks(group_size) .into_iter() .try_for_each(|chunk| writer.write(chunk.try_collect()?)) } let group_size = encoding.group_size().unwrap_or(MAX_GROUP_SIZE); - write::(&db, group_size, &mut writer)?; - write::(&db, group_size, &mut writer)?; - write::(&db, group_size, &mut writer)?; - write::(&db, group_size, &mut writer)?; - write::(&db, group_size, &mut writer)?; - write::(&db, group_size, &mut writer)?; + let db = combined_db.on_chain(); + write::(db, group_size, &mut writer)?; + write::(db, group_size, &mut writer)?; + write::(db, group_size, &mut writer)?; + write::(db, group_size, &mut writer)?; + write::(db, group_size, &mut writer)?; + write::(db, group_size, &mut writer)?; + write::(db, group_size, &mut writer)?; - let block = db.on_chain().latest_block()?; + let db = combined_db.off_chain(); + write::(db, group_size, &mut writer)?; + write::(db, group_size, &mut writer)?; + + let block = combined_db.on_chain().latest_block()?; writer.write_block_data(*block.header().height(), block.header().da_height)?; writer.close()?; @@ -286,7 +298,7 @@ mod tests { use std::iter::repeat_with; - use fuel_core::database::Database; + use fuel_core::fuel_core_graphql_api::storage::transactions::OwnedTransactionIndexKey; use fuel_core_chain_config::{ AddTable, AsTable, @@ -307,10 +319,6 @@ mod tests { FuelBlocks, Messages, }, - transactional::{ - IntoTransaction, - StorageTransaction, - }, ContractsAssetKey, ContractsStateKey, StorageAsMut, @@ -332,9 +340,15 @@ mod tests { }, }, fuel_tx::{ + Receipt, + TransactionBuilder, TxPointer, + UniqueIdentifier, UtxoId, }, + fuel_types::ChainId, + services::txpool::TransactionStatus, + tai64::Tai64, }; use rand::{ rngs::StdRng, @@ -349,12 +363,20 @@ mod tests { use super::*; struct DbPopulator { - db: StorageTransaction, + db: CombinedDatabase, rng: StdRng, } #[derive(Debug, PartialEq)] - struct OnChainData { + struct SnapshotData { + common: CommonData, + transactions: Vec>, + transaction_statuses: Vec>, + owned_transactions: Vec>, + } + + #[derive(Debug, PartialEq)] + struct CommonData { coins: Vec>, messages: Vec>, contract_code: Vec>, @@ -364,8 +386,8 @@ mod tests { block: TableEntry, } - impl OnChainData { - fn sorted(mut self) -> OnChainData { + impl CommonData { + fn sorted(mut self) -> CommonData { self.coins.sort_by_key(|e| e.key); self.messages.sort_by_key(|e| e.key); self.contract_code.sort_by_key(|e| e.key); @@ -374,20 +396,30 @@ mod tests { self.contract_balance.sort_by_key(|e| e.key); self } + } + + impl SnapshotData { + fn sorted(mut self) -> SnapshotData { + self.common = self.common.sorted(); + self.transactions.sort_by_key(|e| e.key); + self.transaction_statuses.sort_by_key(|e| e.key); + self.owned_transactions.sort_by_key(|e| e.key.clone()); + self + } fn into_state_config(self) -> StateConfig { let mut builder = StateConfigBuilder::default(); - builder.add(self.coins); - builder.add(self.messages); - builder.add(self.contract_code); - builder.add(self.contract_utxo); - builder.add(self.contract_state); - builder.add(self.contract_balance); - - let height = self.block.value.header().height(); + builder.add(self.common.coins); + builder.add(self.common.messages); + builder.add(self.common.contract_code); + builder.add(self.common.contract_utxo); + builder.add(self.common.contract_state); + builder.add(self.common.contract_balance); + + let height = self.common.block.value.header().height(); builder.set_block_height(*height); - let da_height = self.block.value.header().application().da_height; + let da_height = self.common.block.value.header().application().da_height; builder.set_da_block_height(da_height); builder.build().unwrap() @@ -425,47 +457,40 @@ mod tests { let reader = &mut reader; Self { - coins: read(reader), - messages: read(reader), - contract_code: read(reader), - contract_utxo: read(reader), - contract_state: read(reader), - contract_balance: read(reader), - block, + common: CommonData { + coins: read(reader), + messages: read(reader), + contract_code: read(reader), + contract_utxo: read(reader), + contract_state: read(reader), + contract_balance: read(reader), + block, + }, + transactions: read(reader), + transaction_statuses: read(reader), + owned_transactions: read(reader), } } } impl DbPopulator { fn new(db: CombinedDatabase, rng: StdRng) -> Self { - Self { - db: (db.on_chain().clone()).into_transaction(), - rng, - } + Self { db, rng } } - fn commit(self) { - self.db.commit().expect("failed to commit transaction"); - } + // Db will flush data upon being dropped. Important to do before snapshotting + fn flush(self) {} - fn given_persisted_on_chain_data( - &mut self, - coins: usize, - messages: usize, - contracts: usize, - states_per_contract: usize, - balances_per_contract: usize, - ) -> OnChainData { - let coins = repeat_with(|| self.given_coin()).take(coins).collect(); - let messages = repeat_with(|| self.given_message()) - .take(messages) - .collect(); + fn given_persisted_data(&mut self) -> SnapshotData { + let amount = 10; + let coins = repeat_with(|| self.given_coin()).take(amount).collect(); + let messages = repeat_with(|| self.given_message()).take(amount).collect(); let contract_ids = repeat_with(|| { let contract_id: ContractId = self.rng.gen(); contract_id }) - .take(contracts) + .take(amount) .collect_vec(); let contract_code = contract_ids @@ -482,7 +507,7 @@ mod tests { .iter() .flat_map(|id| { repeat_with(|| self.given_contract_state(*id)) - .take(states_per_contract) + .take(amount) .collect_vec() }) .collect(); @@ -491,23 +516,93 @@ mod tests { .iter() .flat_map(|id| { repeat_with(|| self.given_contract_asset(*id)) - .take(balances_per_contract) + .take(amount) .collect_vec() }) .collect(); + let transactions = vec![self.given_transaction()]; + + let transaction_statuses = vec![self.given_transaction_status()]; + + let owned_transactions = vec![self.given_owned_transaction()]; + let block = self.given_block(); - OnChainData { - coins, - messages, - contract_code, - contract_utxo, - contract_state, - contract_balance, - block, + + SnapshotData { + common: CommonData { + coins, + messages, + contract_code, + contract_utxo, + contract_state, + contract_balance, + block, + }, + transactions, + transaction_statuses, + owned_transactions, } } + fn given_transaction(&mut self) -> TableEntry { + let tx = TransactionBuilder::script( + self.generate_data(1000), + self.generate_data(1000), + ) + .finalize_as_transaction(); + + let id = tx.id(&ChainId::new(self.rng.gen::())); + + self.db + .on_chain_mut() + .storage_as_mut::() + .insert(&id, &tx) + .unwrap(); + + TableEntry { key: id, value: tx } + } + + fn given_transaction_status(&mut self) -> TableEntry { + let key = self.rng.gen(); + let status = TransactionStatus::Success { + block_height: self.rng.gen(), + time: Tai64(self.rng.gen::().into()), + result: None, + receipts: vec![Receipt::Return { + id: self.rng.gen(), + val: self.rng.gen(), + pc: self.rng.gen(), + is: self.rng.gen(), + }], + }; + + self.db + .off_chain_mut() + .storage_as_mut::() + .insert(&key, &status) + .unwrap(); + + TableEntry { key, value: status } + } + + fn given_owned_transaction(&mut self) -> TableEntry { + let key = OwnedTransactionIndexKey { + owner: self.rng.gen(), + block_height: self.rng.gen(), + tx_idx: self.rng.gen(), + }; + let value = self.rng.gen(); + + self.db + .off_chain_mut() + .storage_as_mut::() + .insert(&key, &value) + .unwrap(); + + TableEntry { key, value } + } + fn given_block(&mut self) -> TableEntry { let mut block = CompressedBlock::default(); let height = self.rng.gen(); @@ -515,6 +610,7 @@ mod tests { block.header_mut().set_block_height(height); let _ = self .db + .on_chain_mut() .storage_as_mut::() .insert(&height, &block); @@ -535,6 +631,7 @@ mod tests { }); let key = UtxoId::new(tx_id, output_index); self.db + .on_chain_mut() .storage_as_mut::() .insert(&key, &coin) .unwrap(); @@ -554,6 +651,7 @@ mod tests { let key = *message.nonce(); self.db + .on_chain_mut() .storage_as_mut::() .insert(&key, &message) .unwrap(); @@ -572,6 +670,7 @@ mod tests { let code = self.generate_data(1000); self.db + .on_chain_mut() .storage_as_mut::() .insert(&key, code.as_ref()) .unwrap(); @@ -591,6 +690,7 @@ mod tests { let value = ContractUtxoInfo::V1((utxo_id, tx_pointer).into()); self.db + .on_chain_mut() .storage::() .insert(&contract_id, &value) .unwrap(); @@ -609,6 +709,7 @@ mod tests { let key = ContractsStateKey::new(&contract_id, &state_key); let state_value = self.generate_data(100); self.db + .on_chain_mut() .storage_as_mut::() .insert(&key, &state_value) .unwrap(); @@ -626,6 +727,7 @@ mod tests { let key = ContractsAssetKey::new(&contract_id, &asset_id); let amount = self.rng.gen(); self.db + .on_chain_mut() .storage_as_mut::() .insert(&key, &amount) .unwrap(); @@ -649,10 +751,11 @@ mod tests { let snapshot_dir = temp_dir.path().join("snapshot"); let db_path = temp_dir.path().join("db"); - let mut db = DbPopulator::new(open_db(&db_path, None)?, StdRng::seed_from_u64(2)); + std::fs::create_dir(&db_path)?; - let state = db.given_persisted_on_chain_data(10, 10, 10, 10, 10); - db.commit(); + let mut db = DbPopulator::new(open_db(&db_path, None)?, StdRng::seed_from_u64(2)); + let state = db.given_persisted_data(); + db.flush(); // when exec(Command { @@ -668,12 +771,19 @@ mod tests { // then let snapshot = SnapshotMetadata::read(&snapshot_dir)?; - let written_data = OnChainData::read_from_snapshot(snapshot); + let written_data = SnapshotData::read_from_snapshot(snapshot); - assert_eq!(written_data.block, state.block); + assert_eq!(written_data.common.block, state.common.block); - assert_ne!(written_data, state); - assert_eq!(written_data, state.sorted()); + // Needed because of the way the test case macro works + #[allow(irrefutable_let_patterns)] + if let Encoding::Json = encoding { + assert_ne!(written_data.common, state.common); + assert_eq!(written_data.common, state.common.sorted()); + } else { + assert_ne!(written_data, state); + assert_eq!(written_data, state.sorted()); + } Ok(()) } @@ -691,8 +801,8 @@ mod tests { let db_path = temp_dir.path().join("db"); let mut db = DbPopulator::new(open_db(&db_path, None)?, StdRng::seed_from_u64(2)); - let state = db.given_persisted_on_chain_data(10, 10, 10, 10, 10); - db.commit(); + let state = db.given_persisted_data(); + db.flush(); // when exec(Command { @@ -715,7 +825,7 @@ mod tests { let mut reader = SnapshotReader::open(snapshot)?; let expected_state = state.sorted(); - assert_groups_as_expected(group_size, expected_state.coins, &mut reader); + assert_groups_as_expected(group_size, expected_state.common.coins, &mut reader); Ok(()) } @@ -729,10 +839,7 @@ mod tests { let db_path = temp_dir.path().join("db"); let mut db = DbPopulator::new(open_db(&db_path, None)?, StdRng::seed_from_u64(2)); - let original_state = db - .given_persisted_on_chain_data(10, 10, 10, 10, 10) - .sorted() - .into_state_config(); + let original_state = db.given_persisted_data().sorted().into_state_config(); let randomly_chosen_contract = original_state .contracts @@ -740,7 +847,7 @@ mod tests { .unwrap() .clone(); let contract_id = randomly_chosen_contract.contract_id; - db.commit(); + db.flush(); // when exec(Command { diff --git a/crates/chain-config/src/config/state.rs b/crates/chain-config/src/config/state.rs index 88b6aff3e4b..8b02aa627ce 100644 --- a/crates/chain-config/src/config/state.rs +++ b/crates/chain-config/src/config/state.rs @@ -17,6 +17,7 @@ use fuel_core_storage::{ ContractsRawCode, ContractsState, Messages, + Transactions, }, ContractsAssetKey, ContractsStateKey, @@ -221,33 +222,13 @@ impl AddTable for StateConfigBuilder { } } -impl AddTable for StateConfigBuilder { - fn add(&mut self, entries: Vec>) { - self.messages.extend(entries); - } -} - -impl AddTable for StateConfigBuilder { - fn add(&mut self, entries: Vec>) { - self.contract_state.extend(entries); - } -} - -impl AddTable for StateConfigBuilder { - fn add(&mut self, entries: Vec>) { - self.contract_balance.extend(entries); - } -} - -impl AddTable for StateConfigBuilder { - fn add(&mut self, entries: Vec>) { - self.contract_code.extend(entries); - } -} - -impl AddTable for StateConfigBuilder { - fn add(&mut self, entries: Vec>) { - self.contract_utxo.extend(entries); +impl AsTable for StateConfig { + fn as_table(&self) -> Vec> { + self.coins + .clone() + .into_iter() + .map(|coin| coin.into()) + .collect() } } @@ -281,16 +262,6 @@ where fn as_table(&self) -> Vec>; } -impl AsTable for StateConfig { - fn as_table(&self) -> Vec> { - self.coins - .clone() - .into_iter() - .map(|coin| coin.into()) - .collect() - } -} - impl AsTable for StateConfig { fn as_table(&self) -> Vec> { self.messages @@ -301,6 +272,12 @@ impl AsTable for StateConfig { } } +impl AddTable for StateConfigBuilder { + fn add(&mut self, entries: Vec>) { + self.messages.extend(entries); + } +} + impl AsTable for StateConfig { fn as_table(&self) -> Vec> { self.contracts @@ -319,6 +296,13 @@ impl AsTable for StateConfig { .collect() } } + +impl AddTable for StateConfigBuilder { + fn add(&mut self, entries: Vec>) { + self.contract_state.extend(entries); + } +} + impl AsTable for StateConfig { fn as_table(&self) -> Vec> { self.contracts @@ -335,6 +319,12 @@ impl AsTable for StateConfig { } } +impl AddTable for StateConfigBuilder { + fn add(&mut self, entries: Vec>) { + self.contract_balance.extend(entries); + } +} + impl AsTable for StateConfig { fn as_table(&self) -> Vec> { self.contracts @@ -346,6 +336,13 @@ impl AsTable for StateConfig { .collect() } } + +impl AddTable for StateConfigBuilder { + fn add(&mut self, entries: Vec>) { + self.contract_code.extend(entries); + } +} + impl AsTable for StateConfig { fn as_table(&self) -> Vec> { self.contracts @@ -363,6 +360,24 @@ impl AsTable for StateConfig { } } +impl AddTable for StateConfigBuilder { + fn add(&mut self, entries: Vec>) { + self.contract_utxo.extend(entries); + } +} + +impl AsTable for StateConfig { + fn as_table(&self) -> Vec> { + Vec::new() // Do not include these for now + } +} + +impl AddTable for StateConfigBuilder { + fn add(&mut self, _entries: Vec>) { + // Do not include these for now + } +} + impl StateConfig { pub fn sorted(mut self) -> Self { self.coins = self diff --git a/crates/chain-config/src/config/state/reader.rs b/crates/chain-config/src/config/state/reader.rs index dc4bf6731dd..8c10e8f98c8 100644 --- a/crates/chain-config/src/config/state/reader.rs +++ b/crates/chain-config/src/config/state/reader.rs @@ -233,7 +233,7 @@ impl SnapshotReader { use fuel_core_storage::kv_store::StorageColumn; let name = T::column().name(); let path = tables.get(name).ok_or_else(|| { - anyhow::anyhow!("table '{name}' not found snapshot metadata.") + anyhow::anyhow!("table '{name}' not found in snapshot metadata.") })?; let file = std::fs::File::open(path).with_context(|| { format!("Could not open {path:?} in order to read table '{name}'") diff --git a/crates/fuel-core/src/database.rs b/crates/fuel-core/src/database.rs index f0d11a105af..9117d4f797e 100644 --- a/crates/fuel-core/src/database.rs +++ b/crates/fuel-core/src/database.rs @@ -102,14 +102,19 @@ impl Database { .map(|(_, block)| block) .ok_or_else(|| not_found!("FuelBlocks")) } +} +impl Database +where + DbDesc: DatabaseDescription, +{ pub fn entries<'a, T>( &'a self, prefix: Option<&[u8]>, direction: IterDirection, ) -> impl Iterator>> + 'a where - T: TableWithBlueprint::Column>, + T: TableWithBlueprint::Column>, T::OwnedValue: 'a, T::OwnedKey: 'a, T::Blueprint: BlueprintInspect, @@ -141,7 +146,7 @@ where #[cfg(feature = "rocksdb")] pub fn open_rocksdb(path: &Path, capacity: impl Into>) -> Result { use anyhow::Context; - let db = RocksDb::::default_open(path, capacity.into()).map_err(Into::::into).context("Failed to open rocksdb, you may need to wipe a pre-existing incompatible db `rm -rf ~/.fuel/db`")?; + let db = RocksDb::::default_open(path, capacity.into()).map_err(Into::::into).with_context(|| format!("Failed to open rocksdb, you may need to wipe a pre-existing incompatible db e.g. `rm -rf {path:?}`"))?; Ok(Database::new(Arc::new(db))) } diff --git a/crates/fuel-core/src/database/genesis_progress.rs b/crates/fuel-core/src/database/genesis_progress.rs index 3303e775b47..d7e5269368c 100644 --- a/crates/fuel-core/src/database/genesis_progress.rs +++ b/crates/fuel-core/src/database/genesis_progress.rs @@ -1,5 +1,8 @@ +use crate::graphql_api::storage::Column as OffChainColumn; + use super::{ database_description::{ + off_chain::OffChain, on_chain::OnChain, DatabaseDescription, }, @@ -45,6 +48,14 @@ impl TableWithBlueprint for GenesisMetadata { } } +impl TableWithBlueprint for GenesisMetadata { + type Blueprint = Plain; + type Column = ::Column; + fn column() -> Self::Column { + OffChainColumn::GenesisMetadata + } +} + pub trait GenesisProgressInspect { fn genesis_progress(&self, key: &str) -> Option; } @@ -57,32 +68,33 @@ pub trait GenesisProgressMutate { ) -> Result<()>; } -impl GenesisProgressInspect for S +impl GenesisProgressInspect for S where - S: StorageInspect, Error = StorageError>, + S: StorageInspect, Error = StorageError>, + DbDesc: DatabaseDescription, { fn genesis_progress( &self, - key: & as Mappable>::Key, + key: & as Mappable>::Key, ) -> Option { Some( - StorageInspect::>::get(self, key) + StorageInspect::>::get(self, key) .ok()?? .into_owned(), ) } } -impl GenesisProgressMutate for S +impl GenesisProgressMutate for S where - S: StorageMutate, Error = StorageError>, + S: StorageMutate, Error = StorageError>, { fn update_genesis_progress( &mut self, - key: & as Mappable>::Key, + key: & as Mappable>::Key, processed_group: usize, ) -> Result<()> { - self.storage_as_mut::>() + self.storage_as_mut::>() .insert(key, &processed_group)?; Ok(()) diff --git a/crates/fuel-core/src/database/transactions.rs b/crates/fuel-core/src/database/transactions.rs index 20caa5f28b0..0d03bea0a1c 100644 --- a/crates/fuel-core/src/database/transactions.rs +++ b/crates/fuel-core/src/database/transactions.rs @@ -10,6 +10,7 @@ use crate::{ TransactionStatuses, }, }; + use fuel_core_storage::{ iter::{ IterDirection, diff --git a/crates/fuel-core/src/graphql_api/storage.rs b/crates/fuel-core/src/graphql_api/storage.rs index 1dfbc631b9d..401fbfb4a72 100644 --- a/crates/fuel-core/src/graphql_api/storage.rs +++ b/crates/fuel-core/src/graphql_api/storage.rs @@ -58,20 +58,22 @@ pub mod transactions; pub enum Column { /// The column id of metadata about the blockchain Metadata = 0, + /// Metadata for genesis progress + GenesisMetadata = 1, /// The column of the table that stores `true` if `owner` owns `Coin` with `coin_id` - OwnedCoins = 1, + OwnedCoins = 2, /// Transaction id to current status - TransactionStatus = 2, + TransactionStatus = 3, /// The column of the table of all `owner`'s transactions - TransactionsByOwnerBlockIdx = 3, + TransactionsByOwnerBlockIdx = 4, /// The column of the table that stores `true` if `owner` owns `Message` with `message_id` - OwnedMessageIds = 4, + OwnedMessageIds = 5, /// The column of the table that stores statistic about the blockchain. - Statistic = 5, + Statistic = 6, /// See [`blocks::FuelBlockIdsToHeights`] - FuelBlockIdsToHeights = 6, + FuelBlockIdsToHeights = 7, /// See [`ContractsInfo`](contracts::ContractsInfo) - ContractsInfo = 7, + ContractsInfo = 8, } impl Column { diff --git a/crates/fuel-core/src/graphql_api/storage/transactions.rs b/crates/fuel-core/src/graphql_api/storage/transactions.rs index 757f73bd940..ad3582b3ef9 100644 --- a/crates/fuel-core/src/graphql_api/storage/transactions.rs +++ b/crates/fuel-core/src/graphql_api/storage/transactions.rs @@ -1,3 +1,10 @@ +use fuel_core_chain_config::{ + AddTable, + AsTable, + StateConfig, + StateConfigBuilder, + TableEntry, +}; use fuel_core_storage::{ blueprint::plain::Plain, codec::{ @@ -42,6 +49,18 @@ impl TableWithBlueprint for OwnedTransactions { } } +impl AsTable for StateConfig { + fn as_table(&self) -> Vec> { + Vec::new() // Do not include these for now + } +} + +impl AddTable for StateConfigBuilder { + fn add(&mut self, _entries: Vec>) { + // Do not include these for now + } +} + /// The table stores the status of each transaction. pub struct TransactionStatuses; @@ -61,6 +80,18 @@ impl TableWithBlueprint for TransactionStatuses { } } +impl AsTable for StateConfig { + fn as_table(&self) -> Vec> { + Vec::new() // Do not include these for now + } +} + +impl AddTable for StateConfigBuilder { + fn add(&mut self, _entries: Vec>) { + // Do not include these for now + } +} + const TX_INDEX_SIZE: usize = size_of::(); const BLOCK_HEIGHT: usize = size_of::(); const INDEX_SIZE: usize = Address::LEN + BLOCK_HEIGHT + TX_INDEX_SIZE; @@ -84,7 +115,9 @@ fn owned_tx_index_key( pub type TransactionIndex = u16; -#[derive(Clone)] +#[derive( + Debug, Clone, PartialEq, Eq, PartialOrd, Ord, serde::Serialize, serde::Deserialize, +)] pub struct OwnedTransactionIndexKey { pub owner: Address, pub block_height: BlockHeight, diff --git a/crates/fuel-core/src/p2p_test_helpers.rs b/crates/fuel-core/src/p2p_test_helpers.rs index 27d9245e932..237229f4189 100644 --- a/crates/fuel-core/src/p2p_test_helpers.rs +++ b/crates/fuel-core/src/p2p_test_helpers.rs @@ -5,6 +5,7 @@ use crate::{ CoinConfig, CoinConfigGenerator, }, + combined_database::CombinedDatabase, database::Database, p2p::Multiaddr, service::{ @@ -398,13 +399,13 @@ pub async fn make_node(node_config: Config, test_txs: Vec) -> Node async fn extract_p2p_config(node_config: &Config) -> fuel_core_p2p::config::Config { let bootstrap_config = node_config.p2p.clone(); - let db = Database::in_memory(); + let db = CombinedDatabase::in_memory(); crate::service::genesis::execute_and_commit_genesis_block(node_config, &db) .await .unwrap(); bootstrap_config .unwrap() - .init(db.get_genesis().unwrap()) + .init(db.on_chain().get_genesis().unwrap()) .unwrap() } diff --git a/crates/fuel-core/src/service.rs b/crates/fuel-core/src/service.rs index 3b1ecd9a7cc..581c448d21c 100644 --- a/crates/fuel-core/src/service.rs +++ b/crates/fuel-core/src/service.rs @@ -2,12 +2,9 @@ use self::adapters::BlockImporterAdapter; use crate::{ combined_database::CombinedDatabase, database::Database, - service::{ - adapters::{ - P2PAdapter, - PoAAdapter, - }, - genesis::execute_genesis_block, + service::adapters::{ + P2PAdapter, + PoAAdapter, }, }; use fuel_core_poa::ports::BlockImporter; @@ -18,10 +15,7 @@ use fuel_core_services::{ State, StateWatcher, }; -use fuel_core_storage::{ - transactional::AtomicView, - IsNotFound, -}; +use fuel_core_storage::IsNotFound; use std::net::SocketAddr; use crate::service::adapters::StaticGasPrice; @@ -231,16 +225,16 @@ impl RunnableService for Task { _: &StateWatcher, _: Self::TaskParams, ) -> anyhow::Result { - let on_view = self.shared.database.on_chain().latest_view(); - let mut off_view = self.shared.database.off_chain().latest_view(); // check if chain is initialized - if let Err(err) = on_view.get_genesis() { + if let Err(err) = self.shared.database.on_chain().get_genesis() { if err.is_not_found() { - let result = execute_genesis_block(&self.shared.config, &on_view).await?; + let result = genesis::execute_genesis_block( + &self.shared.config, + &self.shared.database, + ) + .await?; self.shared.block_importer.commit_result(result).await?; - - genesis::off_chain::execute_genesis_block(&on_view, &mut off_view)?; } } diff --git a/crates/fuel-core/src/service/genesis.rs b/crates/fuel-core/src/service/genesis.rs index 207a3cefac0..5c1a9c2068c 100644 --- a/crates/fuel-core/src/service/genesis.rs +++ b/crates/fuel-core/src/service/genesis.rs @@ -1,35 +1,30 @@ -use self::workers::GenesisWorkers; use crate::{ + combined_database::CombinedDatabase, database::{ - database_description::on_chain::OnChain, + database_description::{ + off_chain::OffChain, + on_chain::OnChain, + }, genesis_progress::GenesisMetadata, - Database, }, service::config::Config, }; -use anyhow::anyhow; -use fuel_core_chain_config::{ - GenesisCommitment, - TableEntry, -}; +use fuel_core_chain_config::GenesisCommitment; use fuel_core_storage::{ iter::IteratorOverTable, tables::{ - Coins, ConsensusParametersVersions, - ContractsLatestUtxo, - ContractsRawCode, - Messages, StateTransitionBytecodeVersions, }, transactional::{ Changes, + IntoTransaction, ReadTransaction, - StorageTransaction, }, StorageAsMut, }; use fuel_core_types::{ + self, blockchain::{ block::Block, consensus::{ @@ -43,20 +38,10 @@ use fuel_core_types::{ PartialBlockHeader, StateTransitionBytecodeVersion, }, - primitives::{ - DaBlockHeight, - Empty, - }, + primitives::Empty, SealedBlock, }, - entities::{ - coins::coin::Coin, - Message, - }, - fuel_types::{ - BlockHeight, - Bytes32, - }, + fuel_types::Bytes32, services::block_importer::{ ImportResult, UncommittedResult as UncommittedImportResult, @@ -65,33 +50,39 @@ use fuel_core_types::{ use itertools::Itertools; pub mod off_chain; +pub mod on_chain; mod runner; mod workers; -pub use runner::{ - GenesisRunner, - TransactionOpener, -}; +pub use runner::GenesisRunner; /// Performs the importing of the genesis block from the snapshot. pub async fn execute_genesis_block( config: &Config, - original_database: &Database, + db: &CombinedDatabase, ) -> anyhow::Result> { - let workers = - GenesisWorkers::new(original_database.clone(), config.snapshot_reader.clone()); + on_chain::import_state(db.clone(), config.snapshot_reader.clone()).await?; + off_chain::import_state(db.clone(), config.snapshot_reader.clone()).await?; - import_chain_state(workers).await?; - let genesis_progress = fetch_genesis_progress(original_database)?; + let genesis_progress_on_chain: Vec = db + .on_chain() + .iter_all::>(None) + .map_ok(|(k, _)| k) + .try_collect()?; + let genesis_progress_off_chain: Vec = db + .off_chain() + .iter_all::>(None) + .map_ok(|(k, _)| k) + .try_collect()?; let chain_config = config.snapshot_reader.chain_config(); let genesis = Genesis { // TODO: We can get the serialized consensus parameters from the database. // https://github.com/FuelLabs/fuel-core/issues/1570 chain_config_hash: chain_config.root()?.into(), - coins_root: original_database.genesis_coins_root()?.into(), - messages_root: original_database.genesis_messages_root()?.into(), - contracts_root: original_database.genesis_contracts_root()?.into(), + coins_root: db.on_chain().genesis_coins_root()?.into(), + messages_root: db.on_chain().genesis_messages_root()?.into(), + contracts_root: db.on_chain().genesis_contracts_root()?.into(), }; let block = create_genesis_block(config); @@ -101,10 +92,18 @@ pub async fn execute_genesis_block( consensus, }; - let mut database_transaction = original_database.read_transaction(); + let mut database_transaction_off_chain = db.off_chain().clone().into_transaction(); + for key in genesis_progress_off_chain { + database_transaction_off_chain + .storage_as_mut::>() + .remove(&key)?; + } + database_transaction_off_chain.commit()?; + + let mut database_transaction_on_chain = db.on_chain().read_transaction(); // TODO: The chain config should be part of the snapshot state. // https://github.com/FuelLabs/fuel-core/issues/1570 - database_transaction + database_transaction_on_chain .storage_as_mut::() .insert( &ConsensusParametersVersion::MIN, @@ -112,55 +111,38 @@ pub async fn execute_genesis_block( )?; // TODO: The bytecode of the state transition function should be part of the snapshot state. // https://github.com/FuelLabs/fuel-core/issues/1570 - database_transaction + database_transaction_on_chain .storage_as_mut::() .insert(&ConsensusParametersVersion::MIN, &[])?; // Needs to be given the progress because `iter_all` is not implemented on db transactions. - cleanup_genesis_progress(&mut database_transaction, genesis_progress)?; + for key in genesis_progress_on_chain { + database_transaction_on_chain + .storage_as_mut::>() + .remove(&key)?; + } let result = UncommittedImportResult::new( ImportResult::new_from_local(block, vec![], vec![]), - database_transaction.into_changes(), + database_transaction_on_chain.into_changes(), ); Ok(result) } -fn fetch_genesis_progress( - original_database: &Database, -) -> Result, anyhow::Error> { - Ok(original_database - .iter_all::>(None) - .map_ok(|(k, _)| k) - .try_collect()?) -} - -async fn import_chain_state(mut workers: GenesisWorkers) -> anyhow::Result<()> { - if let Err(e) = workers.run_imports().await { - workers.shutdown(); - tokio::select! { - _ = workers.finished() => {} - _ = tokio::time::sleep(tokio::time::Duration::from_secs(10)) => { - return Err(anyhow!("Timeout while importing genesis state")); - } - }; - - return Err(e); - } - - Ok(()) -} - -fn cleanup_genesis_progress( - tx: &mut StorageTransaction<&Database>, - genesis_progress: Vec, +#[cfg(feature = "test-helpers")] +pub async fn execute_and_commit_genesis_block( + config: &Config, + db: &CombinedDatabase, ) -> anyhow::Result<()> { - for key in genesis_progress { - tx.storage_as_mut::>() - .remove(&key)?; - } - + let result = execute_genesis_block(config, db).await?; + let importer = fuel_core_importer::Importer::new( + config.block_importer.clone(), + db.on_chain().clone(), + (), + (), + ); + importer.commit_result(result).await?; Ok(()) } @@ -195,130 +177,13 @@ pub fn create_genesis_block(config: &Config) -> Block { ) } -#[cfg(feature = "test-helpers")] -pub async fn execute_and_commit_genesis_block( - config: &Config, - original_database: &Database, -) -> anyhow::Result<()> { - let result = execute_genesis_block(config, original_database).await?; - let importer = fuel_core_importer::Importer::new( - config.block_importer.clone(), - original_database.clone(), - (), - (), - ); - importer.commit_result(result).await?; - Ok(()) -} - -fn init_coin( - transaction: &mut StorageTransaction<&mut Database>, - coin: &TableEntry, - height: BlockHeight, -) -> anyhow::Result<()> { - let utxo_id = coin.key; - - let compressed_coin = Coin { - utxo_id, - owner: *coin.value.owner(), - amount: *coin.value.amount(), - asset_id: *coin.value.asset_id(), - tx_pointer: *coin.value.tx_pointer(), - } - .compress(); - - // ensure coin can't point to blocks in the future - let coin_height = coin.value.tx_pointer().block_height(); - if coin_height > height { - return Err(anyhow!( - "coin tx_pointer height ({coin_height}) cannot be greater than genesis block ({height})" - )); - } - - if transaction - .storage::() - .insert(&utxo_id, &compressed_coin)? - .is_some() - { - return Err(anyhow!("Coin should not exist")); - } - - Ok(()) -} - -fn init_contract_latest_utxo( - transaction: &mut StorageTransaction<&mut Database>, - entry: &TableEntry, - height: BlockHeight, -) -> anyhow::Result<()> { - let contract_id = entry.key; - - if entry.value.tx_pointer().block_height() > height { - return Err(anyhow!( - "contract tx_pointer cannot be greater than genesis block" - )); - } - - if transaction - .storage::() - .insert(&contract_id, &entry.value)? - .is_some() - { - return Err(anyhow!("Contract utxo should not exist")); - } - - Ok(()) -} - -fn init_contract_raw_code( - transaction: &mut StorageTransaction<&mut Database>, - entry: &TableEntry, -) -> anyhow::Result<()> { - let contract = entry.value.as_ref(); - let contract_id = entry.key; - - // insert contract code - if transaction - .storage::() - .insert(&contract_id, contract)? - .is_some() - { - return Err(anyhow!("Contract code should not exist")); - } - - Ok(()) -} - -fn init_da_message( - transaction: &mut StorageTransaction<&mut Database>, - msg: TableEntry, - da_height: DaBlockHeight, -) -> anyhow::Result<()> { - let message: Message = msg.value; - - if message.da_height() > da_height { - return Err(anyhow!( - "message da_height cannot be greater than genesis da block height" - )); - } - - if transaction - .storage::() - .insert(message.id(), &message)? - .is_some() - { - return Err(anyhow!("Message should not exist")); - } - - Ok(()) -} - #[cfg(test)] mod tests { use super::*; use crate::{ combined_database::CombinedDatabase, + database::Database, service::{ config::Config, FuelService, @@ -561,6 +426,8 @@ mod tests { #[cfg(feature = "test-helpers")] #[tokio::test] async fn tests_init_da_msgs() { + use fuel_core_storage::tables::Messages; + let mut rng = StdRng::seed_from_u64(32492); let msg = MessageConfig { @@ -583,15 +450,16 @@ mod tests { ..Config::local_node() }; - let db = Database::default(); + let db = CombinedDatabase::default(); - execute_and_commit_genesis_block(&config, &db) + super::execute_and_commit_genesis_block(&config, &db) .await .unwrap(); - let expected_msg: Message = msg.into(); + let expected_msg: fuel_core_types::entities::Message = msg.into(); let ret_msg = db + .on_chain() .storage::() .get(expected_msg.id()) .unwrap() diff --git a/crates/fuel-core/src/service/genesis/off_chain.rs b/crates/fuel-core/src/service/genesis/off_chain.rs index aa9689a9608..55afbc54ac8 100644 --- a/crates/fuel-core/src/service/genesis/off_chain.rs +++ b/crates/fuel-core/src/service/genesis/off_chain.rs @@ -1,118 +1,164 @@ +use std::borrow::Cow; + use crate::{ + combined_database::CombinedDatabase, database::{ - database_description::{ - off_chain::OffChain, - on_chain::OnChain, - }, + database_description::off_chain::OffChain, Database, }, - graphql_api::worker_service, + graphql_api::{ + storage::{ + blocks::FuelBlockIdsToHeights, + coins::OwnedCoins, + contracts::ContractsInfo, + messages::OwnedMessageIds, + transactions::{ + OwnedTransactions, + TransactionStatuses, + }, + }, + worker_service, + }, +}; +use fuel_core_chain_config::{ + SnapshotReader, + TableEntry, }; use fuel_core_storage::{ - iter::IteratorOverTable, tables::{ Coins, Messages, Transactions, }, - transactional::WriteTransaction, + transactional::StorageTransaction, + StorageAsMut, }; -use fuel_core_txpool::types::TxId; -use fuel_core_types::{ - entities::{ - coins::coin::CompressedCoin, - Message, - }, - fuel_tx::{ - Transaction, - UtxoId, +use fuel_core_types::services::executor::Event; + +use super::{ + runner::ProcessState, + workers::{ + GenesisWorkers, + Handler, }, - fuel_types::Nonce, - services::executor::Event, }; -use itertools::Itertools; -use std::borrow::Cow; -fn process_messages( - original_database: &mut Database, - messages: Vec<(Nonce, Message)>, +pub async fn import_state( + db: CombinedDatabase, + snapshot_reader: SnapshotReader, ) -> anyhow::Result<()> { - let mut database_transaction = original_database.write_transaction(); - - let message_events = messages - .into_iter() - .map(|(_, message)| Cow::Owned(Event::MessageImported(message))); + let mut workers = GenesisWorkers::new(db, snapshot_reader); + if let Err(e) = workers.run_off_chain_imports().await { + workers.shutdown(); + workers.finished().await; - worker_service::process_executor_events(message_events, &mut database_transaction)?; - - database_transaction.commit()?; + return Err(e); + } Ok(()) } -fn process_coins( - original_database: &mut Database, - coins: Vec<(UtxoId, CompressedCoin)>, -) -> anyhow::Result<()> { - let mut database_transaction = original_database.write_transaction(); - - let coin_events = coins.into_iter().map(|(utxo_id, coin)| { - let coin = coin.uncompress(utxo_id); - Cow::Owned(Event::CoinCreated(coin)) - }); - - worker_service::process_executor_events(coin_events, &mut database_transaction)?; - - database_transaction.commit()?; - Ok(()) +impl ProcessState for Handler { + type TableInSnapshot = TransactionStatuses; + type TableBeingWritten = TransactionStatuses; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + for tx_status in group { + tx.storage::() + .insert(&tx_status.key, &tx_status.value)?; + } + Ok(()) + } } -fn process_transactions( - original_database: &mut Database, - transactions: Vec<(TxId, Transaction)>, -) -> anyhow::Result<()> { - let mut database_transaction = original_database.write_transaction(); - - let transactions = transactions.iter().map(|(_, tx)| tx); - - worker_service::process_transactions(transactions, &mut database_transaction)?; - - database_transaction.commit()?; - Ok(()) +impl ProcessState for Handler { + type TableInSnapshot = FuelBlockIdsToHeights; + type TableBeingWritten = FuelBlockIdsToHeights; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + for entry in group { + tx.storage::() + .insert(&entry.key, &entry.value)?; + } + Ok(()) + } } -/// Performs the importing of the genesis block from the snapshot. -// TODO: The regenesis of the off-chain database should go in the same way as the on-chain database. -// https://github.com/FuelLabs/fuel-core/issues/1619 -pub fn execute_genesis_block( - on_chain_database: &Database, - off_chain_database: &mut Database, -) -> anyhow::Result<()> { - for chunk in on_chain_database - .iter_all::(None) - .chunks(1000) - .into_iter() - { - let chunk: Vec<_> = chunk.try_collect()?; - process_messages(off_chain_database, chunk)?; +impl ProcessState for Handler { + type TableInSnapshot = OwnedTransactions; + type TableBeingWritten = OwnedTransactions; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + for entry in group { + tx.storage::() + .insert(&entry.key, &entry.value)?; + } + Ok(()) } +} - for chunk in on_chain_database - .iter_all::(None) - .chunks(1000) - .into_iter() - { - let chunk: Vec<_> = chunk.try_collect()?; - process_coins(off_chain_database, chunk)?; +impl ProcessState for Handler { + type TableInSnapshot = Messages; + type TableBeingWritten = OwnedMessageIds; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + let events = group + .into_iter() + .map(|TableEntry { value, .. }| Cow::Owned(Event::MessageImported(value))); + worker_service::process_executor_events(events, tx)?; + Ok(()) } +} - for chunk in on_chain_database - .iter_all::(None) - .chunks(1000) - .into_iter() - { - let chunk: Vec<_> = chunk.try_collect()?; - process_transactions(off_chain_database, chunk)?; +impl ProcessState for Handler { + type TableInSnapshot = Coins; + type TableBeingWritten = OwnedCoins; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + let events = group.into_iter().map(|TableEntry { value, key }| { + Cow::Owned(Event::CoinCreated(value.uncompress(key))) + }); + worker_service::process_executor_events(events, tx)?; + Ok(()) } +} - Ok(()) +impl ProcessState for Handler { + type TableInSnapshot = Transactions; + type TableBeingWritten = ContractsInfo; + type DbDesc = OffChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + let transactions = group.iter().map(|TableEntry { value, .. }| value); + worker_service::process_transactions(transactions, tx)?; + Ok(()) + } } diff --git a/crates/fuel-core/src/service/genesis/on_chain.rs b/crates/fuel-core/src/service/genesis/on_chain.rs new file mode 100644 index 00000000000..a5ccf047deb --- /dev/null +++ b/crates/fuel-core/src/service/genesis/on_chain.rs @@ -0,0 +1,275 @@ +use super::{ + runner::ProcessState, + workers::{ + GenesisWorkers, + Handler, + }, +}; +use crate::{ + combined_database::CombinedDatabase, + database::{ + balances::BalancesInitializer, + database_description::on_chain::OnChain, + state::StateInitializer, + Database, + }, +}; +use anyhow::anyhow; +use fuel_core_chain_config::{ + SnapshotReader, + TableEntry, +}; +use fuel_core_storage::{ + tables::{ + Coins, + ContractsAssets, + ContractsLatestUtxo, + ContractsRawCode, + ContractsState, + Messages, + Transactions, + }, + transactional::StorageTransaction, + StorageAsMut, +}; +use fuel_core_types::{ + self, + blockchain::primitives::DaBlockHeight, + entities::{ + coins::coin::Coin, + Message, + }, + fuel_types::BlockHeight, +}; + +pub(crate) async fn import_state( + db: CombinedDatabase, + snapshot_reader: SnapshotReader, +) -> anyhow::Result<()> { + let mut workers = GenesisWorkers::new(db, snapshot_reader); + if let Err(e) = workers.run_on_chain_imports().await { + workers.shutdown(); + workers.finished().await; + + return Err(e); + } + + Ok(()) +} + +impl ProcessState for Handler { + type TableInSnapshot = Coins; + type TableBeingWritten = Coins; + type DbDesc = OnChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + group.into_iter().try_for_each(|coin| { + init_coin(tx, &coin, self.block_height)?; + Ok(()) + }) + } +} + +impl ProcessState for Handler { + type TableInSnapshot = Messages; + type TableBeingWritten = Messages; + type DbDesc = OnChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + group + .into_iter() + .try_for_each(|message| init_da_message(tx, message, self.da_block_height)) + } +} + +impl ProcessState for Handler { + type TableInSnapshot = ContractsRawCode; + type TableBeingWritten = ContractsRawCode; + type DbDesc = OnChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + group.into_iter().try_for_each(|contract| { + init_contract_raw_code(tx, &contract)?; + Ok::<(), anyhow::Error>(()) + }) + } +} + +impl ProcessState for Handler { + type TableInSnapshot = ContractsLatestUtxo; + type TableBeingWritten = ContractsLatestUtxo; + type DbDesc = OnChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + group.into_iter().try_for_each(|contract| { + init_contract_latest_utxo(tx, &contract, self.block_height)?; + Ok::<(), anyhow::Error>(()) + }) + } +} + +impl ProcessState for Handler { + type TableInSnapshot = ContractsState; + type TableBeingWritten = ContractsState; + type DbDesc = OnChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + tx.update_contract_states(group)?; + Ok(()) + } +} + +impl ProcessState for Handler { + type TableInSnapshot = ContractsAssets; + type TableBeingWritten = ContractsAssets; + type DbDesc = OnChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + tx.update_contract_balances(group)?; + Ok(()) + } +} + +impl ProcessState for Handler { + type TableInSnapshot = Transactions; + type TableBeingWritten = Transactions; + type DbDesc = OnChain; + + fn process( + &mut self, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, + ) -> anyhow::Result<()> { + for transaction in &group { + tx.storage::() + .insert(&transaction.key, &transaction.value)?; + } + Ok(()) + } +} + +fn init_coin( + transaction: &mut StorageTransaction<&mut Database>, + coin: &TableEntry, + height: BlockHeight, +) -> anyhow::Result<()> { + let utxo_id = coin.key; + + let compressed_coin = Coin { + utxo_id, + owner: *coin.value.owner(), + amount: *coin.value.amount(), + asset_id: *coin.value.asset_id(), + tx_pointer: *coin.value.tx_pointer(), + } + .compress(); + + // ensure coin can't point to blocks in the future + let coin_height = coin.value.tx_pointer().block_height(); + if coin_height > height { + return Err(anyhow!( + "coin tx_pointer height ({coin_height}) cannot be greater than genesis block ({height})" + )); + } + + if transaction + .storage::() + .insert(&utxo_id, &compressed_coin)? + .is_some() + { + return Err(anyhow!("Coin should not exist")); + } + + Ok(()) +} + +fn init_contract_latest_utxo( + transaction: &mut StorageTransaction<&mut Database>, + entry: &TableEntry, + height: BlockHeight, +) -> anyhow::Result<()> { + let contract_id = entry.key; + + if entry.value.tx_pointer().block_height() > height { + return Err(anyhow!( + "contract tx_pointer cannot be greater than genesis block" + )); + } + + if transaction + .storage::() + .insert(&contract_id, &entry.value)? + .is_some() + { + return Err(anyhow!("Contract utxo should not exist")); + } + + Ok(()) +} + +fn init_contract_raw_code( + transaction: &mut StorageTransaction<&mut Database>, + entry: &TableEntry, +) -> anyhow::Result<()> { + let contract = entry.value.as_ref(); + let contract_id = entry.key; + + // insert contract code + if transaction + .storage::() + .insert(&contract_id, contract)? + .is_some() + { + return Err(anyhow!("Contract code should not exist")); + } + + Ok(()) +} + +fn init_da_message( + transaction: &mut StorageTransaction<&mut Database>, + msg: TableEntry, + da_height: DaBlockHeight, +) -> anyhow::Result<()> { + let message: Message = msg.value; + + if message.da_height() > da_height { + return Err(anyhow!( + "message da_height cannot be greater than genesis da block height" + )); + } + + if transaction + .storage::() + .insert(message.id(), &message)? + .is_some() + { + return Err(anyhow!("Message should not exist")); + } + + Ok(()) +} diff --git a/crates/fuel-core/src/service/genesis/runner.rs b/crates/fuel-core/src/service/genesis/runner.rs index c9a5e9ce07b..5a5744861b2 100644 --- a/crates/fuel-core/src/service/genesis/runner.rs +++ b/crates/fuel-core/src/service/genesis/runner.rs @@ -6,98 +6,126 @@ use fuel_core_storage::{ kv_store::StorageColumn, structured_storage::TableWithBlueprint, transactional::{ + Modifiable, StorageTransaction, WriteTransaction, }, + StorageAsRef, + StorageInspect, + StorageMutate, }; use std::sync::Arc; use tokio::sync::Notify; use tokio_util::sync::CancellationToken; use crate::database::{ + database_description::DatabaseDescription, genesis_progress::{ - GenesisProgressInspect, + GenesisMetadata, GenesisProgressMutate, }, Database, }; -pub trait TransactionOpener { - fn transaction(&mut self) -> StorageTransaction<&mut Database>; - - fn view_only(&self) -> &Database; -} - -impl TransactionOpener for Database { - fn transaction(&mut self) -> StorageTransaction<&mut Database> { - self.write_transaction() - } - - fn view_only(&self) -> &Database { - self - } -} - -pub struct GenesisRunner { +pub struct GenesisRunner +where + DbDesc: DatabaseDescription, +{ handler: Handler, - tx_opener: TxOpener, skip: usize, groups: Groups, finished_signal: Option>, cancel_token: CancellationToken, + db: Database, } pub trait ProcessState { - type Table: TableWithBlueprint; + type TableInSnapshot: TableWithBlueprint; + type TableBeingWritten: TableWithBlueprint; + type DbDesc: DatabaseDescription; fn process( &mut self, - group: Vec>, - tx: &mut StorageTransaction<&mut Database>, + group: Vec>, + tx: &mut StorageTransaction<&mut Database>, ) -> anyhow::Result<()>; } -impl GenesisRunner +impl GenesisRunner where - Logic: ProcessState, - GroupGenerator: IntoIterator>>>, - TxOpener: TransactionOpener, + DbDesc: DatabaseDescription, + Logic: ProcessState, + Database: StorageInspect>, { pub fn new( finished_signal: Option>, cancel_token: CancellationToken, handler: Logic, groups: GroupGenerator, - tx_opener: TxOpener, + db: Database, ) -> Self { - let skip = tx_opener - .view_only() - .genesis_progress(Logic::Table::column().name()) - // The `idx_last_handled` is zero based, so we need to add 1 to skip the already handled groups. - .map(|idx_last_handled| idx_last_handled.saturating_add(1)) - .unwrap_or_default(); + let skip = match db + .storage::>() + .get(Logic::TableBeingWritten::column().name()) + { + Ok(Some(idx_last_handled)) => { + usize::saturating_add(idx_last_handled.into_owned(), 1) + } + _ => 0, + }; + Self { handler, skip, groups, - tx_opener, finished_signal, cancel_token, + db, } } +} +impl GenesisRunner +where + DbDesc: DatabaseDescription, + Logic: ProcessState, + GroupGenerator: + IntoIterator>>>, + GenesisMetadata: TableWithBlueprint< + Column = DbDesc::Column, + Key = str, + Value = usize, + OwnedValue = usize, + >, + Database: + StorageInspect> + WriteTransaction + Modifiable, + for<'a> StorageTransaction<&'a mut Database>: + StorageMutate, Error = fuel_core_storage::Error>, +{ pub fn run(mut self) -> anyhow::Result<()> { + tracing::info!( + "Starting genesis runner. Reading: {} writing into {}", + Logic::TableInSnapshot::column().name(), + Logic::TableBeingWritten::column().name() + ); + let mut db = self.db; let result = self .groups .into_iter() .skip(self.skip) .take_while(|_| !self.cancel_token.is_cancelled()) - .try_for_each(|group| { - let mut tx = self.tx_opener.transaction(); + .try_for_each(move |group| { let group = group?; let group_num = group.index; + + let mut tx = db.write_transaction(); self.handler.process(group.data, &mut tx)?; - tx.update_genesis_progress(Logic::Table::column().name(), group_num)?; + + GenesisProgressMutate::::update_genesis_progress( + &mut tx, + Logic::TableBeingWritten::column().name(), + group_num, + )?; tx.commit()?; Ok(()) }); @@ -106,12 +134,19 @@ where finished_signal.notify_one(); } + tracing::info!( + "Finishing genesis runner. Read: {} wrote into {}", + Logic::TableInSnapshot::column().name(), + Logic::TableBeingWritten::column().name() + ); + result } } #[cfg(test)] mod tests { + use crate::database::genesis_progress::GenesisProgressInspect; use std::{ sync::{ Arc, @@ -147,7 +182,6 @@ mod tests { transactional::{ Changes, StorageTransaction, - WriteTransaction, }, Result as StorageResult, StorageAsMut, @@ -170,17 +204,13 @@ mod tests { use tokio_util::sync::CancellationToken; use crate::{ + combined_database::CombinedDatabase, database::{ - genesis_progress::{ - GenesisProgressInspect, - GenesisProgressMutate, - }, + database_description::on_chain::OnChain, + genesis_progress::GenesisProgressMutate, Database, }, - service::genesis::runner::{ - GenesisRunner, - TransactionOpener, - }, + service::genesis::runner::GenesisRunner, state::{ in_memory::memory_store::MemoryStore, TransactableStorage, @@ -209,10 +239,12 @@ mod tests { &mut StorageTransaction<&mut Database>, ) -> anyhow::Result<()>, { - type Table = Coins; + type TableInSnapshot = Coins; + type TableBeingWritten = Coins; + type DbDesc = OnChain; fn process( &mut self, - group: Vec>, + group: Vec>, tx: &mut StorageTransaction<&mut Database>, ) -> anyhow::Result<()> { group @@ -289,9 +321,13 @@ mod tests { let data = TestData::new(2); let mut called_with = vec![]; - let mut db = Database::default(); - db.update_genesis_progress(Coins::column().name(), 0) - .unwrap(); + let mut db = CombinedDatabase::default(); + GenesisProgressMutate::::update_genesis_progress( + db.on_chain_mut(), + Coins::column().name(), + 0, + ) + .unwrap(); let runner = GenesisRunner::new( Some(Arc::new(Notify::new())), @@ -301,7 +337,7 @@ mod tests { Ok(()) }), data.as_ok_groups(), - db, + db.on_chain().clone(), ); // when @@ -439,56 +475,13 @@ mod tests { runner.run().unwrap(); // then - assert_eq!(db.genesis_progress(Coins::column().name()), Some(1)); - } - - #[test] - fn genesis_progress_is_increased_in_same_transaction_as_batch_work() { - struct OnlyOneTransactionAllowed { - db: Database, - counter: usize, - } - impl TransactionOpener for OnlyOneTransactionAllowed { - fn transaction(&mut self) -> StorageTransaction<&mut Database> { - if self.counter == 0 { - self.counter += 1; - self.db.write_transaction() - } else { - panic!("Only one transaction should be opened") - } - } - - fn view_only(&self) -> &Database { - &self.db - } - } - - // given - let data = TestData::new(1); - let db = Database::default(); - let tx_opener = OnlyOneTransactionAllowed { - db: db.clone(), - counter: 0, - }; - let utxo_id = UtxoId::new(Default::default(), 0); - - let runner = GenesisRunner::new( - Some(Arc::new(Notify::new())), - CancellationToken::new(), - TestHandler::new(|_, tx| { - insert_a_coin(tx, &utxo_id); - Ok(()) - }), - data.as_ok_groups(), - tx_opener, + assert_eq!( + GenesisProgressInspect::::genesis_progress( + &db, + Coins::column().name(), + ), + Some(1) ); - - // when - runner.run().unwrap(); - - // then - assert_eq!(db.genesis_progress(Coins::column().name()), Some(0)); - assert!(db.storage_as_ref::().contains_key(&utxo_id).unwrap()); } #[tokio::test] @@ -517,11 +510,12 @@ mod tests { let runner_handle = std::thread::spawn(move || runner.run()); let data = TestData::new(4); - for group in data.as_ok_groups() { + let take = 3; + for group in data.as_ok_groups().into_iter().take(take) { tx.send(group).unwrap(); } - while read_groups.lock().unwrap().len() < 3 { + while read_groups.lock().unwrap().len() < take { std::thread::sleep(std::time::Duration::from_millis(1)); } @@ -541,7 +535,11 @@ mod tests { // group after signal is not read let read_entries = read_groups.lock().unwrap().clone(); - let inserted_groups = data.as_entries(0); + let inserted_groups = data + .as_entries(0) + .into_iter() + .take(take) + .collect::>(); assert_eq!(read_entries, inserted_groups); // finished signal is emitted diff --git a/crates/fuel-core/src/service/genesis/workers.rs b/crates/fuel-core/src/service/genesis/workers.rs index c4a13840730..c110ad6a859 100644 --- a/crates/fuel-core/src/service/genesis/workers.rs +++ b/crates/fuel-core/src/service/genesis/workers.rs @@ -1,8 +1,4 @@ use super::{ - init_coin, - init_contract_latest_utxo, - init_contract_raw_code, - init_da_message, runner::ProcessState, GenesisRunner, }; @@ -12,16 +8,26 @@ use std::{ sync::Arc, }; -use crate::database::{ - balances::BalancesInitializer, - state::StateInitializer, - Database, +use crate::{ + combined_database::CombinedDatabase, + database::database_description::{ + off_chain::OffChain, + on_chain::OnChain, + }, + graphql_api::storage::{ + coins::OwnedCoins, + contracts::ContractsInfo, + messages::OwnedMessageIds, + transactions::{ + OwnedTransactions, + TransactionStatuses, + }, + }, }; use fuel_core_chain_config::{ AsTable, SnapshotReader, StateConfig, - TableEntry, }; use fuel_core_storage::{ kv_store::StorageColumn, @@ -33,8 +39,8 @@ use fuel_core_storage::{ ContractsRawCode, ContractsState, Messages, + Transactions, }, - transactional::StorageTransaction, }; use fuel_core_types::{ blockchain::primitives::DaBlockHeight, @@ -45,7 +51,7 @@ use tokio_rayon::AsyncRayonHandle; use tokio_util::sync::CancellationToken; pub struct GenesisWorkers { - db: Database, + db: CombinedDatabase, cancel_token: CancellationToken, block_height: BlockHeight, da_block_height: DaBlockHeight, @@ -54,7 +60,7 @@ pub struct GenesisWorkers { } impl GenesisWorkers { - pub fn new(db: Database, snapshot_reader: SnapshotReader) -> Self { + pub fn new(db: CombinedDatabase, snapshot_reader: SnapshotReader) -> Self { let block_height = snapshot_reader.block_height(); let da_block_height = snapshot_reader.da_block_height(); Self { @@ -67,14 +73,29 @@ impl GenesisWorkers { } } - pub async fn run_imports(&mut self) -> anyhow::Result<()> { + pub async fn run_on_chain_imports(&mut self) -> anyhow::Result<()> { + tracing::info!("Running on-chain imports"); + tokio::try_join!( + self.spawn_worker_on_chain::()?, + self.spawn_worker_on_chain::()?, + self.spawn_worker_on_chain::()?, + self.spawn_worker_on_chain::()?, + self.spawn_worker_on_chain::()?, + self.spawn_worker_on_chain::()?, + self.spawn_worker_on_chain::()?, + ) + .map(|_| ()) + } + + pub async fn run_off_chain_imports(&mut self) -> anyhow::Result<()> { + tracing::info!("Running off-chain imports"); + // TODO: Should we insert a FuelBlockIdsToHeights entry for the genesis block? tokio::try_join!( - self.spawn_worker::()?, - self.spawn_worker::()?, - self.spawn_worker::()?, - self.spawn_worker::()?, - self.spawn_worker::()?, - self.spawn_worker::()?, + self.spawn_worker_off_chain::()?, + self.spawn_worker_off_chain::()?, + self.spawn_worker_off_chain::()?, + self.spawn_worker_off_chain::()?, + self.spawn_worker_off_chain::()? ) .map(|_| ()) } @@ -89,7 +110,7 @@ impl GenesisWorkers { self.cancel_token.cancel() } - pub fn spawn_worker( + pub fn spawn_worker_on_chain( &mut self, ) -> anyhow::Result>> where @@ -97,19 +118,42 @@ impl GenesisWorkers { T::OwnedKey: serde::de::DeserializeOwned + Send, T::OwnedValue: serde::de::DeserializeOwned + Send, StateConfig: AsTable, - Handler: ProcessState, + Handler: ProcessState, { let groups = self.snapshot_reader.read::()?; let finished_signal = self.get_signal(T::column().name()); - let handler = Handler::new(self.block_height, self.da_block_height); - let database = self.db.clone(); let runner = GenesisRunner::new( Some(finished_signal), self.cancel_token.clone(), - handler, + Handler::new(self.block_height, self.da_block_height), + groups, + self.db.on_chain().clone(), + ); + Ok(tokio_rayon::spawn(move || runner.run())) + } + + // TODO: serde bounds can be written shorter + pub fn spawn_worker_off_chain( + &mut self, + ) -> anyhow::Result>> + where + TableInSnapshot: TableWithBlueprint + Send + 'static, + TableInSnapshot::OwnedKey: serde::de::DeserializeOwned + Send, + TableInSnapshot::OwnedValue: serde::de::DeserializeOwned + Send, + StateConfig: AsTable, + Handler: + ProcessState, + TableBeingWritten: Send + 'static, + { + let groups = self.snapshot_reader.read::()?; + let finished_signal = self.get_signal(TableInSnapshot::column().name()); + let runner = GenesisRunner::new( + Some(finished_signal), + self.cancel_token.clone(), + Handler::::new(self.block_height, self.da_block_height), groups, - database, + self.db.off_chain().clone(), ); Ok(tokio_rayon::spawn(move || runner.run())) } @@ -124,9 +168,9 @@ impl GenesisWorkers { #[derive(Debug, Clone, Copy)] pub struct Handler { - block_height: BlockHeight, - da_block_height: DaBlockHeight, - phaton_data: PhantomData, + pub block_height: BlockHeight, + pub da_block_height: DaBlockHeight, + pub phaton_data: PhantomData, } impl Handler { @@ -138,88 +182,3 @@ impl Handler { } } } - -impl ProcessState for Handler { - type Table = Coins; - - fn process( - &mut self, - group: Vec>, - tx: &mut StorageTransaction<&mut Database>, - ) -> anyhow::Result<()> { - group.into_iter().try_for_each(|coin| { - init_coin(tx, &coin, self.block_height)?; - Ok(()) - }) - } -} - -impl ProcessState for Handler { - type Table = Messages; - - fn process( - &mut self, - group: Vec>, - tx: &mut StorageTransaction<&mut Database>, - ) -> anyhow::Result<()> { - group - .into_iter() - .try_for_each(|message| init_da_message(tx, message, self.da_block_height)) - } -} - -impl ProcessState for Handler { - type Table = ContractsRawCode; - - fn process( - &mut self, - group: Vec>, - tx: &mut StorageTransaction<&mut Database>, - ) -> anyhow::Result<()> { - group.into_iter().try_for_each(|contract| { - init_contract_raw_code(tx, &contract)?; - Ok::<(), anyhow::Error>(()) - }) - } -} - -impl ProcessState for Handler { - type Table = ContractsLatestUtxo; - - fn process( - &mut self, - group: Vec>, - tx: &mut StorageTransaction<&mut Database>, - ) -> anyhow::Result<()> { - group.into_iter().try_for_each(|contract| { - init_contract_latest_utxo(tx, &contract, self.block_height)?; - Ok::<(), anyhow::Error>(()) - }) - } -} - -impl ProcessState for Handler { - type Table = ContractsState; - - fn process( - &mut self, - group: Vec>, - tx: &mut StorageTransaction<&mut Database>, - ) -> anyhow::Result<()> { - tx.update_contract_states(group)?; - Ok(()) - } -} - -impl ProcessState for Handler { - type Table = ContractsAssets; - - fn process( - &mut self, - group: Vec>, - tx: &mut StorageTransaction<&mut Database>, - ) -> anyhow::Result<()> { - tx.update_contract_balances(group)?; - Ok(()) - } -} diff --git a/crates/fuel-core/src/service/sub_services.rs b/crates/fuel-core/src/service/sub_services.rs index 255c7be1a75..32531c5d18d 100644 --- a/crates/fuel-core/src/service/sub_services.rs +++ b/crates/fuel-core/src/service/sub_services.rs @@ -1,5 +1,8 @@ #![allow(clippy::let_unit_value)] -use super::adapters::P2PAdapter; +use super::{ + adapters::P2PAdapter, + genesis::create_genesis_block, +}; use crate::{ combined_database::CombinedDatabase, database::Database, @@ -16,7 +19,6 @@ use crate::{ TxPoolAdapter, VerifierAdapter, }, - genesis::create_genesis_block, Config, SharedState, SubServices,