feat: Parallelize snapshot creation #1799

Merged: 31 commits, Apr 9, 2024
Changes from 24 commits
Commits (31)
ed3ab94
snapshot fragments for json work
segfault-magnet Mar 30, 2024
42e0763
parquet fragment support, cleanup pending
segfault-magnet Mar 30, 2024
58b04be
add tests for fragments
segfault-magnet Mar 30, 2024
f440bb9
comment out denies
segfault-magnet Mar 30, 2024
3fb1586
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Mar 31, 2024
4936ed9
snapshot generation uses concurrent workers
segfault-magnet Apr 1, 2024
3a0b898
task_manager used for import/export of snapshot
segfault-magnet Apr 1, 2024
316b1c9
cleanup imports
segfault-magnet Apr 1, 2024
35449e6
use rayon in genesis importer
segfault-magnet Apr 1, 2024
633dc22
move files around
segfault-magnet Apr 2, 2024
b0fdfdc
enable deny lints, fix errors in chain-config
segfault-magnet Apr 2, 2024
1f0b3c3
wip, investigating features
segfault-magnet Apr 2, 2024
1804a7d
feature gate imports, fix unused deps
segfault-magnet Apr 2, 2024
1209564
ci checks
segfault-magnet Apr 2, 2024
e01d204
remove unused result
segfault-magnet Apr 2, 2024
892849f
use rayon for exporter
segfault-magnet Apr 2, 2024
c8c28f7
remove uuid dep
segfault-magnet Apr 2, 2024
90785f4
dry up fragments tests
segfault-magnet Apr 2, 2024
58a4e25
deduplicate tests
segfault-magnet Apr 2, 2024
cb27956
inline path
segfault-magnet Apr 2, 2024
4171e87
dedupe writer tests
segfault-magnet Apr 2, 2024
9d09510
restructure into import/export format
segfault-magnet Apr 2, 2024
d84bd72
format and cargo sort
segfault-magnet Apr 2, 2024
ebe836b
update change log
segfault-magnet Apr 2, 2024
6060bad
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 2, 2024
47e5432
entries filter
segfault-magnet Apr 4, 2024
3c1ee01
optimize
segfault-magnet Apr 4, 2024
b9c3906
shorten bounds
segfault-magnet Apr 5, 2024
992bcc0
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 5, 2024
416da94
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 5, 2024
e073268
Merge branch 'master' into feature/parallel_snapshot_writing
segfault-magnet Apr 6, 2024
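
Several of the commits above ("use rayon in genesis importer", "use rayon for exporter") move CPU-bound work onto rayon's thread pool. The workspace already depends on `tokio-rayon` (visible as context in the root Cargo.toml diff below), which bridges that pool into async code. The following is a minimal illustration of the bridge only, not code from this PR; `encode_group` is an invented placeholder:

```rust
// Run CPU-heavy work (e.g. serializing a group of snapshot entries) on
// rayon's pool without blocking the tokio runtime. Illustrative only.
#[tokio::main]
async fn main() {
    let group: Vec<u64> = (0..1_000_000).collect();

    // tokio_rayon::spawn moves the closure onto rayon's global pool and
    // returns a future that resolves with the closure's result.
    let encoded = tokio_rayon::spawn(move || encode_group(&group)).await;

    println!("encoded {} bytes", encoded.len());
}

fn encode_group(group: &[u64]) -> Vec<u8> {
    group.iter().flat_map(|n| n.to_le_bytes()).collect()
}
```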
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@ Description of the upcoming release here.

### Added

- [#1799](https://github.com/FuelLabs/fuel-core/pull/1799): Snapshot creation is now concurrent.
- [#1786](https://github.com/FuelLabs/fuel-core/pull/1786): Regenesis now includes off-chain tables.
- [#1716](https://github.com/FuelLabs/fuel-core/pull/1716): Added support for WASM state transition along with upgradable execution that works with native (std) and WASM (non-std) executors. `fuel-core` now requires the `wasm32-unknown-unknown` target to build.
- [#1770](https://github.com/FuelLabs/fuel-core/pull/1770): Add the new L1 event type for forced transactions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -97,6 +97,7 @@ parking_lot = "0.12"
tokio = { version = "1.27", default-features = false }
tokio-rayon = "2.1.0"
tokio-stream = "0.1"
tokio-util = { version = "0.7", default-features = false }
tracing = "0.1"
thiserror = "1.0"
futures = "0.3"
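
The new `tokio-util` dependency is added with default features disabled. This diff doesn't show where it's used, but a typical role in a worker-based design like the task_manager mentioned in the commits is its `CancellationToken` for cooperative shutdown; a minimal sketch under that assumption (none of these names come from fuel-core):

```rust
// Sketch only: cooperative shutdown of a worker with tokio-util's
// CancellationToken. Assumes tokio with "rt", "macros" and "time" features.
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let token = CancellationToken::new();
    let child = token.child_token();

    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                // Exit promptly once the parent cancels.
                _ = child.cancelled() => break,
                _ = tokio::time::sleep(std::time::Duration::from_millis(10)) => {
                    // ... write the next snapshot fragment here ...
                }
            }
        }
    });

    token.cancel();        // request shutdown
    worker.await.unwrap(); // worker observes it and exits
}
```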
14 changes: 5 additions & 9 deletions bin/fuel-core/Cargo.toml
@@ -23,14 +23,11 @@ dirs = "4.0"
dotenvy = { version = "0.15", optional = true }
fuel-core = { workspace = true }
fuel-core-chain-config = { workspace = true }
fuel-core-storage = { workspace = true, optional = true }
fuel-core-types = { workspace = true }
hex = "0.4"
humantime = "2.1"
itertools = { workspace = true, optional = true }
pyroscope = "0.5"
pyroscope_pprofrs = "0.2"
serde = { workspace = true }
serde_json = { workspace = true }
tikv-jemallocator = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
@@ -44,9 +44,12 @@ url = { version = "2.2", optional = true }

[dev-dependencies]
fuel-core = { workspace = true, features = ["test-helpers"] }
fuel-core-storage = { workspace = true }
fuel-core-types = { workspace = true, features = ["test-helpers"] }
itertools = { workspace = true }
pretty_assertions = { workspace = true }
rand = { workspace = true }
serde = { workspace = true }
tempfile = { workspace = true }
test-case = { workspace = true }

@@ -56,11 +56,7 @@ env = ["dep:dotenvy"]
p2p = ["fuel-core/p2p", "const_format"]
relayer = ["fuel-core/relayer", "dep:url"]
parquet = ["fuel-core-chain-config/parquet", "fuel-core-types/serde"]
rocksdb = ["fuel-core/rocksdb", "dep:fuel-core-storage", "dep:itertools"]
rocksdb-production = [
"fuel-core/rocksdb-production",
"dep:fuel-core-storage",
"dep:itertools",
]
rocksdb = ["fuel-core/rocksdb"]
rocksdb-production = ["fuel-core/rocksdb-production"]
# features to enable in production, but increase build times
production = ["env", "relayer", "rocksdb-production", "p2p", "parquet"]
2 changes: 1 addition & 1 deletion bin/fuel-core/src/cli.rs
@@ -127,7 +127,7 @@ pub async fn run_cli() -> anyhow::Result<()> {
Ok(opt) => match opt.command {
Fuel::Run(command) => run::exec(command).await,
#[cfg(any(feature = "rocksdb", feature = "rocksdb-production"))]
Fuel::Snapshot(command) => snapshot::exec(command),
Fuel::Snapshot(command) => snapshot::exec(command).await,
Fuel::GenerateFeeContract(command) => fee_contract::exec(command).await,
},
Err(e) => {
224 changes: 62 additions & 162 deletions bin/fuel-core/src/cli/snapshot.rs
@@ -5,49 +5,18 @@ use clap::{
Subcommand,
};
use fuel_core::{
chain_config::ChainConfig,
combined_database::CombinedDatabase,
database::{
database_description::{
off_chain::OffChain,
on_chain::OnChain,
DatabaseDescription,
},
Database,
},
fuel_core_graphql_api::storage::transactions::{
OwnedTransactions,
TransactionStatuses,
},
types::fuel_types::ContractId,
};
use fuel_core_chain_config::{
AddTable,
SnapshotWriter,
StateConfigBuilder,
TableEntry,
MAX_GROUP_SIZE,
};
use fuel_core_storage::{
blueprint::BlueprintInspect,
iter::IterDirection,
structured_storage::TableWithBlueprint,
tables::{
Coins,
ContractsAssets,
ContractsLatestUtxo,
ContractsRawCode,
ContractsState,
Messages,
Transactions,
},
};
use itertools::Itertools;
use fuel_core_chain_config::ChainConfig;

use std::path::{
Path,
PathBuf,
};

use super::local_testnet_chain_config;

/// Print a snapshot of blockchain state to stdout.
#[derive(Debug, Clone, Parser)]
pub struct Command {
@@ -144,7 +113,13 @@ pub enum SubCommands {
}

#[cfg(any(feature = "rocksdb", feature = "rocksdb-production"))]
pub fn exec(command: Command) -> anyhow::Result<()> {
pub async fn exec(command: Command) -> anyhow::Result<()> {
use fuel_core::service::genesis::Exporter;
use fuel_core_chain_config::{
SnapshotWriter,
MAX_GROUP_SIZE,
};

let db = open_db(
&command.database_path,
Some(command.max_database_cache_size),
@@ -160,131 +135,38 @@ pub fn exec(command: Command) -> anyhow::Result<()> {
let encoding = encoding_command
.map(|f| f.encoding())
.unwrap_or_else(|| Encoding::Json);
full_snapshot(chain_config, &output_dir, encoding, db)

let group_size = encoding.group_size().unwrap_or(MAX_GROUP_SIZE);
let writer = move || match encoding {
Encoding::Json => Ok(SnapshotWriter::json(output_dir.clone())),
#[cfg(feature = "parquet")]
Encoding::Parquet { compression, .. } => {
SnapshotWriter::parquet(output_dir.clone(), compression.try_into()?)
}
};
Exporter::new(
db,
load_chain_config_or_use_testnet(chain_config.as_deref())?,
writer,
group_size,
)
.write_full_snapshot()
.await
}
SubCommands::Contract { contract_id } => {
contract_snapshot(db, contract_id, &output_dir)
let writer = move || Ok(SnapshotWriter::json(output_dir.clone()));
Exporter::new(db, local_testnet_chain_config(), writer, MAX_GROUP_SIZE)
.write_contract_snapshot(contract_id)
}
}
}
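
Note that `Exporter::new` takes `writer` as a closure that constructs a fresh `SnapshotWriter`, not a writer instance. A plausible reason (the `Exporter` internals are outside this diff) is that each concurrently exported table gets its own writer, so nothing has to be shared across tasks. A hypothetical sketch of that factory pattern, with invented names:

```rust
// `make_writer` and `export_table` are illustrative, not fuel-core APIs.
fn spawn_table_exports<W, F>(make_writer: F, tables: Vec<String>) -> anyhow::Result<()>
where
    F: Fn() -> anyhow::Result<W> + Clone + Send + 'static,
    W: Send + 'static,
{
    let handles: Vec<_> = tables
        .into_iter()
        .map(|table| {
            let make_writer = make_writer.clone();
            std::thread::spawn(move || -> anyhow::Result<()> {
                // Every task builds its own writer from the factory.
                let writer = make_writer()?;
                export_table(&table, writer)
            })
        })
        .collect();

    for handle in handles {
        handle.join().expect("export task panicked")?;
    }
    Ok(())
}

fn export_table<W>(_table: &str, _writer: W) -> anyhow::Result<()> {
    Ok(()) // placeholder for iterating the table and writing groups
}
```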

fn contract_snapshot(
db: CombinedDatabase,
contract_id: ContractId,
output_dir: &Path,
) -> Result<(), anyhow::Error> {
std::fs::create_dir_all(output_dir)?;

let code = db
.on_chain()
.entries::<ContractsRawCode>(Some(contract_id.as_ref()), IterDirection::Forward)
.next()
.ok_or_else(|| {
anyhow::anyhow!("contract code not found! id: {:?}", contract_id)
})??;

let utxo = db
.on_chain()
.entries::<ContractsLatestUtxo>(
Some(contract_id.as_ref()),
IterDirection::Forward,
)
.next()
.ok_or_else(|| {
anyhow::anyhow!("contract utxo not found! id: {:?}", contract_id)
})??;

let state = db
.on_chain()
.entries::<ContractsState>(Some(contract_id.as_ref()), IterDirection::Forward)
.try_collect()?;

let balance = db
.on_chain()
.entries::<ContractsAssets>(Some(contract_id.as_ref()), IterDirection::Forward)
.try_collect()?;

let block = db.on_chain().latest_block()?;
let mut writer = SnapshotWriter::json(output_dir);

writer.write(vec![code])?;
writer.write(vec![utxo])?;
writer.write(state)?;
writer.write(balance)?;
writer.write_block_data(*block.header().height(), block.header().da_height)?;
writer.write_chain_config(&crate::cli::local_testnet_chain_config())?;
writer.close()?;
Ok(())
}

fn full_snapshot(
prev_chain_config: Option<PathBuf>,
output_dir: &Path,
encoding: Encoding,
combined_db: CombinedDatabase,
) -> Result<(), anyhow::Error> {
std::fs::create_dir_all(output_dir)?;

let mut writer = match encoding {
Encoding::Json => SnapshotWriter::json(output_dir),
#[cfg(feature = "parquet")]
Encoding::Parquet { compression, .. } => {
SnapshotWriter::parquet(output_dir, compression.try_into()?)?
}
};

let prev_chain_config = load_chain_config(prev_chain_config)?;
writer.write_chain_config(&prev_chain_config)?;

fn write<T, DbDesc>(
db: &Database<DbDesc>,
group_size: usize,
writer: &mut SnapshotWriter,
) -> anyhow::Result<()>
where
T: TableWithBlueprint<Column = <DbDesc as DatabaseDescription>::Column>,
T::Blueprint: BlueprintInspect<T, Database<DbDesc>>,
TableEntry<T>: serde::Serialize,
StateConfigBuilder: AddTable<T>,
DbDesc: DatabaseDescription,
{
db.entries::<T>(None, IterDirection::Forward)
.chunks(group_size)
.into_iter()
.try_for_each(|chunk| writer.write(chunk.try_collect()?))
fn load_chain_config_or_use_testnet(path: Option<&Path>) -> anyhow::Result<ChainConfig> {
if let Some(path) = path {
ChainConfig::load(path)
} else {
Ok(local_testnet_chain_config())
}
let group_size = encoding.group_size().unwrap_or(MAX_GROUP_SIZE);

let db = combined_db.on_chain();
write::<Coins, OnChain>(db, group_size, &mut writer)?;
write::<Messages, OnChain>(db, group_size, &mut writer)?;
write::<ContractsRawCode, OnChain>(db, group_size, &mut writer)?;
write::<ContractsLatestUtxo, OnChain>(db, group_size, &mut writer)?;
write::<ContractsState, OnChain>(db, group_size, &mut writer)?;
write::<ContractsAssets, OnChain>(db, group_size, &mut writer)?;
write::<Transactions, OnChain>(db, group_size, &mut writer)?;

let db = combined_db.off_chain();
write::<TransactionStatuses, OffChain>(db, group_size, &mut writer)?;
write::<OwnedTransactions, OffChain>(db, group_size, &mut writer)?;

let block = combined_db.on_chain().latest_block()?;
writer.write_block_data(*block.header().height(), block.header().da_height)?;

writer.close()?;

Ok(())
}

fn load_chain_config(
chain_config: Option<PathBuf>,
) -> Result<ChainConfig, anyhow::Error> {
let chain_config = match chain_config {
Some(file) => ChainConfig::load(file)?,
None => crate::cli::local_testnet_chain_config(),
};

Ok(chain_config)
}

fn open_db(path: &Path, capacity: Option<usize>) -> anyhow::Result<CombinedDatabase> {
@@ -298,7 +180,11 @@ mod tests {

use std::iter::repeat_with;

use fuel_core::fuel_core_graphql_api::storage::transactions::OwnedTransactionIndexKey;
use fuel_core::fuel_core_graphql_api::storage::transactions::{
OwnedTransactionIndexKey,
OwnedTransactions,
TransactionStatuses,
};
use fuel_core_chain_config::{
AddTable,
AsTable,
@@ -318,6 +204,7 @@
ContractsState,
FuelBlocks,
Messages,
Transactions,
},
ContractsAssetKey,
ContractsStateKey,
@@ -350,6 +237,7 @@
services::txpool::TransactionStatus,
tai64::Tai64,
};
use itertools::Itertools;
use rand::{
rngs::StdRng,
seq::SliceRandom,
@@ -758,15 +646,21 @@
db.flush();

// when
exec(Command {
let fut = exec(Command {
database_path: db_path,
max_database_cache_size: DEFAULT_DATABASE_CACHE_SIZE,
output_dir: snapshot_dir.clone(),
subcommand: SubCommands::Everything {
chain_config: None,
encoding_command: Some(EncodingCommand::Encoding { encoding }),
},
})?;
});

// The test_case macro doesn't support async tests, so drive the future on a fresh runtime
tokio::runtime::Runtime::new()
.unwrap()
.block_on(fut)
.unwrap();

// then
let snapshot = SnapshotMetadata::read(&snapshot_dir)?;
@@ -805,7 +699,7 @@
db.flush();

// when
exec(Command {
let fut = exec(Command {
database_path: db_path,
output_dir: snapshot_dir.clone(),
max_database_cache_size: DEFAULT_DATABASE_CACHE_SIZE,
@@ -818,7 +712,12 @@
},
}),
},
})?;
});

tokio::runtime::Runtime::new()
.unwrap()
.block_on(fut)
.unwrap();

// then
let snapshot = SnapshotMetadata::read(&snapshot_dir)?;
@@ -830,8 +729,8 @@
Ok(())
}

#[test]
fn contract_snapshot_isolates_contract_correctly() -> anyhow::Result<()> {
#[tokio::test]
async fn contract_snapshot_isolates_contract_correctly() -> anyhow::Result<()> {
// given
let temp_dir = tempfile::tempdir()?;
let snapshot_dir = temp_dir.path().join("snapshot");
@@ -855,7 +754,8 @@
output_dir: snapshot_dir.clone(),
max_database_cache_size: DEFAULT_DATABASE_CACHE_SIZE,
subcommand: SubCommands::Contract { contract_id },
})?;
})
.await?;

// then
let metadata = SnapshotMetadata::read(&snapshot_dir)?;