feat: generate .forest.car.zst files by default (#3283)
lemmih authored Jul 27, 2023
1 parent d95f81f commit 6dbd776
Showing 10 changed files with 138 additions and 92 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -54,6 +54,8 @@
lookup errors instead of silently ignoring them.
- [#2999](https://github.com/ChainSafe/forest/issues/2999): Restored `--tipset`
flag to `forest-cli snapshot export` to allow export at a specific tipset.
- [#3283](https://github.com/ChainSafe/forest/pull/3283): All generated car
files use the new forest.car.zst format.

### Removed

57 changes: 21 additions & 36 deletions src/chain/mod.rs
@@ -3,58 +3,43 @@
pub mod store;
mod weight;
use crate::blocks::Tipset;
use crate::db::car::forest;
use crate::ipld::stream_chain;
use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
use anyhow::{Context, Result};
use async_compression::futures::write::ZstdEncoder;
use digest::Digest;
use futures::future::Either;
use futures::{io::BufWriter, AsyncWrite, AsyncWriteExt};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_car::CarHeader;
use std::sync::Arc;
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};

pub use self::{store::*, weight::*};

pub async fn export<W, D>(
db: impl Blockstore + Send + Sync,
pub async fn export<D: Digest>(
db: impl Blockstore,
tipset: &Tipset,
lookup_depth: ChainEpochDelta,
writer: W,
compressed: bool,
writer: impl AsyncWrite + Unpin,
skip_checksum: bool,
) -> Result<Option<digest::Output<D>>, Error>
where
D: Digest + Send + 'static,
W: AsyncWrite + Send + Unpin + 'static,
{
let store = Arc::new(db);
use futures::StreamExt;
let writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);
let mut writer = if compressed {
Either::Left(ZstdEncoder::new(writer))
} else {
Either::Right(writer)
};

) -> Result<Option<digest::Output<D>>, Error> {
let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
let roots = tipset.key().cids().to_vec();

// Wrap writer in optional checksum calculator
let mut writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);

// Stream stateroots in range stateroot_lookup_limit..=tipset.epoch(). Also
// stream all block headers until genesis.
let blocks = stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit);

// Encode Ipld key-value pairs in zstd frames
let frames = forest::Encoder::compress_stream(8000usize.next_power_of_two(), 3, blocks);

let mut stream = stream_chain(&store, tipset.clone().chain(&store), stateroot_lookup_limit)
.map(|result| result.unwrap()); // FIXME: use a sink that supports TryStream.
let header = CarHeader::from(tipset.key().cids().to_vec());
header
.write_stream_async(&mut writer, &mut stream)
.await
.map_err(|e| Error::Other(format!("Failed to write blocks in export: {e}")))?;
// Write zstd frames and include a skippable index
forest::Encoder::write(&mut writer, roots, frames).await?;

// Flush to ensure everything has been successfully written
writer.flush().await.context("failed to flush")?;
writer.close().await.context("failed to close")?;

let digest = match &mut writer {
Either::Left(left) => left.get_mut().finalize().await,
Either::Right(right) => right.finalize().await,
}
.map_err(|e| Error::Other(e.to_string()))?;
let digest = writer.finalize().map_err(|e| Error::Other(e.to_string()))?;

Ok(digest)
}
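For orientation, the new `export` entry point drops the `compressed` flag, the writer generic, and the futures/tokio compatibility shim; output is always a zstd-framed CAR with a skippable index, and only the digest type stays generic. A minimal call-site sketch, mirroring the `do_export` change in `archive_cmd.rs` further down (the `store`, `ts`, `depth`, and `output_path` bindings are assumed to be in scope inside the crate):

```rust
use sha2::Sha256;

// The writer is a plain tokio file now; no `.compat()` wrapper is needed.
let writer = tokio::fs::File::create(&output_path).await?;

// The final `true` skips checksum calculation, so the returned digest is None;
// pass `false` to get back the Sha256 of the bytes written to the output file.
let digest = crate::chain::export::<Sha256>(store, &ts, depth, writer, true).await?;
```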
6 changes: 6 additions & 0 deletions src/chain/store/errors.rs
@@ -59,6 +59,12 @@ impl From<anyhow::Error> for Error {
}
}

impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Self {
Error::Other(e.to_string())
}
}

impl<T> From<flume::SendError<T>> for Error {
fn from(e: flume::SendError<T>) -> Self {
Error::Other(e.to_string())
32 changes: 26 additions & 6 deletions src/cli/subcommands/archive_cmd.rs
@@ -32,9 +32,10 @@ use crate::chain::ChainEpochDelta;
use crate::cli_shared::{snapshot, snapshot::TrustedVendor};
use crate::db::car::AnyCar;
use crate::networks::{calibnet, mainnet, ChainConfig, NetworkChain};
use crate::shim::clock::EPOCH_DURATION_SECONDS;
use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY};
use anyhow::{bail, Context as _};
use chrono::Utc;
use chrono::NaiveDateTime;
use clap::Subcommand;
use fvm_ipld_blockstore::Blockstore;
use indicatif::ProgressIterator;
@@ -43,7 +44,6 @@ use sha2::Sha256;
use std::io::{self, Read, Seek};
use std::path::PathBuf;
use std::sync::Arc;
use tokio_util::compat::TokioAsyncReadCompatExt;
use tracing::info;

#[derive(Debug, Subcommand)]
@@ -106,13 +106,24 @@ impl ArchiveCommands {

// This does nothing if the output path is a file. If it is a directory - it produces the following:
// `./forest_snapshot_{chain}_{year}-{month}-{day}_height_{epoch}.car.zst`.
fn build_output_path(chain: String, epoch: ChainEpoch, output_path: PathBuf) -> PathBuf {
fn build_output_path(
chain: String,
genesis_timestamp: u64,
epoch: ChainEpoch,
output_path: PathBuf,
) -> PathBuf {
match output_path.is_dir() {
true => output_path.join(snapshot::filename(
TrustedVendor::Forest,
chain,
Utc::now().date_naive(),
NaiveDateTime::from_timestamp_opt(
genesis_timestamp as i64 + epoch * EPOCH_DURATION_SECONDS,
0,
)
.unwrap_or_default()
.into(),
epoch,
true,
)),
false => output_path.clone(),
}
@@ -154,7 +165,8 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
.tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder)
.context("unable to get a tipset at given height")?;

let output_path = build_output_path(network.to_string(), epoch, output_path);
let output_path =
build_output_path(network.to_string(), genesis.timestamp(), epoch, output_path);

let writer = tokio::fs::File::create(&output_path)
.await
@@ -168,7 +180,7 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
output_path.to_str().unwrap_or_default()
);

crate::chain::export::<_, Sha256>(store, &ts, depth, writer.compat(), true, true).await?;
crate::chain::export::<Sha256>(store, &ts, depth, writer, true).await?;

Ok(())
}
@@ -335,6 +347,7 @@ mod tests {
use fvm_ipld_car::CarReader;
use tempfile::TempDir;
use tokio::io::BufReader;
use tokio_util::compat::TokioAsyncReadCompatExt;

#[test]
fn archive_info_calibnet() {
@@ -358,6 +371,12 @@
assert_eq!(info.epoch, 0);
}

fn genesis_timestamp(reader: impl Read + Seek) -> u64 {
let db = crate::db::car::PlainCar::new(reader).unwrap();
let ts = Tipset::load_required(&db, &TipsetKeys::new(db.roots())).unwrap();
ts.genesis(&db).unwrap().timestamp()
}

#[tokio::test]
async fn export() {
let output_path = TempDir::new().unwrap();
@@ -371,6 +390,7 @@
.unwrap();
let file = tokio::fs::File::open(build_output_path(
NetworkChain::Calibnet.to_string(),
genesis_timestamp(std::io::Cursor::new(calibnet::DEFAULT_GENESIS)),
0,
output_path.path().into(),
))
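The reworked `build_output_path` above derives the filename date from the chain itself (genesis timestamp plus `epoch * EPOCH_DURATION_SECONDS`) rather than from `Utc::now()`, so an archived snapshot is named for the day its tipset was produced, not the day the export ran. A worked example of that arithmetic (the mainnet genesis timestamp of 1598306400, i.e. 2020-08-24T22:00:00Z, and the 30-second epoch duration are assumptions quoted for illustration):

```rust
let genesis_timestamp: i64 = 1_598_306_400; // assumed mainnet genesis time
let epoch: i64 = 2_905_376;
let secs = genesis_timestamp + epoch * 30; // 1_685_467_680

// chrono::NaiveDateTime::from_timestamp_opt(secs, 0) gives 2023-05-30T17:28:00Z,
// matching the 2023-05-30 date in the sample mainnet filename used by the
// snapshot.rs tests below, independent of when the command is run.
```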
4 changes: 3 additions & 1 deletion src/cli/subcommands/snapshot_cmd.rs
@@ -19,6 +19,7 @@ use crate::utils::proofs_api::paramfetch::ensure_params_downloaded;
use anyhow::{bail, Context, Result};
use chrono::Utc;
use clap::Subcommand;
use futures::TryStreamExt;
use fvm_ipld_blockstore::Blockstore;
use human_repr::HumanCount;
use std::path::{Path, PathBuf};
@@ -108,6 +109,7 @@ impl SnapshotCommands {
chain_name,
Utc::now().date_naive(),
epoch,
true,
)),
false => output_path.clone(),
};
@@ -207,7 +209,7 @@ impl SnapshotCommands {
let frames = crate::db::car::forest::Encoder::compress_stream(
frame_size,
compression_level,
block_stream,
block_stream.map_err(anyhow::Error::from),
);
crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
dest.flush().await?;
64 changes: 49 additions & 15 deletions src/cli_shared/snapshot.rs
@@ -40,14 +40,21 @@ pub enum TrustedVendor {
/// Create a filename in the "full" format. See [`parse`].
// Common between export, and [`fetch`].
// Keep in sync with the CLI documentation for the `snapshot` sub-command.
pub fn filename(vendor: impl Display, chain: impl Display, date: NaiveDate, height: i64) -> String {
pub fn filename(
vendor: impl Display,
chain: impl Display,
date: NaiveDate,
height: i64,
forest_format: bool,
) -> String {
let vendor = vendor.to_string();
let chain = chain.to_string();
ParsedFilename::Full {
vendor: &vendor,
chain: &chain,
date,
height,
forest_format,
}
.to_string()
}
@@ -60,10 +67,10 @@ pub async fn fetch(
vendor: TrustedVendor,
) -> anyhow::Result<PathBuf> {
let (_len, url) = peek(vendor, chain).await?;
let (date, height) = ParsedFilename::parse_url(&url)
let (date, height, forest_format) = ParsedFilename::parse_url(&url)
.context("unexpected url format")?
.date_and_height();
let filename = filename(vendor, chain, date, height);
.date_and_height_and_forest();
let filename = filename(vendor, chain, date, height, forest_format);

match download_aria2c(&url, directory, &filename).await {
Ok(path) => Ok(path),
@@ -232,6 +239,7 @@ mod parse {
chain: &'a str,
date: NaiveDate,
height: i64,
forest_format: bool,
},
}

@@ -247,19 +255,26 @@
chain,
date,
height,
forest_format,
} => f.write_fmt(format_args!(
"{vendor}_snapshot_{chain}_{}_height_{height}.car.zst",
date.format("%Y-%m-%d")
"{vendor}_snapshot_{chain}_{}_height_{height}{}.car.zst",
date.format("%Y-%m-%d"),
if *forest_format { ".forest" } else { "" }
)),
}
}
}

impl<'a> ParsedFilename<'a> {
pub fn date_and_height(&self) -> (NaiveDate, i64) {
pub fn date_and_height_and_forest(&self) -> (NaiveDate, i64, bool) {
match self {
ParsedFilename::Short { date, height, .. } => (*date, *height),
ParsedFilename::Full { date, height, .. } => (*date, *height),
ParsedFilename::Short { date, height, .. } => (*date, *height, false),
ParsedFilename::Full {
date,
height,
forest_format,
..
} => (*date, *height, *forest_format),
}
}

@@ -310,7 +325,7 @@ mod parse {
}

fn full(input: &str) -> nom::IResult<&str, ParsedFilename> {
let (rest, (vendor, _snapshot_, chain, _, date, _height_, height, _car_zst)) =
let (rest, (vendor, _snapshot_, chain, _, date, _height_, height, car_zst)) =
tuple((
take_until("_snapshot_"),
tag("_snapshot_"),
@@ -319,7 +334,7 @@
ymd("-"),
tag("_height_"),
number,
tag(".car.zst"),
alt((tag(".car.zst"), tag(".forest.car.zst"))),
))(input)?;
Ok((
rest,
@@ -328,6 +343,7 @@
chain,
date,
height,
forest_format: car_zst == ".forest.car.zst",
},
))
}
@@ -389,24 +405,34 @@ mod parse {
month: u32,
day: u32,
height: i64,
forest_format: bool,
) -> Self {
Self::Full {
vendor,
chain,
date: NaiveDate::from_ymd_opt(year, month, day).unwrap(),
height,
forest_format,
}
}
}

for (text, value) in [
(
"forest_snapshot_mainnet_2023-05-30_height_2905376.car.zst",
ParsedFilename::full("forest", "mainnet", 2023, 5, 30, 2905376),
ParsedFilename::full("forest", "mainnet", 2023, 5, 30, 2905376, false),
),
(
"forest_snapshot_calibnet_2023-05-30_height_604419.car.zst",
ParsedFilename::full("forest", "calibnet", 2023, 5, 30, 604419),
ParsedFilename::full("forest", "calibnet", 2023, 5, 30, 604419, false),
),
(
"forest_snapshot_mainnet_2023-05-30_height_2905376.forest.car.zst",
ParsedFilename::full("forest", "mainnet", 2023, 5, 30, 2905376, true),
),
(
"forest_snapshot_calibnet_2023-05-30_height_604419.forest.car.zst",
ParsedFilename::full("forest", "calibnet", 2023, 5, 30, 604419, true),
),
(
"2905920_2023_05_30T22_00_00Z.car.zst",
Expand All @@ -418,11 +444,19 @@ mod parse {
),
(
"filecoin_snapshot_calibnet_2023-06-13_height_643680.car.zst",
ParsedFilename::full("filecoin", "calibnet", 2023, 6, 13, 643680),
ParsedFilename::full("filecoin", "calibnet", 2023, 6, 13, 643680, false),
),
(
"venus_snapshot_pineconenet_2045-01-01_height_2.car.zst",
ParsedFilename::full("venus", "pineconenet", 2045, 1, 1, 2),
ParsedFilename::full("venus", "pineconenet", 2045, 1, 1, 2, false),
),
(
"filecoin_snapshot_calibnet_2023-06-13_height_643680.forest.car.zst",
ParsedFilename::full("filecoin", "calibnet", 2023, 6, 13, 643680, true),
),
(
"venus_snapshot_pineconenet_2045-01-01_height_2.forest.car.zst",
ParsedFilename::full("venus", "pineconenet", 2045, 1, 1, 2, true),
),
] {
assert_eq!(
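Taken together with the `Display` implementation above, the new `forest_format` flag simply switches the extension between `.car.zst` and `.forest.car.zst`. A small sketch of the expected output of `filename` (values reused from the test table; assumes it is called from inside the crate with `chrono::NaiveDate` in scope):

```rust
use chrono::NaiveDate;

let name = filename(
    TrustedVendor::Forest,
    "calibnet",
    NaiveDate::from_ymd_opt(2023, 5, 30).unwrap(),
    604_419,
    true, // forest_format: use the new .forest.car.zst extension
);
assert_eq!(
    name,
    "forest_snapshot_calibnet_2023-05-30_height_604419.forest.car.zst"
);
```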
(Diffs for the remaining four changed files are not shown.)
