diff --git a/CHANGELOG.md b/CHANGELOG.md
index 86583018f833..3fdf819e0968 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -38,6 +38,8 @@ ### Fixed

+- [#4959](https://github.com/ChainSafe/forest/pull/4959) Re-enable garbage
+  collection after implementing persistent storage for manifests.
 - [#4988](https://github.com/ChainSafe/forest/pull/4988) Fix the `logs` member
   in `EthTxReceipt` that was initialized with a default value.
diff --git a/Cargo.lock b/Cargo.lock
index 07bf68bd6ebc..84da01f56d91 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3103,7 +3103,7 @@ checksum = "f81ec6369c545a7d40e4589b5597581fa1c441fe1cce96dd1de43159910a36a2"

 [[package]]
 name = "forest-filecoin"
-version = "0.22.0"
+version = "0.22.1"
 dependencies = [
  "ahash",
  "anes 0.2.0",
diff --git a/Cargo.toml b/Cargo.toml
index c894bf1f08d2..3b88b3847a93 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "forest-filecoin"
-version = "0.22.0"
+version = "0.22.1"
 authors = ["ChainSafe Systems <info@chainsafe.io>"]
 repository = "https://github.com/ChainSafe/forest"
 edition = "2021"
diff --git a/src/daemon/bundle.rs b/src/daemon/bundle.rs
index f04ff3a799ce..58dc9f0fad9b 100644
--- a/src/daemon/bundle.rs
+++ b/src/daemon/bundle.rs
@@ -1,13 +1,11 @@
 // Copyright 2019-2024 ChainSafe Systems
 // SPDX-License-Identifier: Apache-2.0, MIT

+use crate::db::PersistentStore;
 use crate::{
     networks::{ActorBundleInfo, NetworkChain, ACTOR_BUNDLES},
     utils::{
-        db::{
-            car_stream::{CarBlock, CarStream},
-            car_util::load_car,
-        },
+        db::car_stream::{CarBlock, CarStream},
         net::http_get,
     },
 };
@@ -15,15 +13,15 @@ use ahash::HashSet;
 use anyhow::ensure;
 use cid::Cid;
 use futures::{stream::FuturesUnordered, TryStreamExt};
-use fvm_ipld_blockstore::Blockstore;
 use std::mem::discriminant;
 use std::{io::Cursor, path::Path};
+use tokio::io::BufReader;
 use tracing::{info, warn};

 /// Tries to load the missing actor bundles to the blockstore. If the bundle is
 /// not present, it will be downloaded.
 pub async fn load_actor_bundles(
-    db: &impl Blockstore,
+    db: &impl PersistentStore,
     network: &NetworkChain,
 ) -> anyhow::Result<()> {
     if let Some(bundle_path) = match std::env::var("FOREST_ACTOR_BUNDLE_PATH") {
@@ -40,7 +38,7 @@ pub async fn load_actor_bundles(
 }

 pub async fn load_actor_bundles_from_path(
-    db: &impl Blockstore,
+    db: &impl PersistentStore,
     network: &NetworkChain,
     bundle_path: impl AsRef<Path>,
 ) -> anyhow::Result<()> {
@@ -70,7 +68,7 @@ pub async fn load_actor_bundles_from_path(

     // Load into DB
     while let Some(CarBlock { cid, data }) = car_stream.try_next().await? {
-        db.put_keyed(&cid, &data)?;
+        db.put_keyed_persistent(&cid, &data)?;
     }

     Ok(())
@@ -78,7 +76,7 @@ pub async fn load_actor_bundles_from_path(

 /// Loads the missing actor bundles, returns the CIDs of the loaded bundles.
 pub async fn load_actor_bundles_from_server(
-    db: &impl Blockstore,
+    db: &impl PersistentStore,
     network: &NetworkChain,
     bundles: &[ActorBundleInfo],
 ) -> anyhow::Result<Vec<Cid>> {
@@ -106,7 +104,12 @@ pub async fn load_actor_bundles_from_server(
             http_get(alt_url).await?
         };
         let bytes = response.bytes().await?;
-        let header = load_car(db, Cursor::new(bytes)).await?;
+
+        let mut stream = CarStream::new(BufReader::new(Cursor::new(bytes))).await?;
+        while let Some(block) = stream.try_next().await? {
+            db.put_keyed_persistent(&block.cid, &block.data)?;
+        }
+        let header = stream.header;
         ensure!(header.roots.len() == 1);
         ensure!(header.roots.first() == root);
         Ok(*header.roots.first())
diff --git a/src/daemon/mod.rs b/src/daemon/mod.rs
index e752b1c47d08..f0e6e31f77e5 100644
--- a/src/daemon/mod.rs
+++ b/src/daemon/mod.rs
@@ -246,11 +246,7 @@ pub(super) async fn start(
         genesis_header.clone(),
     )?);

-    // Network Upgrade manifests are stored in the blockstore but may not be
-    // garbage collected. Until this is fixed, the GC has to be disabled.
-    // Tracking issue: https://github.com/ChainSafe/forest/issues/4926
-    // if !opts.no_gc {
-    if false {
+    if !opts.no_gc {
         let mut db_garbage_collector = {
             let chain_store = chain_store.clone();
             let depth = cmp::max(
diff --git a/src/db/car/any.rs b/src/db/car/any.rs
index d4a9eb422991..75a20d3aacd6 100644
--- a/src/db/car/any.rs
+++ b/src/db/car/any.rs
@@ -10,6 +10,7 @@ use super::{CacheKey, RandomAccessFileReader, ZstdFrameCache};
 use crate::blocks::Tipset;
+use crate::db::PersistentStore;
 use crate::utils::io::EitherMmapOrRandomAccessFile;
 use cid::Cid;
 use fvm_ipld_blockstore::Blockstore;
@@ -124,6 +125,19 @@ where
     }
 }

+impl<ReaderT> PersistentStore for AnyCar<ReaderT>
+where
+    ReaderT: ReadAt,
+{
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        match self {
+            AnyCar::Forest(forest) => forest.put_keyed_persistent(k, block),
+            AnyCar::Plain(plain) => plain.put_keyed_persistent(k, block),
+            AnyCar::Memory(mem) => mem.put_keyed_persistent(k, block),
+        }
+    }
+}
+
 impl<ReaderT> From<super::ForestCar<ReaderT>> for AnyCar<ReaderT> {
     fn from(car: super::ForestCar<ReaderT>) -> Self {
         Self::Forest(car)
diff --git a/src/db/car/forest.rs b/src/db/car/forest.rs
index bb5f2829f287..30468b98a2c1 100644
--- a/src/db/car/forest.rs
+++ b/src/db/car/forest.rs
@@ -50,6 +50,7 @@ use super::{CacheKey, ZstdFrameCache};
 use crate::blocks::{Tipset, TipsetKey};
 use crate::db::car::plain::write_skip_frame_header_async;
 use crate::db::car::RandomAccessFileReader;
+use crate::db::PersistentStore;
 use crate::utils::db::car_stream::{CarBlock, CarHeader};
 use crate::utils::encoding::from_slice_with_fallback;
 use crate::utils::io::EitherMmapOrRandomAccessFile;
@@ -73,6 +74,7 @@ use std::{
 use tokio::io::{AsyncWrite, AsyncWriteExt};
 use tokio_util::codec::{Decoder, Encoder as _};
 use unsigned_varint::codec::UviBytes;
+
 #[cfg(feature = "benchmark-private")]
 pub mod index;
 #[cfg(not(feature = "benchmark-private"))]
@@ -237,6 +239,15 @@ where
     }
 }

+impl<ReaderT> PersistentStore for ForestCar<ReaderT>
+where
+    ReaderT: ReadAt,
+{
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        self.put_keyed(k, block)
+    }
+}
+
 fn decode_zstd_single_frame<ReaderT: Read>(reader: ReaderT) -> io::Result<Vec<u8>> {
     let mut zstd_frame = vec![];
     zstd::Decoder::new(reader)?
diff --git a/src/db/car/many.rs b/src/db/car/many.rs
index 4f279440564e..9820fed4c580 100644
--- a/src/db/car/many.rs
+++ b/src/db/car/many.rs
@@ -9,7 +9,7 @@
 //! A single z-frame cache is shared between all read-only stores.
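Note: the `PersistentStore` impls in the surrounding hunks all reduce to one of two shapes: CAR-backed stores are never garbage collected, so `put_keyed_persistent` simply delegates to `put_keyed`, while wrapper stores forward the call to their inner writer. A standalone sketch of that delegation pattern (the `PersistentPut`, `Store`, and `Wrapper` names are illustrative stand-ins, not types from this diff):

```rust
use std::collections::HashMap;
use std::sync::Mutex;

trait PersistentPut {
    fn put_persistent(&self, key: Vec<u8>, value: Vec<u8>);
}

// Leaf store: owns the persistent map directly (compare `MemoryDB` below,
// which writes to its dedicated `blockchain_persistent_db` map).
struct Store {
    persistent: Mutex<HashMap<Vec<u8>, Vec<u8>>>,
}

impl PersistentPut for Store {
    fn put_persistent(&self, key: Vec<u8>, value: Vec<u8>) {
        self.persistent.lock().unwrap().insert(key, value);
    }
}

// Wrapper store: forwards to its writer (compare `ManyCar`, which forwards
// `put_keyed_persistent` to `self.writer`).
struct Wrapper<W> {
    writer: W,
}

impl<W: PersistentPut> PersistentPut for Wrapper<W> {
    fn put_persistent(&self, key: Vec<u8>, value: Vec<u8>) {
        self.writer.put_persistent(key, value)
    }
}

fn main() {
    let db = Wrapper {
        writer: Store {
            persistent: Mutex::new(HashMap::new()),
        },
    };
    db.put_persistent(b"cid".to_vec(), b"block".to_vec());
}
```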

 use super::{AnyCar, ZstdFrameCache};
-use crate::db::{EthMappingsStore, MemoryDB, SettingsStore};
+use crate::db::{EthMappingsStore, MemoryDB, PersistentStore, SettingsStore};
 use crate::libp2p_bitswap::BitswapStoreReadWrite;
 use crate::rpc::eth::types::EthHash;
 use crate::shim::clock::ChainEpoch;
@@ -169,6 +169,12 @@ impl<WriterT: Blockstore> Blockstore for ManyCar<WriterT> {
     }
 }

+impl<WriterT: PersistentStore> PersistentStore for ManyCar<WriterT> {
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        self.writer.put_keyed_persistent(k, block)
+    }
+}
+
 impl<WriterT: Blockstore> BitswapStoreRead for ManyCar<WriterT> {
     fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
         Blockstore::has(self, cid)
diff --git a/src/db/car/plain.rs b/src/db/car/plain.rs
index 8c7b6c54e11a..df8a162281dc 100644
--- a/src/db/car/plain.rs
+++ b/src/db/car/plain.rs
@@ -71,6 +71,7 @@ use cid::Cid;
 use fvm_ipld_blockstore::Blockstore;
 use integer_encoding::VarIntReader;

+use crate::db::PersistentStore;
 use nunny::Vec as NonEmpty;
 use parking_lot::RwLock;
 use positioned_io::ReadAt;
@@ -233,6 +234,15 @@ where
     }
 }

+impl<ReaderT> PersistentStore for PlainCar<ReaderT>
+where
+    ReaderT: ReadAt,
+{
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        self.put_keyed(k, block)
+    }
+}
+
 pub async fn write_skip_frame_header_async(
     mut writer: impl AsyncWrite + Unpin,
     data_len: u32,
diff --git a/src/db/gc/mod.rs b/src/db/gc/mod.rs
index 150e08cfd73a..9baabe03407d 100644
--- a/src/db/gc/mod.rs
+++ b/src/db/gc/mod.rs
@@ -238,7 +238,7 @@ mod test {
     use crate::blocks::{CachingBlockHeader, Tipset};
     use crate::chain::{ChainEpochDelta, ChainStore};
-    use crate::db::{GarbageCollectable, MarkAndSweep, MemoryDB};
+    use crate::db::{GarbageCollectable, MarkAndSweep, MemoryDB, PersistentStore};
     use crate::message_pool::test_provider::{mock_block, mock_block_with_parents};
     use crate::networks::ChainConfig;
@@ -247,7 +247,11 @@ mod test {
     use core::time::Duration;

     use crate::shim::clock::ChainEpoch;
+    use cid::multihash::Code::Identity;
+    use cid::multihash::MultihashDigest;
+    use cid::Cid;
     use fvm_ipld_blockstore::Blockstore;
+    use fvm_ipld_encoding::DAG_CBOR;
     use std::sync::Arc;

     const ZERO_DURATION: Duration = Duration::from_secs(0);
@@ -473,4 +477,43 @@ mod test {
             current_epoch + 1 + depth * 2
         );
     }
+
+    #[tokio::test]
+    async fn persistent_data_resilient_to_gc() {
+        let depth = 5 as ChainEpochDelta;
+        let current_epoch = 0 as ChainEpochDelta;
+
+        let tester = GCTester::new();
+        let mut gc = MarkAndSweep::new(
+            tester.db.clone(),
+            tester.get_heaviest_tipset_fn(),
+            depth,
+            ZERO_DURATION,
+        );
+
+        let depth = depth as ChainEpochDelta;
+        let current_epoch = current_epoch as ChainEpochDelta;
+
+        let persistent_data = [1, 55];
+        let persistent_cid = Cid::new_v1(DAG_CBOR, Identity.digest(&persistent_data));
+
+        // Make sure we run enough epochs to initiate GC.
+        tester.run_epochs(current_epoch);
+        tester.run_epochs(depth);
+        tester
+            .db
+            .put_keyed_persistent(&persistent_cid, &persistent_data)
+            .unwrap();
+        // Mark.
+        gc.gc_workflow(ZERO_DURATION).await.unwrap();
+        tester.run_epochs(depth);
+        // Sweep.
+        gc.gc_workflow(ZERO_DURATION).await.unwrap();
+
+        // Make sure persistent data stays.
+        assert_eq!(
+            tester.db.get(&persistent_cid).unwrap(),
+            Some(persistent_data.to_vec())
+        );
+    }
 }
diff --git a/src/db/memory.rs b/src/db/memory.rs
index 95b371a1af06..62194952ea45 100644
--- a/src/db/memory.rs
+++ b/src/db/memory.rs
@@ -2,7 +2,7 @@
 // SPDX-License-Identifier: Apache-2.0, MIT

 use crate::cid_collections::CidHashSet;
-use crate::db::GarbageCollectable;
+use crate::db::{GarbageCollectable, PersistentStore};
 use crate::libp2p_bitswap::{BitswapStoreRead, BitswapStoreReadWrite};
 use crate::rpc::eth::types::EthHash;
 use ahash::HashMap;
@@ -16,6 +16,7 @@ use super::{EthMappingsStore, SettingsStore};
 #[derive(Debug, Default)]
 pub struct MemoryDB {
     blockchain_db: RwLock<HashMap<Vec<u8>, Vec<u8>>>,
+    blockchain_persistent_db: RwLock<HashMap<Vec<u8>, Vec<u8>>>,
     settings_db: RwLock<HashMap<String, Vec<u8>>>,
     eth_mappings_db: RwLock<HashMap<EthHash, Vec<u8>>>,
 }
@@ -109,7 +110,16 @@ impl EthMappingsStore for MemoryDB {

 impl Blockstore for MemoryDB {
     fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
-        Ok(self.blockchain_db.read().get(&k.to_bytes()).cloned())
+        Ok(self
+            .blockchain_db
+            .read()
+            .get(&k.to_bytes())
+            .cloned()
+            .or(self
+                .blockchain_persistent_db
+                .read()
+                .get(&k.to_bytes())
+                .cloned()))
     }

     fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
@@ -120,6 +130,15 @@ impl Blockstore for MemoryDB {
     }
 }

+impl PersistentStore for MemoryDB {
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        self.blockchain_persistent_db
+            .write()
+            .insert(k.to_bytes(), block.to_vec());
+        Ok(())
+    }
+}
+
 impl BitswapStoreRead for MemoryDB {
     fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
         Ok(self.blockchain_db.read().contains_key(&cid.to_bytes()))
diff --git a/src/db/migration/migration_map.rs b/src/db/migration/migration_map.rs
index 9e2f1de63874..9493850ef569 100644
--- a/src/db/migration/migration_map.rs
+++ b/src/db/migration/migration_map.rs
@@ -9,6 +9,7 @@ use std::{

 use crate::db::migration::v0_16_0::Migration0_15_2_0_16_0;
 use crate::db::migration::v0_19_0::Migration0_18_0_0_19_0;
+use crate::db::migration::v0_22_1::Migration0_22_0_0_22_1;
 use crate::Config;
 use anyhow::bail;
 use anyhow::Context as _;
@@ -80,6 +81,7 @@ create_migrations!(
     "0.12.1" -> "0.13.0" @ Migration0_12_1_0_13_0,
     "0.15.2" -> "0.16.0" @ Migration0_15_2_0_16_0,
     "0.18.0" -> "0.19.0" @ Migration0_18_0_0_19_0,
+    "0.22.0" -> "0.22.1" @ Migration0_22_0_0_22_1,
 );

 pub struct Migration {
diff --git a/src/db/migration/mod.rs b/src/db/migration/mod.rs
index f0e9f390f06d..a9c85af22314 100644
--- a/src/db/migration/mod.rs
+++ b/src/db/migration/mod.rs
@@ -6,6 +6,7 @@ mod migration_map;
 mod v0_12_1;
 mod v0_16_0;
 mod v0_19_0;
+mod v0_22_1;
 mod void_migration;

 pub use db_migration::DbMigration;
diff --git a/src/db/migration/v0_16_0.rs b/src/db/migration/v0_16_0.rs
index 71204629dea4..9cd2aca0f9ee 100644
--- a/src/db/migration/v0_16_0.rs
+++ b/src/db/migration/v0_16_0.rs
@@ -210,7 +210,7 @@ mod paritydb_0_15_1 {
         // if it changes and then this migration should either be maintained or removed.
         pub(super) fn open(path: impl Into<PathBuf>) -> anyhow::Result<db::parity_db::ParityDb> {
             let opts = Self::to_options(path.into());
-            let db = db::parity_db::ParityDb::wrap(Db::open_or_create(&opts)?, false);
+            let db = db::parity_db::ParityDb::wrap(Db::open_or_create(&opts)?, false, false);
             Ok(db)
         }
     }
 }
diff --git a/src/db/migration/v0_19_0.rs b/src/db/migration/v0_19_0.rs
index 7f747312c340..032dec2d5a39 100644
--- a/src/db/migration/v0_19_0.rs
+++ b/src/db/migration/v0_19_0.rs
@@ -9,14 +9,14 @@ use crate::chain::ChainStore;
 use crate::cli_shared::chain_path;
 use crate::daemon::db_util::{load_all_forest_cars, populate_eth_mappings};
 use crate::db::car::ManyCar;
-use crate::db::db_engine::{open_db, Db};
+use crate::db::db_engine::Db;
 use crate::db::migration::migration_map::temporary_db_name;
 use crate::db::migration::v0_19_0::paritydb_0_18_0::{DbColumn, ParityDb};
 use crate::db::CAR_DB_DIR_NAME;
 use crate::genesis::read_genesis_header;
 use crate::networks::ChainConfig;
 use crate::state_manager::StateManager;
-use crate::Config;
+use crate::{db, Config};
 use anyhow::Context;
 use cid::multihash::Code::Blake2b256;
 use cid::multihash::MultihashDigest;
@@ -147,9 +147,16 @@ impl MigrationOperation for Migration0_18_0_0_19_0 {
 }

 async fn create_state_manager_and_populate(config: Config, db_name: String) -> anyhow::Result<()> {
+    use db::parity_db::ParityDb as ParityDbCurrent;
+
     let chain_data_path = chain_path(&config);
     let db_root_dir = chain_data_path.join(db_name);
-    let db_writer = Arc::new(open_db(db_root_dir.clone(), config.db_config().clone())?);
+    let db = ParityDbCurrent::wrap(
+        paritydb_0_19_0::ParityDb::open(db_root_dir.clone())?.db,
+        false,
+        true,
+    );
+    let db_writer = Arc::new(db);
     let db = Arc::new(ManyCar::new(db_writer.clone()));
     let forest_car_db_dir = db_root_dir.join(CAR_DB_DIR_NAME);
     load_all_forest_cars(&db, &forest_car_db_dir)?;
diff --git a/src/db/migration/v0_22_1.rs b/src/db/migration/v0_22_1.rs
new file mode 100644
index 000000000000..2591d0f72e05
--- /dev/null
+++ b/src/db/migration/v0_22_1.rs
@@ -0,0 +1,330 @@
+// Copyright 2019-2024 ChainSafe Systems
+// SPDX-License-Identifier: Apache-2.0, MIT
+
+//! Migration logic from version 0.22.0 to 0.22.1.
+//! A `PersistentGraph` column has been introduced to allow for storage of persistent data that
+//! isn't garbage collected. The initial use case is network upgrade manifest storage.
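Note: the copy loop in `migrate` below relies on the fact that, for the `GraphDagCborBlake2b256` column, the key is fully determined by the value (a CIDv1 with the DAG-CBOR codec and a Blake2b-256 digest), so keys never need to be read from the old column. A standalone sketch of that key recomputation, using the same `cid` and `fvm_ipld_encoding` APIs the migration imports:

```rust
use cid::multihash::Code::Blake2b256;
use cid::multihash::MultihashDigest;
use cid::Cid;
use fvm_ipld_encoding::DAG_CBOR;

/// Recompute the database key (a CID) for a value stored in the
/// DAG-CBOR/Blake2b-256 column, mirroring the migration's copy loop.
fn dag_cbor_key(value: &[u8]) -> Vec<u8> {
    let cid = Cid::new_v1(DAG_CBOR, Blake2b256.digest(value));
    cid.to_bytes()
}

fn main() {
    // Any byte string works for the illustration; real values are
    // DAG-CBOR-encoded IPLD blocks.
    let key = dag_cbor_key(b"example-block");
    assert!(!key.is_empty());
}
```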
+
+use crate::blocks::TipsetKey;
+use crate::db::db_engine::Db;
+use crate::db::migration::migration_map::temporary_db_name;
+use crate::db::migration::v0_22_1::paritydb_0_22_0::{DbColumn, ParityDb};
+use crate::db::CAR_DB_DIR_NAME;
+use crate::rpc::eth::types::EthHash;
+use crate::Config;
+use anyhow::Context;
+use cid::multihash::Code::Blake2b256;
+use cid::multihash::MultihashDigest;
+use cid::Cid;
+use fs_extra::dir::CopyOptions;
+use fvm_ipld_encoding::DAG_CBOR;
+use semver::Version;
+use std::path::{Path, PathBuf};
+use strum::IntoEnumIterator;
+use tracing::info;
+
+use super::migration_map::MigrationOperation;
+
+pub(super) struct Migration0_22_0_0_22_1 {
+    from: Version,
+    to: Version,
+}
+
+/// Migrates the database from version 0.22.0 to 0.22.1
+impl MigrationOperation for Migration0_22_0_0_22_1 {
+    fn new(from: Version, to: Version) -> Self
+    where
+        Self: Sized,
+    {
+        Self { from, to }
+    }
+
+    fn pre_checks(&self, _chain_data_path: &Path) -> anyhow::Result<()> {
+        Ok(())
+    }
+
+    fn migrate(&self, chain_data_path: &Path, _: &Config) -> anyhow::Result<PathBuf> {
+        let source_db = chain_data_path.join(self.from.to_string());
+
+        let temp_db_path = chain_data_path.join(temporary_db_name(&self.from, &self.to));
+        if temp_db_path.exists() {
+            info!(
+                "Removing old temporary database {temp_db_path}",
+                temp_db_path = temp_db_path.display()
+            );
+            std::fs::remove_dir_all(&temp_db_path)?;
+        }
+
+        let old_car_db_path = source_db.join(CAR_DB_DIR_NAME);
+        let new_car_db_path = temp_db_path.join(CAR_DB_DIR_NAME);
+
+        // Make sure `car_db` dir exists as it might not be the case when migrating
+        // from older versions.
+        if old_car_db_path.is_dir() {
+            info!(
+                "Copying snapshot from {source_db} to {temp_db_path}",
+                source_db = old_car_db_path.display(),
+                temp_db_path = new_car_db_path.display()
+            );
+
+            fs_extra::copy_items(
+                &[old_car_db_path.as_path()],
+                new_car_db_path,
+                &CopyOptions::default().copy_inside(true),
+            )?;
+        }
+
+        let db = ParityDb::open(source_db)?;
+
+        // Open the new database to migrate data from the old one.
+        let new_db = paritydb_0_22_1::ParityDb::open(&temp_db_path)?;
+
+        for col in DbColumn::iter() {
+            info!("Migrating column {}", col);
+            let mut res = anyhow::Ok(());
+            match col {
+                DbColumn::GraphDagCborBlake2b256 => {
+                    db.db.iter_column_while(col as u8, |val| {
+                        let hash = Blake2b256.digest(&val.value);
+                        let cid = Cid::new_v1(DAG_CBOR, hash);
+                        res = new_db
+                            .db
+                            .commit_changes([Db::set_operation(
+                                col as u8,
+                                cid.to_bytes(),
+                                val.value,
+                            )])
+                            .context("failed to commit");
+
+                        if res.is_err() {
+                            return false;
+                        }
+
+                        true
+                    })?;
+                    res?;
+                }
+                DbColumn::EthMappings => {
+                    db.db.iter_column_while(col as u8, |val| {
+                        let tsk: Result<TipsetKey, _> = fvm_ipld_encoding::from_slice(&val.value);
+                        if tsk.is_err() {
+                            res = Err(tsk.context("serde error").unwrap_err());
+                            return false;
+                        }
+                        let cid = tsk.unwrap().cid();
+
+                        if cid.is_err() {
+                            res = Err(cid.context("serde error").unwrap_err());
+                            return false;
+                        }
+
+                        let hash: EthHash = cid.unwrap().into();
+                        res = new_db
+                            .db
+                            .commit_changes([Db::set_operation(
+                                col as u8,
+                                hash.0.as_bytes().to_vec(),
+                                val.value,
+                            )])
+                            .context("failed to commit");
+
+                        if res.is_err() {
+                            return false;
+                        }
+
+                        true
+                    })?;
+                    res?;
+                }
+                _ => {
+                    let mut iter = db.db.iter(col as u8)?;
+                    while let Some((key, value)) = iter.next()? {
+                        new_db
+                            .db
+                            .commit_changes([Db::set_operation(col as u8, key, value)])
+                            .context("failed to commit")?;
+                    }
+                }
+            }
+        }
+
+        drop(new_db);
+
+        Ok(temp_db_path)
+    }
+
+    fn post_checks(&self, chain_data_path: &Path) -> anyhow::Result<()> {
+        let temp_db_name = temporary_db_name(&self.from, &self.to);
+        if !chain_data_path.join(&temp_db_name).exists() {
+            anyhow::bail!(
+                "migration database {} does not exist",
+                chain_data_path.join(temp_db_name).display()
+            );
+        }
+        Ok(())
+    }
+}
+
+/// Database settings from Forest `v0.22.0`
+mod paritydb_0_22_0 {
+    use parity_db::{CompressionType, Db, Options};
+    use std::path::PathBuf;
+    use strum::{Display, EnumIter, IntoEnumIterator};
+
+    #[derive(Copy, Clone, Debug, PartialEq, EnumIter, Display)]
+    #[repr(u8)]
+    pub(super) enum DbColumn {
+        GraphDagCborBlake2b256,
+        GraphFull,
+        Settings,
+        EthMappings,
+    }
+
+    impl DbColumn {
+        fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
+            DbColumn::iter()
+                .map(|col| {
+                    match col {
+                        DbColumn::GraphDagCborBlake2b256 => parity_db::ColumnOptions {
+                            preimage: true,
+                            compression,
+                            ..Default::default()
+                        },
+                        DbColumn::GraphFull => parity_db::ColumnOptions {
+                            preimage: true,
+                            // This is needed for key retrieval.
+                            btree_index: true,
+                            compression,
+                            ..Default::default()
+                        },
+                        DbColumn::Settings => parity_db::ColumnOptions {
+                            // Explicitly disable preimage for the settings column,
+                            // otherwise we are not able to overwrite entries.
+                            preimage: false,
+                            // This is needed for key retrieval.
+                            btree_index: true,
+                            compression,
+                            ..Default::default()
+                        },
+                        DbColumn::EthMappings => parity_db::ColumnOptions {
+                            preimage: false,
+                            btree_index: false,
+                            compression,
+                            ..Default::default()
+                        },
+                    }
+                })
+                .collect()
+        }
+    }
+
+    pub(super) struct ParityDb {
+        pub db: parity_db::Db,
+    }
+
+    impl ParityDb {
+        pub(super) fn to_options(path: PathBuf) -> Options {
+            Options {
+                path,
+                sync_wal: true,
+                sync_data: true,
+                stats: false,
+                salt: None,
+                columns: DbColumn::create_column_options(CompressionType::Lz4),
+                compression_threshold: [(0, 128)].into_iter().collect(),
+            }
+        }
+
+        pub(super) fn open(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
+            let opts = Self::to_options(path.into());
+            Ok(Self {
+                db: Db::open_or_create(&opts)?,
+            })
+        }
+    }
+}
+
+/// Database settings from Forest `v0.22.1`
+mod paritydb_0_22_1 {
+    use parity_db::{CompressionType, Db, Options};
+    use std::path::PathBuf;
+    use strum::{Display, EnumIter, IntoEnumIterator};
+
+    #[derive(Copy, Clone, Debug, PartialEq, EnumIter, Display)]
+    #[repr(u8)]
+    pub(super) enum DbColumn {
+        GraphDagCborBlake2b256,
+        GraphFull,
+        Settings,
+        EthMappings,
+        PersistentGraph,
+    }
+
+    impl DbColumn {
+        fn create_column_options(compression: CompressionType) -> Vec<parity_db::ColumnOptions> {
+            DbColumn::iter()
+                .map(|col| {
+                    match col {
+                        DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
+                            parity_db::ColumnOptions {
+                                preimage: true,
+                                compression,
+                                ..Default::default()
+                            }
+                        }
+                        DbColumn::GraphFull => parity_db::ColumnOptions {
+                            preimage: true,
+                            // This is needed for key retrieval.
+                            btree_index: true,
+                            compression,
+                            ..Default::default()
+                        },
+                        DbColumn::Settings => {
+                            parity_db::ColumnOptions {
+                                // Explicitly disable preimage for the settings column,
+                                // otherwise we are not able to overwrite entries.
+                                preimage: false,
+                                // This is needed for key retrieval.
+                                btree_index: true,
+                                compression,
+                                ..Default::default()
+                            }
+                        }
+                        DbColumn::EthMappings => parity_db::ColumnOptions {
+                            preimage: false,
+                            btree_index: false,
+                            compression,
+                            ..Default::default()
+                        },
+                    }
+                })
+                .collect()
+        }
+    }
+
+    pub(super) struct ParityDb {
+        pub db: parity_db::Db,
+    }
+
+    impl ParityDb {
+        pub(super) fn to_options(path: PathBuf) -> Options {
+            Options {
+                path,
+                sync_wal: true,
+                sync_data: true,
+                stats: false,
+                salt: None,
+                columns: DbColumn::create_column_options(CompressionType::Lz4),
+                compression_threshold: [(0, 128)].into_iter().collect(),
+            }
+        }
+
+        pub(super) fn open(path: impl Into<PathBuf>) -> anyhow::Result<Self> {
+            let opts = Self::to_options(path.into());
+            Ok(Self {
+                db: Db::open_or_create(&opts)?,
+            })
+        }
+    }
+}
diff --git a/src/db/mod.rs b/src/db/mod.rs
index dd0a648c761b..452d0ef803a5 100644
--- a/src/db/mod.rs
+++ b/src/db/mod.rs
@@ -17,6 +17,7 @@ pub mod migration;
 use crate::rpc::eth::types::EthHash;
 use anyhow::Context as _;
 use cid::Cid;
+use fvm_ipld_blockstore::{Blockstore, MemoryBlockstore};
 use serde::de::DeserializeOwned;
 use serde::Serialize;
 use std::sync::Arc;
@@ -211,6 +212,35 @@ pub trait GarbageCollectable<T> {
     fn remove_keys(&self, keys: T) -> anyhow::Result<u32>;
 }

+/// A trait that allows for storing data that is not garbage collected.
+pub trait PersistentStore: Blockstore {
+    /// Puts a keyed block with a pre-computed CID into the database.
+    ///
+    /// # Arguments
+    ///
+    /// * `k` - The key to be stored.
+    /// * `block` - The block to be stored.
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()>;
+}
+
+impl PersistentStore for MemoryBlockstore {
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        self.put_keyed(k, block)
+    }
+}
+
+impl<T: PersistentStore> PersistentStore for Arc<T> {
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        PersistentStore::put_keyed_persistent(self.as_ref(), k, block)
+    }
+}
+
+impl<T: PersistentStore> PersistentStore for &Arc<T> {
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        PersistentStore::put_keyed_persistent(self.as_ref(), k, block)
+    }
+}
+
 pub mod db_engine {
     use std::path::{Path, PathBuf};
diff --git a/src/db/parity_db.rs b/src/db/parity_db.rs
index 07d6d3ae1a91..1482c0e99697 100644
--- a/src/db/parity_db.rs
+++ b/src/db/parity_db.rs
@@ -3,7 +3,7 @@

 use std::path::PathBuf;

-use super::SettingsStore;
+use super::{PersistentStore, SettingsStore};

 use super::EthMappingsStore;

@@ -44,6 +44,10 @@ enum DbColumn {
     Settings,
     /// Column for storing Ethereum mappings.
     EthMappings,
+    /// Column for storing IPLD data that has to be ignored by the garbage collector.
+    /// Anything stored in this column can be considered permanent, unless manually
+    /// deleted.
+    PersistentGraph,
 }

 impl DbColumn {
@@ -51,11 +55,13 @@ impl DbColumn {
         DbColumn::iter()
             .map(|col| {
                 match col {
-                    DbColumn::GraphDagCborBlake2b256 => parity_db::ColumnOptions {
-                        preimage: true,
-                        compression,
-                        ..Default::default()
-                    },
+                    DbColumn::GraphDagCborBlake2b256 | DbColumn::PersistentGraph => {
+                        parity_db::ColumnOptions {
+                            preimage: true,
+                            compression,
+                            ..Default::default()
+                        }
+                    }
                     DbColumn::GraphFull => parity_db::ColumnOptions {
                         preimage: true,
                         // This is needed for key retrieval.
@@ -87,6 +93,8 @@ impl DbColumn {
 pub struct ParityDb {
     pub db: parity_db::Db,
     statistics_enabled: bool,
+    // This is needed to maintain backwards compatibility for pre-persistent-column migrations.
+    disable_persistent_fallback: bool,
 }

 impl ParityDb {
@@ -107,13 +115,15 @@ impl ParityDb {
         Ok(Self {
             db: Db::open_or_create(&opts)?,
             statistics_enabled: opts.stats,
+            disable_persistent_fallback: false,
         })
     }

-    pub fn wrap(db: parity_db::Db, stats: bool) -> Self {
+    pub fn wrap(db: parity_db::Db, stats: bool, disable_persistent: bool) -> Self {
         Self {
             db,
             statistics_enabled: stats,
+            disable_persistent_fallback: disable_persistent,
         }
     }

@@ -216,24 +226,18 @@ impl EthMappingsStore for ParityDb {

 impl Blockstore for ParityDb {
     fn get(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
         let column = Self::choose_column(k);
-        match column {
-            DbColumn::GraphDagCborBlake2b256 | DbColumn::GraphFull => {
-                self.read_from_column(k.to_bytes(), column)
-            }
-            DbColumn::Settings | DbColumn::EthMappings => panic!("invalid column for IPLD data"),
+        let res = self.read_from_column(k.to_bytes(), column)?;
+        if res.is_some() {
+            return Ok(res);
         }
+        self.get_persistent(k)
     }

     fn put_keyed(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
         let column = Self::choose_column(k);
-        match column {
-            // We can put the data directly into the database without any encoding.
-            DbColumn::GraphDagCborBlake2b256 | DbColumn::GraphFull => {
-                self.write_to_column(k.to_bytes(), block, column)
-            }
-            DbColumn::Settings | DbColumn::EthMappings => panic!("invalid column for IPLD data"),
-        }
+        // We can put the data directly into the database without any encoding.
+        self.write_to_column(k.to_bytes(), block, column)
     }

     fn put_many_keyed<D, I>(&self, blocks: I) -> anyhow::Result<()>
     where
@@ -255,6 +259,12 @@
     }
 }

+impl PersistentStore for ParityDb {
+    fn put_keyed_persistent(&self, k: &Cid, block: &[u8]) -> anyhow::Result<()> {
+        self.write_to_column(k.to_bytes(), block, DbColumn::PersistentGraph)
+    }
+}
+
 impl BitswapStoreRead for ParityDb {
     fn contains(&self, cid: &Cid) -> anyhow::Result<bool> {
         // We need to check both columns because we don't know which one
@@ -333,6 +343,14 @@ impl ParityDb {
     pub fn set_operation(column: u8, key: Vec<u8>, value: Vec<u8>) -> Op {
         (column, Operation::Set(key, value))
     }
+
+    // Get data from the persistent graph column.
+    fn get_persistent(&self, k: &Cid) -> anyhow::Result<Option<Vec<u8>>> {
+        if self.disable_persistent_fallback {
+            return Ok(None);
+        }
+        self.read_from_column(k.to_bytes(), DbColumn::PersistentGraph)
+    }
 }

 impl GarbageCollectable<CidHashSet> for ParityDb {
@@ -395,6 +413,7 @@ mod test {
     use cid::multihash::MultihashDigest;
     use fvm_ipld_encoding::IPLD_RAW;
     use nom::AsBytes;
+    use std::ops::Deref;

     use crate::db::tests::db_utils::parity::TempParityDB;

@@ -437,6 +456,7 @@
             DbColumn::GraphFull => DbColumn::GraphDagCborBlake2b256,
             DbColumn::Settings => panic!("invalid column for IPLD data"),
             DbColumn::EthMappings => panic!("invalid column for IPLD data"),
+            DbColumn::PersistentGraph => panic!("invalid column for GC-enabled IPLD data"),
         };
         let actual = db.read_from_column(cid.to_bytes(), other_column).unwrap();
         assert!(actual.is_none());
@@ -523,4 +543,51 @@
             assert_eq!(expected, actual);
         }
     }
+
+    #[test]
+    fn persistent_tests() {
+        let db = TempParityDB::new();
+        let data = [
+            b"h'nglui mglw'nafh".to_vec(),
+            b"Cthulhu".to_vec(),
+            b"R'lyeh wgah'nagl fhtagn!!".to_vec(),
+        ];
+
+        let persistent_data = data
+            .clone()
+            .into_iter()
+            .map(|mut entry| {
+                entry.push(255);
+                entry
+            })
+            .collect::<Vec<Vec<u8>>>();
+
+        let cids = [
+            Cid::new_v1(DAG_CBOR, Blake2b256.digest(&data[0])),
+            Cid::new_v1(DAG_CBOR, Sha2_256.digest(&data[1])),
+            Cid::new_v1(IPLD_RAW, Blake2b256.digest(&data[1])),
+        ];
+
+        for idx in 0..3 {
+            let cid = &cids[idx];
+            let persistent_entry = &persistent_data[idx];
+            let data_entry = &data[idx];
+            db.put_keyed_persistent(cid, persistent_entry).unwrap();
+            // Check that we get persistent data if the data is otherwise absent from
+            // the GC-enabled storage.
+            assert_eq!(
+                Blockstore::get(db.deref(), cid).unwrap(),
+                Some(persistent_entry.clone())
+            );
+            assert!(db
+                .read_from_column(cid.to_bytes(), DbColumn::PersistentGraph)
+                .unwrap()
+                .is_some());
+            db.put_keyed(cid, data_entry).unwrap();
+            assert_eq!(
+                Blockstore::get(db.deref(), cid).unwrap(),
+                Some(data_entry.clone())
+            );
+        }
+    }
 }
diff --git a/src/tool/subcommands/snapshot_cmd.rs b/src/tool/subcommands/snapshot_cmd.rs
index b9a096bebef8..9190168f990b 100644
--- a/src/tool/subcommands/snapshot_cmd.rs
+++ b/src/tool/subcommands/snapshot_cmd.rs
@@ -8,6 +8,7 @@ use crate::cli_shared::snapshot;
 use crate::daemon::bundle::load_actor_bundles;
 use crate::db::car::forest::DEFAULT_FOREST_CAR_FRAME_SIZE;
 use crate::db::car::{AnyCar, ManyCar};
+use crate::db::PersistentStore;
 use crate::interpreter::{MessageCallbackCtx, VMEvent, VMTrace};
 use crate::ipld::stream_chain;
 use crate::networks::{butterflynet, calibnet, mainnet, ChainConfig, NetworkChain};
@@ -294,7 +295,7 @@ async fn validate_with_blockstore<BlockstoreT>(
     check_stateroots: u32,
 ) -> anyhow::Result<()>
 where
-    BlockstoreT: Blockstore + Send + Sync + 'static,
+    BlockstoreT: PersistentStore + Send + Sync + 'static,
 {
     if check_links != 0 {
         validate_ipld_links(root.clone(), &store, check_links).await?;
@@ -396,7 +397,7 @@ async fn validate_stateroots<DB>(
     epochs: u32,
 ) -> anyhow::Result<()>
 where
-    DB: Blockstore + Send + Sync + 'static,
+    DB: PersistentStore + Send + Sync + 'static,
 {
     let chain_config = Arc::new(ChainConfig::from_chain(&network));
     let genesis = ts.genesis(db)?;
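Note: taken together, these changes establish one contract: a block written with `put_keyed_persistent` must remain readable through the plain `Blockstore::get` even after a full mark-and-sweep cycle, because reads fall back to the persistent column (or map). A condensed illustration of that contract against `MemoryDB`, in the same shape as the `persistent_data_resilient_to_gc` test above but without the chain-store scaffolding; this is a hypothetical crate-internal test sketch, assuming the `MemoryDB` and `PersistentStore` definitions from this diff:

```rust
// Hypothetical unit test sketch (crate-internal paths as in this diff).
#[test]
fn persistent_block_visible_without_gc_column_entry() {
    use crate::db::{MemoryDB, PersistentStore};
    use cid::multihash::Code::Identity;
    use cid::multihash::MultihashDigest;
    use cid::Cid;
    use fvm_ipld_blockstore::Blockstore;
    use fvm_ipld_encoding::DAG_CBOR;

    let db = MemoryDB::default();
    let data = [1u8, 55];
    let cid = Cid::new_v1(DAG_CBOR, Identity.digest(&data));

    // Written only to the persistent map; nothing lands in the GC'd map,
    // so a mark-and-sweep pass has nothing of ours to collect.
    db.put_keyed_persistent(&cid, &data).unwrap();

    // `Blockstore::get` falls back to `blockchain_persistent_db`,
    // so the block stays visible even though the GC'd map is empty.
    assert_eq!(db.get(&cid).unwrap(), Some(data.to_vec()));
}
```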