feat: generate .forest.car.zst files by default #3283

Merged · 3 commits · Jul 27, 2023
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -54,6 +54,8 @@
   lookup errors instead of silently ignoring them.
 - [#2999](https://github.com/ChainSafe/forest/issues/2999): Restored `--tipset`
   flag to `forest-cli snapshot export` to allow export at a specific tipset.
+- [#3283](https://github.com/ChainSafe/forest/pull/3283): All generated car
+  files use the new forest.car.zst format.

Review thread on this entry:

Member: It's a breaking change, right?

Member: In other words, should any services (e.g., upload snapshot) be updated to this logic?

Contributor Author: Surprisingly, it is not! The files are all backward compatible. The new snapshots can still be used with old versions of Forest, or even with Lotus; they just won't get any of the cool new benefits.

Member: Are we also good with the file extensions and the many regular expressions that match snapshot files?

Contributor Author: Oh, yeah, we'll need to update some regexes.
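The two points in that thread are worth pinning down. The backward compatibility follows from the file layout: as the encoder comment later in this diff notes, the new index is written as zstd skippable frames, which ordinary zstd decoders ignore, so older Forest releases and Lotus read the file as a plain compressed CAR. And the regex update might look like the following sketch — illustration only, not code from this PR; it assumes the `regex` crate and a hypothetical pattern name — where a single pattern accepts both the legacy `.car.zst` suffix and the new `.forest.car.zst` one:

```rust
use regex::Regex;

fn main() {
    // Hypothetical snapshot-name pattern: vendor, chain, date, height,
    // then an optional `.forest` before the `.car.zst` suffix.
    let snapshot_re = Regex::new(
        r"^([^_]+)_snapshot_([^_]+)_(\d{4}-\d{2}-\d{2})_height_(\d+)(\.forest)?\.car\.zst$",
    )
    .unwrap();

    // Both the old and the new naming styles match the same pattern.
    assert!(snapshot_re.is_match("forest_snapshot_calibnet_2023-05-30_height_604419.car.zst"));
    assert!(snapshot_re.is_match("forest_snapshot_calibnet_2023-05-30_height_604419.forest.car.zst"));
}
```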

### Removed

57 changes: 21 additions & 36 deletions src/chain/mod.rs
@@ -3,58 +3,43 @@
 pub mod store;
 mod weight;
 use crate::blocks::Tipset;
+use crate::db::car::forest;
 use crate::ipld::stream_chain;
 use crate::utils::io::{AsyncWriterWithChecksum, Checksum};
 use anyhow::{Context, Result};
-use async_compression::futures::write::ZstdEncoder;
 use digest::Digest;
-use futures::future::Either;
-use futures::{io::BufWriter, AsyncWrite, AsyncWriteExt};
 use fvm_ipld_blockstore::Blockstore;
-use fvm_ipld_car::CarHeader;
-use std::sync::Arc;
+use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter};
 
 pub use self::{store::*, weight::*};
 
-pub async fn export<W, D>(
-    db: impl Blockstore + Send + Sync,
+pub async fn export<D: Digest>(
+    db: impl Blockstore,
     tipset: &Tipset,
     lookup_depth: ChainEpochDelta,
-    writer: W,
-    compressed: bool,
+    writer: impl AsyncWrite + Unpin,
     skip_checksum: bool,
-) -> Result<Option<digest::Output<D>>, Error>
-where
-    D: Digest + Send + 'static,
-    W: AsyncWrite + Send + Unpin + 'static,
-{
-    let store = Arc::new(db);
-    use futures::StreamExt;
-    let writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);
-    let mut writer = if compressed {
-        Either::Left(ZstdEncoder::new(writer))
-    } else {
-        Either::Right(writer)
-    };
-
+) -> Result<Option<digest::Output<D>>, Error> {
     let stateroot_lookup_limit = tipset.epoch() - lookup_depth;
+    let roots = tipset.key().cids().to_vec();
+
+    // Wrap writer in optional checksum calculator
+    let mut writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum);
+
+    // Stream stateroots in range stateroot_lookup_limit..=tipset.epoch(). Also
+    // stream all block headers until genesis.
+    let blocks = stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit);
+
+    // Encode Ipld key-value pairs in zstd frames
+    let frames = forest::Encoder::compress_stream(8000usize.next_power_of_two(), 3, blocks);
 
-    let mut stream = stream_chain(&store, tipset.clone().chain(&store), stateroot_lookup_limit)
-        .map(|result| result.unwrap()); // FIXME: use a sink that supports TryStream.
-    let header = CarHeader::from(tipset.key().cids().to_vec());
-    header
-        .write_stream_async(&mut writer, &mut stream)
-        .await
-        .map_err(|e| Error::Other(format!("Failed to write blocks in export: {e}")))?;
+    // Write zstd frames and include a skippable index
+    forest::Encoder::write(&mut writer, roots, frames).await?;
 
+    // Flush to ensure everything has been successfully written
     writer.flush().await.context("failed to flush")?;
-    writer.close().await.context("failed to close")?;
 
-    let digest = match &mut writer {
-        Either::Left(left) => left.get_mut().finalize().await,
-        Either::Right(right) => right.finalize().await,
-    }
-    .map_err(|e| Error::Other(e.to_string()))?;
+    let digest = writer.finalize().map_err(|e| Error::Other(e.to_string()))?;
 
     Ok(digest)
 }

Review thread on `writer: impl AsyncWrite + Unpin`:

Contributor: How about `writer: impl AsyncBufWrite + Unpin`, to enforce using `BufWriter` here?

Contributor Author: Do we want to enforce that?

Contributor: I see that most of the `export` calls pass a `File` instead of a `BufWriter`. Would it be beneficial to use `BufWriter`, given that these exported snapshots are pretty large? And if it is, should we enforce using `BufWriter`?

Contributor Author: `BufWriter` isn't automatically better; it only improves performance when you have a lot of small writes. We do not have a lot of small writes.
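For context, here is a minimal caller sketch — assumed code, not from this PR — showing how the new signature is driven: any `tokio` writer works directly, with no `.compat()` shim and no `compressed` flag, since the output is always a forest.car.zst:

```rust
use sha2::Sha256;

async fn write_snapshot(
    db: impl fvm_ipld_blockstore::Blockstore,
    tipset: &crate::blocks::Tipset,
) -> anyhow::Result<()> {
    let file = tokio::fs::File::create("snapshot.forest.car.zst").await?;
    // lookup_depth = 2000 epochs of state roots; skip_checksum = false,
    // so a Sha256 digest of the written bytes comes back.
    let digest = crate::chain::export::<Sha256>(db, tipset, 2000, file, false)
        .await
        .map_err(|e| anyhow::anyhow!(e.to_string()))?;
    if let Some(d) = digest {
        // Render the digest bytes as lowercase hex.
        let hex: String = d.iter().map(|b| format!("{b:02x}")).collect();
        println!("sha256: {hex}");
    }
    Ok(())
}
```

Note that `export` already wraps the writer in a `BufWriter` internally (visible in the diff above), which is consistent with the thread's conclusion that enforcing buffering at the call site isn't needed.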
6 changes: 6 additions & 0 deletions src/chain/store/errors.rs
@@ -59,6 +59,12 @@ impl From<anyhow::Error> for Error {
     }
 }
 
+impl From<std::io::Error> for Error {
+    fn from(e: std::io::Error) -> Self {
+        Error::Other(e.to_string())
+    }
+}
+
 impl<T> From<flume::SendError<T>> for Error {
     fn from(e: flume::SendError<T>) -> Self {
         Error::Other(e.to_string())
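A small illustration of what this impl buys — assumed code, not part of the diff, presuming the chain store's `Error` is in scope: `?` now converts I/O failures automatically instead of requiring a manual `map_err`:

```rust
fn persist(w: &mut impl std::io::Write, bytes: &[u8]) -> Result<(), Error> {
    w.write_all(bytes)?; // io::Error -> Error::Other via the impl above
    w.flush()?;
    Ok(())
}
```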
32 changes: 26 additions & 6 deletions src/cli/subcommands/archive_cmd.rs
@@ -32,9 +32,10 @@ use crate::chain::ChainEpochDelta;
 use crate::cli_shared::{snapshot, snapshot::TrustedVendor};
 use crate::db::car::AnyCar;
 use crate::networks::{calibnet, mainnet, ChainConfig, NetworkChain};
+use crate::shim::clock::EPOCH_DURATION_SECONDS;
 use crate::shim::clock::{ChainEpoch, EPOCHS_IN_DAY};
 use anyhow::{bail, Context as _};
-use chrono::Utc;
+use chrono::NaiveDateTime;
 use clap::Subcommand;
 use fvm_ipld_blockstore::Blockstore;
 use indicatif::ProgressIterator;
@@ -43,7 +44,6 @@ use sha2::Sha256;
 use std::io::{self, Read, Seek};
 use std::path::PathBuf;
 use std::sync::Arc;
-use tokio_util::compat::TokioAsyncReadCompatExt;
 use tracing::info;
 
 #[derive(Debug, Subcommand)]
@@ -106,13 +106,24 @@ impl ArchiveCommands {
 
 // This does nothing if the output path is a file. If it is a directory - it produces the following:
 // `./forest_snapshot_{chain}_{year}-{month}-{day}_height_{epoch}.car.zst`.
-fn build_output_path(chain: String, epoch: ChainEpoch, output_path: PathBuf) -> PathBuf {
+fn build_output_path(
+    chain: String,
+    genesis_timestamp: u64,
+    epoch: ChainEpoch,
+    output_path: PathBuf,
+) -> PathBuf {
     match output_path.is_dir() {
         true => output_path.join(snapshot::filename(
             TrustedVendor::Forest,
             chain,
-            Utc::now().date_naive(),
+            NaiveDateTime::from_timestamp_opt(
+                genesis_timestamp as i64 + epoch * EPOCH_DURATION_SECONDS,
+                0,
+            )
+            .unwrap_or_default()
+            .into(),
             epoch,
+            true,
        )),
         false => output_path.clone(),
    }
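The filename date is now derived from the exported epoch rather than the wall clock. A standalone sketch of that arithmetic follows; the genesis timestamp below is calibnet's and the 30-second epoch duration is Filecoin's block time, both stated here as assumptions rather than read from the crate. It reproduces the `2023-05-30` date seen in the test vectors further down:

```rust
use chrono::NaiveDateTime;

fn main() {
    let genesis_timestamp: i64 = 1_667_326_380; // calibnet genesis, 2022-11-01T18:13:00Z (assumed)
    let epoch: i64 = 604_419;
    let epoch_duration_seconds: i64 = 30; // Filecoin block time

    // Same computation as build_output_path: genesis time plus elapsed epochs.
    let dt = NaiveDateTime::from_timestamp_opt(
        genesis_timestamp + epoch * epoch_duration_seconds,
        0,
    )
    .unwrap_or_default();

    // The epoch's date (not today's date) ends up in the snapshot filename.
    assert_eq!(dt.date().to_string(), "2023-05-30");
    println!("snapshot date: {}", dt.date());
}
```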
@@ -154,7 +165,8 @@ async fn do_export<ReaderT: Read + Seek + Send + Sync>(
         .tipset_by_height(epoch, ts, ResolveNullTipset::TakeOlder)
         .context("unable to get a tipset at given height")?;
 
-    let output_path = build_output_path(network.to_string(), epoch, output_path);
+    let output_path =
+        build_output_path(network.to_string(), genesis.timestamp(), epoch, output_path);
 
     let writer = tokio::fs::File::create(&output_path)
         .await
@@ -168,7 +180,7 @@
         output_path.to_str().unwrap_or_default()
     );
 
-    crate::chain::export::<_, Sha256>(store, &ts, depth, writer.compat(), true, true).await?;
+    crate::chain::export::<Sha256>(store, &ts, depth, writer, true).await?;
 
     Ok(())
 }
@@ -341,6 +353,7 @@ mod tests {
     use fvm_ipld_car::CarReader;
     use tempfile::TempDir;
     use tokio::io::BufReader;
+    use tokio_util::compat::TokioAsyncReadCompatExt;
 
     #[test]
     fn archive_info_calibnet() {
@@ -364,6 +377,12 @@
         assert_eq!(info.epoch, 0);
     }
 
+    fn genesis_timestamp(reader: impl Read + Seek) -> u64 {
+        let db = crate::db::car::PlainCar::new(reader).unwrap();
+        let ts = Tipset::load_required(&db, &TipsetKeys::new(db.roots())).unwrap();
+        ts.genesis(&db).unwrap().timestamp()
+    }
+
     #[tokio::test]
     async fn export() {
         let output_path = TempDir::new().unwrap();
@@ -377,6 +396,7 @@
             .unwrap();
         let file = tokio::fs::File::open(build_output_path(
             NetworkChain::Calibnet.to_string(),
+            genesis_timestamp(std::io::Cursor::new(calibnet::DEFAULT_GENESIS)),
             0,
             output_path.path().into(),
         ))
4 changes: 3 additions & 1 deletion src/cli/subcommands/snapshot_cmd.rs
@@ -19,6 +19,7 @@ use crate::utils::proofs_api::paramfetch::ensure_params_downloaded;
 use anyhow::{bail, Context, Result};
 use chrono::Utc;
 use clap::Subcommand;
+use futures::TryStreamExt;
 use fvm_ipld_blockstore::Blockstore;
 use human_repr::HumanCount;
 use std::path::{Path, PathBuf};
@@ -108,6 +109,7 @@ impl SnapshotCommands {
                     chain_name,
                     Utc::now().date_naive(),
                     epoch,
+                    true,
                 )),
                 false => output_path.clone(),
             };
@@ -207,7 +209,7 @@ impl SnapshotCommands {
             let frames = crate::db::car::forest::Encoder::compress_stream(
                 frame_size,
                 compression_level,
-                block_stream,
+                block_stream.map_err(anyhow::Error::from),
             );
             crate::db::car::forest::Encoder::write(&mut dest, roots, frames).await?;
             dest.flush().await?;
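The `map_err` shim is needed because `compress_stream` expects an `anyhow::Error` stream while the block stream yields its own error type. As for the encoder parameters hard-coded on the export path in `src/chain/mod.rs`, a quick sanity check of what they evaluate to (a sketch, not project code):

```rust
fn main() {
    // 8000 rounds up to the next power of two: 8 KiB per frame.
    assert_eq!(8000usize.next_power_of_two(), 8192);
    // Level 3 is zstd's default compression level: fast, with a decent ratio.
    let compression_level = 3;
    println!("frame_size = 8192, zstd level = {compression_level}");
}
```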
64 changes: 49 additions & 15 deletions src/cli_shared/snapshot.rs
@@ -40,14 +40,21 @@ pub enum TrustedVendor {
 /// Create a filename in the "full" format. See [`parse`].
 // Common between export, and [`fetch`].
 // Keep in sync with the CLI documentation for the `snapshot` sub-command.
-pub fn filename(vendor: impl Display, chain: impl Display, date: NaiveDate, height: i64) -> String {
+pub fn filename(
+    vendor: impl Display,
+    chain: impl Display,
+    date: NaiveDate,
+    height: i64,
+    forest_format: bool,
+) -> String {
     let vendor = vendor.to_string();
     let chain = chain.to_string();
     ParsedFilename::Full {
         vendor: &vendor,
         chain: &chain,
         date,
         height,
+        forest_format,
     }
     .to_string()
 }
@@ -60,10 +67,10 @@ pub async fn fetch(
     vendor: TrustedVendor,
 ) -> anyhow::Result<PathBuf> {
     let (_len, url) = peek(vendor, chain).await?;
-    let (date, height) = ParsedFilename::parse_url(&url)
+    let (date, height, forest_format) = ParsedFilename::parse_url(&url)
         .context("unexpected url format")?
-        .date_and_height();
-    let filename = filename(vendor, chain, date, height);
+        .date_and_height_and_forest();
+    let filename = filename(vendor, chain, date, height, forest_format);
 
     match download_aria2c(&url, directory, &filename).await {
         Ok(path) => Ok(path),
@@ -232,6 +239,7 @@ mod parse {
             chain: &'a str,
             date: NaiveDate,
             height: i64,
+            forest_format: bool,
         },
     }
 
@@ -247,19 +255,26 @@
                 chain,
                 date,
                 height,
+                forest_format,
             } => f.write_fmt(format_args!(
-                "{vendor}_snapshot_{chain}_{}_height_{height}.car.zst",
-                date.format("%Y-%m-%d")
+                "{vendor}_snapshot_{chain}_{}_height_{height}{}.car.zst",
+                date.format("%Y-%m-%d"),
+                if *forest_format { ".forest" } else { "" }
             )),
         }
     }
 }
 
 impl<'a> ParsedFilename<'a> {
-    pub fn date_and_height(&self) -> (NaiveDate, i64) {
+    pub fn date_and_height_and_forest(&self) -> (NaiveDate, i64, bool) {
         match self {
-            ParsedFilename::Short { date, height, .. } => (*date, *height),
-            ParsedFilename::Full { date, height, .. } => (*date, *height),
+            ParsedFilename::Short { date, height, .. } => (*date, *height, false),
+            ParsedFilename::Full {
+                date,
+                height,
+                forest_format,
+                ..
+            } => (*date, *height, *forest_format),
         }
     }
 
@@ -310,7 +325,7 @@
         }
 
         fn full(input: &str) -> nom::IResult<&str, ParsedFilename> {
-            let (rest, (vendor, _snapshot_, chain, _, date, _height_, height, _car_zst)) =
+            let (rest, (vendor, _snapshot_, chain, _, date, _height_, height, car_zst)) =
                 tuple((
                     take_until("_snapshot_"),
                     tag("_snapshot_"),
@@ -319,7 +334,7 @@
                     ymd("-"),
                     tag("_height_"),
                     number,
-                    tag(".car.zst"),
+                    alt((tag(".car.zst"), tag(".forest.car.zst"))),
                 ))(input)?;
             Ok((
                 rest,
@@ -328,6 +343,7 @@
                     chain,
                     date,
                     height,
+                    forest_format: car_zst == ".forest.car.zst",
                 },
             ))
         }
@@ -389,24 +405,34 @@
             month: u32,
             day: u32,
             height: i64,
+            forest_format: bool,
         ) -> Self {
             Self::Full {
                 vendor,
                 chain,
                 date: NaiveDate::from_ymd_opt(year, month, day).unwrap(),
                 height,
+                forest_format,
             }
         }
     }
 
     for (text, value) in [
         (
             "forest_snapshot_mainnet_2023-05-30_height_2905376.car.zst",
-            ParsedFilename::full("forest", "mainnet", 2023, 5, 30, 2905376),
+            ParsedFilename::full("forest", "mainnet", 2023, 5, 30, 2905376, false),
         ),
         (
             "forest_snapshot_calibnet_2023-05-30_height_604419.car.zst",
-            ParsedFilename::full("forest", "calibnet", 2023, 5, 30, 604419),
+            ParsedFilename::full("forest", "calibnet", 2023, 5, 30, 604419, false),
         ),
+        (
+            "forest_snapshot_mainnet_2023-05-30_height_2905376.forest.car.zst",
+            ParsedFilename::full("forest", "mainnet", 2023, 5, 30, 2905376, true),
+        ),
+        (
+            "forest_snapshot_calibnet_2023-05-30_height_604419.forest.car.zst",
+            ParsedFilename::full("forest", "calibnet", 2023, 5, 30, 604419, true),
+        ),
         (
             "2905920_2023_05_30T22_00_00Z.car.zst",
@@ -418,11 +444,19 @@
         ),
         (
             "filecoin_snapshot_calibnet_2023-06-13_height_643680.car.zst",
-            ParsedFilename::full("filecoin", "calibnet", 2023, 6, 13, 643680),
+            ParsedFilename::full("filecoin", "calibnet", 2023, 6, 13, 643680, false),
         ),
         (
             "venus_snapshot_pineconenet_2045-01-01_height_2.car.zst",
-            ParsedFilename::full("venus", "pineconenet", 2045, 1, 1, 2),
+            ParsedFilename::full("venus", "pineconenet", 2045, 1, 1, 2, false),
         ),
+        (
+            "filecoin_snapshot_calibnet_2023-06-13_height_643680.forest.car.zst",
+            ParsedFilename::full("filecoin", "calibnet", 2023, 6, 13, 643680, true),
+        ),
+        (
+            "venus_snapshot_pineconenet_2045-01-01_height_2.forest.car.zst",
+            ParsedFilename::full("venus", "pineconenet", 2045, 1, 1, 2, true),
+        ),
     ] {
         assert_eq!(
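To make the accepted naming scheme concrete, here is a round-trip sketch; `full_filename` below re-implements the format string locally for illustration and is not the crate's `filename` function:

```rust
// Build a "full"-format snapshot name, with or without the `.forest` marker.
fn full_filename(vendor: &str, chain: &str, date: &str, height: i64, forest_format: bool) -> String {
    format!(
        "{vendor}_snapshot_{chain}_{date}_height_{height}{}.car.zst",
        if forest_format { ".forest" } else { "" }
    )
}

fn main() {
    assert_eq!(
        full_filename("forest", "calibnet", "2023-05-30", 604419, true),
        "forest_snapshot_calibnet_2023-05-30_height_604419.forest.car.zst",
    );
    assert_eq!(
        full_filename("forest", "calibnet", "2023-05-30", 604419, false),
        "forest_snapshot_calibnet_2023-05-30_height_604419.car.zst",
    );
}
```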