-
Notifications
You must be signed in to change notification settings - Fork 159
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: generate .forest.car.zst files by default #3283
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,58 +3,43 @@ | |
pub mod store; | ||
mod weight; | ||
use crate::blocks::Tipset; | ||
use crate::db::car::forest; | ||
use crate::ipld::stream_chain; | ||
use crate::utils::io::{AsyncWriterWithChecksum, Checksum}; | ||
use anyhow::{Context, Result}; | ||
use async_compression::futures::write::ZstdEncoder; | ||
use digest::Digest; | ||
use futures::future::Either; | ||
use futures::{io::BufWriter, AsyncWrite, AsyncWriteExt}; | ||
use fvm_ipld_blockstore::Blockstore; | ||
use fvm_ipld_car::CarHeader; | ||
use std::sync::Arc; | ||
use tokio::io::{AsyncWrite, AsyncWriteExt, BufWriter}; | ||
|
||
pub use self::{store::*, weight::*}; | ||
|
||
pub async fn export<W, D>( | ||
db: impl Blockstore + Send + Sync, | ||
pub async fn export<D: Digest>( | ||
db: impl Blockstore, | ||
tipset: &Tipset, | ||
lookup_depth: ChainEpochDelta, | ||
writer: W, | ||
compressed: bool, | ||
writer: impl AsyncWrite + Unpin, | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about [inline code snippet lost in this capture]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to enforce that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see most of the [remainder of comment lost in this capture]
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
|
||
skip_checksum: bool, | ||
) -> Result<Option<digest::Output<D>>, Error> | ||
where | ||
D: Digest + Send + 'static, | ||
W: AsyncWrite + Send + Unpin + 'static, | ||
{ | ||
let store = Arc::new(db); | ||
use futures::StreamExt; | ||
let writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum); | ||
let mut writer = if compressed { | ||
Either::Left(ZstdEncoder::new(writer)) | ||
} else { | ||
Either::Right(writer) | ||
}; | ||
|
||
) -> Result<Option<digest::Output<D>>, Error> { | ||
let stateroot_lookup_limit = tipset.epoch() - lookup_depth; | ||
let roots = tipset.key().cids().to_vec(); | ||
|
||
// Wrap writer in optional checksum calculator | ||
let mut writer = AsyncWriterWithChecksum::<D, _>::new(BufWriter::new(writer), !skip_checksum); | ||
|
||
// Stream stateroots in range stateroot_lookup_limit..=tipset.epoch(). Also | ||
// stream all block headers until genesis. | ||
let blocks = stream_chain(&db, tipset.clone().chain(&db), stateroot_lookup_limit); | ||
|
||
// Encode Ipld key-value pairs in zstd frames | ||
let frames = forest::Encoder::compress_stream(8000usize.next_power_of_two(), 3, blocks); | ||
|
||
let mut stream = stream_chain(&store, tipset.clone().chain(&store), stateroot_lookup_limit) | ||
.map(|result| result.unwrap()); // FIXME: use a sink that supports TryStream. | ||
let header = CarHeader::from(tipset.key().cids().to_vec()); | ||
header | ||
.write_stream_async(&mut writer, &mut stream) | ||
.await | ||
.map_err(|e| Error::Other(format!("Failed to write blocks in export: {e}")))?; | ||
// Write zstd frames and include a skippable index | ||
forest::Encoder::write(&mut writer, roots, frames).await?; | ||
|
||
// Flush to ensure everything has been successfully written | ||
writer.flush().await.context("failed to flush")?; | ||
writer.close().await.context("failed to close")?; | ||
|
||
let digest = match &mut writer { | ||
Either::Left(left) => left.get_mut().finalize().await, | ||
Either::Right(right) => right.finalize().await, | ||
} | ||
.map_err(|e| Error::Other(e.to_string()))?; | ||
let digest = writer.finalize().map_err(|e| Error::Other(e.to_string()))?; | ||
|
||
Ok(digest) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's a breaking change, right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In other words, should any services (e.g., upload snapshot) be updated to this logic?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Surprisingly, it is not! The files are all backward compatible. The new snapshots can still be used with old versions of Forest or even with Lotus. They just won't get any of the cool new benefits.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are we also good with the file extensions, given the many regular expressions we use to match files?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, yeah, we'll need to update some regexes.