chore: use stream/sink CAR loader (#3090)
Co-authored-by: Hubert <hubert@chainsafe.io>
lemmih and LesnyRumcajs authored Jul 10, 2023
1 parent f47aff7 commit 4f0d75f
Showing 6 changed files with 161 additions and 31 deletions.
25 changes: 25 additions & 0 deletions scripts/db_params_hyperfine.sh
@@ -0,0 +1,25 @@
#!/usr/bin/env bash
set -euo pipefail
CHAIN=calibnet

# https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/debug/filecoin_full_calibnet_2023-04-07_450000.car
SNAPSHOT=filecoin_full_calibnet_2023-04-07_450000.car
if [ ! -f $SNAPSHOT ]
then
aria2c -x 4 "https://forest-snapshots.fra1.cdn.digitaloceanspaces.com/debug/filecoin_full_calibnet_2023-04-07_450000.car"
fi

cargo build --release

# For unknown reasons, cleaning the database via hyperfine's --cleanup option gives wildly inconsistent results, so the `db clean` step runs as part of the timed command instead.
hyperfine \
--runs 5 \
--parameter-list CHUNK_SIZE 1000,5000,10000,20000,40000,200000,500000 \
--parameter-list BUFFER_CAPACITY 0,1,2,3 \
--export-markdown db_tune_params.md \
--command-name 'forest-import-{CHUNK_SIZE}-{BUFFER_CAPACITY}' \
"echo \"[client]\nchunk_size = {CHUNK_SIZE}\nbuffer_size = {BUFFER_CAPACITY}\" > /tmp/forest.conf; \
./target/release/forest \
--chain ${CHAIN} --config /tmp/forest.conf --rpc false --no-gc --encrypt-keystore false --halt-after-import \
--import-snapshot ${SNAPSHOT}; \
./target/release/forest-cli --chain ${CHAIN} db clean --force"
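For reference, each benchmarked command first writes a throwaway config to /tmp/forest.conf. With CHUNK_SIZE=20000 and BUFFER_CAPACITY=2, the file would contain roughly the following (assuming the shell's echo expands the \n escapes, as dash's builtin does when hyperfine runs the command through sh):

[client]
chunk_size = 20000
buffer_size = 2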
44 changes: 44 additions & 0 deletions src/cli_shared/cli/client.rs
@@ -14,6 +14,24 @@ use directories::ProjectDirs;
use serde::{Deserialize, Serialize};
use serde_with::{serde_as, DurationSeconds};

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct ChunkSize(pub u32);
impl Default for ChunkSize {
fn default() -> Self {
ChunkSize(500_000)
}
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(transparent)]
pub struct BufferSize(pub u32);
impl Default for BufferSize {
fn default() -> Self {
BufferSize(1)
}
}

#[serde_as]
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(default)]
@@ -31,6 +49,12 @@ pub struct Client {
/// Skips loading import CAR file and assumes it's already been loaded.
/// Will use the CIDs in the header of the file to index the chain.
pub skip_load: bool,
/// When importing CAR files, chunk key-value pairs before committing them
/// to the database.
pub chunk_size: ChunkSize,
/// When importing CAR files, maintain a read-ahead buffer measured in
/// number of chunks.
pub buffer_size: BufferSize,
pub encrypt_keystore: bool,
/// Metrics bind, e.g. 127.0.0.1:6116
pub metrics_address: SocketAddr,
@@ -56,6 +80,8 @@ impl Default for Client {
snapshot_height: None,
snapshot_head: None,
skip_load: false,
chunk_size: ChunkSize::default(),
buffer_size: BufferSize::default(),
encrypt_keystore: true,
metrics_address: FromStr::from_str("0.0.0.0:6116").unwrap(),
rpc_address: SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), DEFAULT_PORT),
@@ -64,3 +90,21 @@ impl Default for Client {
}
}
}

#[cfg(test)]
mod test {
use super::{BufferSize, ChunkSize};
use quickcheck::Arbitrary;

impl Arbitrary for ChunkSize {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
ChunkSize(u32::arbitrary(g))
}
}

impl Arbitrary for BufferSize {
fn arbitrary(g: &mut quickcheck::Gen) -> Self {
BufferSize(u32::arbitrary(g))
}
}
}
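Since both newtypes are #[serde(transparent)], they appear in the TOML config as bare integers, and the Default impls supply 500_000 and 1 when the [client] section omits them; the Arbitrary impls above feed the quickcheck round-trip test in config.rs. A minimal sketch of the transparent encoding and the defaults (a hypothetical test, assuming serde_json is available as a dev-dependency; not part of this commit):

#[test]
fn transparent_newtypes() {
    // serde(transparent) delegates to the inner u32, so a bare number parses.
    let chunk: ChunkSize = serde_json::from_str("20000").unwrap();
    assert_eq!(chunk, ChunkSize(20_000));
    // Defaults apply when the config file omits the fields.
    assert_eq!(ChunkSize::default(), ChunkSize(500_000));
    assert_eq!(BufferSize::default(), BufferSize(1));
}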
3 changes: 3 additions & 0 deletions src/cli_shared/cli/config.rs
@@ -122,6 +122,7 @@ mod test {
path::PathBuf,
};

use crate::cli_shared::cli::client::{BufferSize, ChunkSize};
use crate::utils::io::ProgressBarVisibility;
use chrono::Duration;
use quickcheck::Arbitrary;
@@ -169,6 +170,8 @@
snapshot_path: Option::arbitrary(g),
snapshot_head: Option::arbitrary(g),
skip_load: bool::arbitrary(g),
chunk_size: ChunkSize::arbitrary(g),
buffer_size: BufferSize::arbitrary(g),
encrypt_keystore: bool::arbitrary(g),
metrics_address: SocketAddr::arbitrary(g),
rpc_address: SocketAddr::arbitrary(g),
10 changes: 8 additions & 2 deletions src/daemon/bundle.rs
@@ -1,7 +1,7 @@
// Copyright 2019-2023 ChainSafe Systems
// SPDX-License-Identifier: Apache-2.0, MIT

use crate::cli_shared::cli::Config;
use crate::cli_shared::cli::{BufferSize, ChunkSize, Config};
use crate::genesis::forest_load_car;
use crate::networks::Height;
use crate::shim::clock::ChainEpoch;
@@ -31,7 +31,13 @@ where
}

for (manifest_cid, reader) in bundles {
let (result, _) = forest_load_car(db.clone(), reader.compat()).await?;
let (result, _) = forest_load_car(
db.clone(),
reader.compat(),
ChunkSize::default(),
BufferSize::default(),
)
.await?;
assert_eq!(
result.len(),
1,
24 changes: 20 additions & 4 deletions src/daemon/mod.rs
@@ -413,6 +413,8 @@ pub(super) async fn start(
&state_manager,
&path.display().to_string(),
config.client.skip_load,
config.client.chunk_size,
config.client.buffer_size,
)
.await
.context("Failed miserably while importing chain from snapshot")?;
@@ -715,6 +717,7 @@ fn create_password(prompt: &str) -> std::io::Result<String> {
#[cfg(test)]
mod test {
use crate::blocks::BlockHeader;
use crate::cli_shared::cli::{BufferSize, ChunkSize};
use crate::db::MemoryDB;
use crate::networks::ChainConfig;
use crate::shim::address::Address;
@@ -774,7 +777,14 @@ mod test {
chain_config,
Arc::new(crate::interpreter::RewardActorMessageCalc),
)?);
import_chain::<_>(&sm, file_path, false).await?;
import_chain::<_>(
&sm,
file_path,
false,
ChunkSize::default(),
BufferSize::default(),
)
.await?;
Ok(())
}

@@ -799,9 +809,15 @@
chain_config,
Arc::new(crate::interpreter::RewardActorMessageCalc),
)?);
import_chain::<_>(&sm, "test-snapshots/chain4.car", false)
.await
.context("Failed to import chain")?;
import_chain::<_>(
&sm,
"test-snapshots/chain4.car",
false,
ChunkSize::default(),
BufferSize::default(),
)
.await
.context("Failed to import chain")?;

Ok(())
}
86 changes: 61 additions & 25 deletions src/genesis/mod.rs
@@ -4,12 +4,11 @@
use std::{sync::Arc, time};

use crate::blocks::{BlockHeader, TipsetKeys};
use crate::cli_shared::cli::{BufferSize, ChunkSize};
use crate::state_manager::StateManager;
use crate::utils::db::BlockstoreBufferedWriteExt;

use anyhow::bail;
use cid::Cid;
use futures::AsyncRead;
use futures::{sink::SinkExt, stream, AsyncRead, Stream, StreamExt};
use fvm_ipld_blockstore::Blockstore;
use fvm_ipld_car::{load_car, CarReader};
use fvm_ipld_encoding::CborStore;
@@ -88,6 +87,8 @@ pub async fn import_chain<DB>(
sm: &Arc<StateManager<DB>>,
path: &str,
skip_load: bool,
chunk_size: ChunkSize,
buffer_size: BufferSize,
) -> anyhow::Result<()>
where
DB: Blockstore + Clone + Send + Sync + 'static,
@@ -97,8 +98,14 @@ where
let stopwatch = time::Instant::now();
let reader = crate::utils::net::reader(path).await?;

let (cids, n_records) =
load_and_retrieve_header(sm.blockstore().clone(), reader.compat(), skip_load).await?;
let (cids, n_records) = load_and_retrieve_header(
sm.blockstore().clone(),
reader.compat(),
skip_load,
chunk_size,
buffer_size,
)
.await?;

info!(
"Loaded {} records from .car file in {}s",
@@ -138,42 +145,71 @@ where
/// header.
async fn load_and_retrieve_header<DB, R>(
store: DB,
mut reader: R,
reader: R,
skip_load: bool,
chunk_size: ChunkSize,
buffer_size: BufferSize,
) -> anyhow::Result<(Vec<Cid>, Option<usize>)>
where
DB: Blockstore + Send + Sync + 'static,
DB: Blockstore + Send + 'static,
R: AsyncRead + Send + Unpin,
{
let result = if skip_load {
(CarReader::new(&mut reader).await?.header.roots, None)
(CarReader::new(reader).await?.header.roots, None)
} else {
let (roots, n_records) = forest_load_car(store, &mut reader).await?;
let (roots, n_records) = forest_load_car(store, reader, chunk_size, buffer_size).await?;
(roots, Some(n_records))
};

Ok(result)
}

pub async fn forest_load_car<DB, R>(store: DB, reader: R) -> anyhow::Result<(Vec<Cid>, usize)>
fn car_stream<R: futures::AsyncRead + Send + Unpin>(
reader: CarReader<R>,
) -> impl Stream<Item = anyhow::Result<fvm_ipld_car::Block>> {
stream::unfold(reader, |mut reader| async move {
reader
.next_block()
.await
.map_err(anyhow::Error::from)
.transpose()
.map(|result| (result, reader))
})
}
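car_stream adapts the pull-based CarReader into a futures Stream: stream::unfold threads the reader through an async closure, next_block() returns Result<Option<Block>>, and transpose flips that into Option<Result<Block>> so that Ok(None) at end of file terminates the stream. A minimal sketch of the same unfold pattern with a plain counter in place of the reader (hypothetical, not part of this commit):

use futures::{stream, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // Returning None ends the stream, just as next_block() returning
        // Ok(None) does at the end of a CAR file.
        let s = stream::unfold(0u32, |n| async move {
            if n < 3 { Some((n, n + 1)) } else { None }
        });
        assert_eq!(s.collect::<Vec<_>>().await, vec![0, 1, 2]);
    });
}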

pub async fn forest_load_car<DB, R>(
store: DB,
reader: R,
ChunkSize(chunk_size): ChunkSize,
BufferSize(buffer_size): BufferSize,
) -> anyhow::Result<(Vec<Cid>, usize)>
where
R: futures::AsyncRead + Send + Unpin,
DB: Blockstore + Send + Sync + 'static,
DB: Blockstore + Send + 'static,
{
// 1GB
const BUFFER_CAPCITY_BYTES: usize = 1024 * 1024 * 1024;

let (tx, rx) = flume::bounded(100);
let write_task =
tokio::spawn(async move { store.buffered_write(rx, BUFFER_CAPCITY_BYTES).await });
let mut car_reader = CarReader::new(reader).await?;
let header = std::mem::take(&mut car_reader.header);
let mut n_records = 0;
while let Some(block) = car_reader.next_block().await? {
debug!("Importing block: {}", block.cid);
n_records += 1;
tx.send_async((block.cid, block.data)).await?;
}
drop(tx);
write_task.await??;
Ok((car_reader.header.roots, n_records))

let sink = futures::sink::unfold(
store,
|store, blocks: Vec<fvm_ipld_car::Block>| async move {
tokio::task::spawn_blocking(move || {
store.put_many_keyed(blocks.into_iter().map(|block| (block.cid, block.data)))?;
Ok(store)
})
.await?
},
);

// Stream key-value pairs from the CAR file and commit them in chunks. Try
// to maintain a buffer of a few chunks to avoid read-stalling.
car_stream(car_reader)
.inspect(|_| n_records += 1)
.chunks(chunk_size as usize)
.map(|vec| vec.into_iter().collect::<anyhow::Result<Vec<_>>>())
.forward(sink.buffer(buffer_size as usize))
.await?;

Ok((header.roots, n_records))
}
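Taken together, the new loader is one pipeline: car_stream yields fallible blocks, inspect counts them, chunks groups chunk_size blocks per database commit, the map collects each Vec<Result<Block>> into a Result<Vec<Block>>, and forward drives those chunks into the unfold sink, which commits each batch via put_many_keyed on a blocking thread. SinkExt::buffer(buffer_size) lets up to buffer_size chunks queue ahead of the writer, so decoding the CAR file does not stall while a commit is in flight. A toy version of the same chunk-and-forward shape (hypothetical: integers stand in for blocks, a println for the database write):

use futures::{sink, stream, SinkExt, StreamExt};

fn main() -> anyhow::Result<()> {
    futures::executor::block_on(async {
        // The sink threads a running total through, as forest_load_car threads the store.
        let sink = sink::unfold(0usize, |total, chunk: Vec<u32>| async move {
            println!("committing chunk of {}", chunk.len()); // stand-in for put_many_keyed
            Ok::<_, anyhow::Error>(total + chunk.len())
        });
        stream::iter(1u32..=10)
            .map(Ok::<_, anyhow::Error>) // fallible items, like car_stream
            .chunks(3) // Vec<Result<u32>> per chunk
            .map(|c| c.into_iter().collect::<anyhow::Result<Vec<_>>>())
            .forward(sink.buffer(2)) // keep up to 2 chunks in flight
            .await
    })
}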
