Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rm nippyjar from reth-interfaces #7081

Merged
merged 6 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion crates/interfaces/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ workspace = true

[dependencies]
reth-primitives.workspace = true
reth-nippy-jar.workspace = true
reth-rpc-types.workspace = true
reth-network-api.workspace = true
# TODO(onbjerg): We only need this for [BlockBody]
Expand Down
6 changes: 0 additions & 6 deletions crates/interfaces/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,6 @@ pub enum ProviderError {
BlockNumberOverflow(U256),
}

impl From<reth_nippy_jar::NippyJarError> for ProviderError {
fn from(err: reth_nippy_jar::NippyJarError) -> Self {
ProviderError::NippyJar(err.to_string())
}
}

impl From<reth_primitives::fs::FsPathError> for ProviderError {
fn from(err: reth_primitives::fs::FsPathError) -> Self {
ProviderError::FsPathError(err.to_string())
Expand Down
8 changes: 6 additions & 2 deletions crates/static-file/src/segments/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ use reth_primitives::{
},
BlockNumber, StaticFileSegment,
};
use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO, TransactionsProviderExt};
use reth_provider::{
providers::StaticFileProvider, DatabaseProviderRO, ProviderError, TransactionsProviderExt,
};
use std::{ops::RangeInclusive, path::Path};

pub(crate) type Rows<const COLUMNS: usize> = [Vec<Vec<u8>>; COLUMNS];
Expand Down Expand Up @@ -82,7 +84,9 @@ pub(crate) fn prepare_jar<DB: Database, const COLUMNS: usize>(
let dataset = prepare_compression()?;

nippy_jar = nippy_jar.with_zstd(true, 5_000_000);
nippy_jar.prepare_compression(dataset.to_vec())?;
nippy_jar
.prepare_compression(dataset.to_vec())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
nippy_jar
}
Compression::Uncompressed => nippy_jar,
Expand Down
10 changes: 7 additions & 3 deletions crates/storage/db/src/static_file/cursor.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo};
use crate::table::Decompress;
use derive_more::{Deref, DerefMut};
use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::{DataReader, NippyJar, NippyJarCursor};
use reth_primitives::{static_file::SegmentHeader, B256};
use std::sync::Arc;
Expand All @@ -13,7 +13,10 @@ pub struct StaticFileCursor<'a>(NippyJarCursor<'a, SegmentHeader>);
impl<'a> StaticFileCursor<'a> {
/// Returns a new [`StaticFileCursor`].
pub fn new(jar: &'a NippyJar<SegmentHeader>, reader: Arc<DataReader>) -> ProviderResult<Self> {
Ok(Self(NippyJarCursor::with_reader(jar, reader)?))
Ok(Self(
NippyJarCursor::with_reader(jar, reader)
.map_err(|err| ProviderError::NippyJar(err.to_string()))?,
))
}

/// Returns the current `BlockNumber` or `TxNumber` of the cursor depending on the kind of
Expand Down Expand Up @@ -43,7 +46,8 @@ impl<'a> StaticFileCursor<'a> {
}
None => Ok(None),
},
}?;
}
.map_or(None, |v| v);

Ok(row)
}
Expand Down
25 changes: 19 additions & 6 deletions crates/storage/db/src/static_file/generation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
RawKey, RawTable,
};

use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarHeader, PHFKey};
use reth_tracing::tracing::*;
use std::{error::Error as StdError, ops::RangeInclusive};
Expand Down Expand Up @@ -55,15 +55,28 @@ macro_rules! generate_static_file_func {
// Create PHF and Filter if required
if let Some(keys) = keys {
debug!(target: "reth::static_file", "Calculating Filter, PHF and offset index list");
nippy_jar.prepare_index(keys, row_count)?;
debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated.");
match nippy_jar.prepare_index(keys, row_count) {
Ok(_) => {
debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}
}

// Create compression dictionaries if required
if let Some(data_sets) = dict_compression_set {
debug!(target: "reth::static_file", "Creating compression dictionaries.");
nippy_jar.prepare_compression(data_sets)?;
debug!(target: "reth::static_file", "Compression dictionaries created.");
match nippy_jar.prepare_compression(data_sets){
Ok(_) => {
debug!(target: "reth::static_file", "Compression dictionaries created.");
},
Err(e) => {
return Err(ProviderError::NippyJar(e.to_string()));
}
}

}

// Creates the cursors for the columns
Expand All @@ -88,7 +101,7 @@ macro_rules! generate_static_file_func {

debug!(target: "reth::static_file", jar=?nippy_jar, "Generating static file.");

let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64)?;
let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64).map_err(|e| ProviderError::NippyJar(e.to_string()));

debug!(target: "reth::static_file", jar=?nippy_jar, "Static file generated.");

Expand Down
22 changes: 14 additions & 8 deletions crates/storage/provider/src/providers/static_file/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,8 @@ impl StaticFileProvider {
pub fn report_metrics(&self) -> ProviderResult<()> {
let Some(metrics) = &self.metrics else { return Ok(()) };

let static_files = iter_static_files(&self.path)?;
let static_files =
iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
for (segment, ranges) in static_files {
let mut entries = 0;
let mut size = 0;
Expand Down Expand Up @@ -243,14 +244,15 @@ impl StaticFileProvider {
} else {
let mut jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_block_range)),
)?;
)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if self.load_filters {
jar.load_filters()?;
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
}
jar
};

jar.delete()?;
jar.delete().map_err(|e| ProviderError::NippyJar(e.to_string()))?;

let mut segment_max_block = None;
if fixed_block_range.start() > 0 {
Expand All @@ -276,9 +278,10 @@ impl StaticFileProvider {
jar.into()
} else {
let path = self.path.join(segment.filename(fixed_block_range));
let mut jar = NippyJar::load(&path)?;
let mut jar =
NippyJar::load(&path).map_err(|e| ProviderError::NippyJar(e.to_string()))?;
if self.load_filters {
jar.load_filters()?;
jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?;
}

self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into()
Expand Down Expand Up @@ -353,7 +356,8 @@ impl StaticFileProvider {

let jar = NippyJar::<SegmentHeader>::load(
&self.path.join(segment.filename(&fixed_range)),
)?;
)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;

// Updates the tx index by first removing all entries which have a higher
// block_start than our current static file.
Expand Down Expand Up @@ -416,7 +420,9 @@ impl StaticFileProvider {

tx_index.clear();

for (segment, ranges) in iter_static_files(&self.path)? {
for (segment, ranges) in
iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?
{
// Update last block for each segment
if let Some((block_range, _)) = ranges.last() {
max_block.insert(segment, block_range.end());
Expand Down
11 changes: 8 additions & 3 deletions crates/storage/provider/src/providers/static_file/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut};

mod metrics;

use reth_interfaces::provider::ProviderResult;
use reth_interfaces::provider::{ProviderError, ProviderResult};
use reth_nippy_jar::NippyJar;
use reth_primitives::{static_file::SegmentHeader, StaticFileSegment};
use std::{ops::Deref, sync::Arc};
Expand All @@ -28,8 +28,13 @@ pub struct LoadedJar {

impl LoadedJar {
fn new(jar: NippyJar<SegmentHeader>) -> ProviderResult<Self> {
let mmap_handle = Arc::new(jar.open_data_reader()?);
Ok(Self { jar, mmap_handle })
match jar.open_data_reader() {
Ok(data_reader) => {
let mmap_handle = Arc::new(data_reader);
Ok(Self { jar, mmap_handle })
}
Err(e) => Err(ProviderError::NippyJar(e.to_string())),
}
}

/// Returns a clone of the mmap handle that can be used to instantiate a cursor.
Expand Down
31 changes: 23 additions & 8 deletions crates/storage/provider/src/providers/static_file/writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ impl StaticFileProviderRW {
block_range.start(),
None,
) {
Ok(provider) => (NippyJar::load(provider.data_path())?, provider.data_path().into()),
Ok(provider) => (
NippyJar::load(provider.data_path())
.map_err(|e| ProviderError::NippyJar(e.to_string()))?,
provider.data_path().into(),
),
Err(ProviderError::MissingStaticFileBlock(_, _)) => {
let path = static_file_provider.directory().join(segment.filename(&block_range));
(create_jar(segment, &path, block_range), path)
Expand All @@ -78,7 +82,7 @@ impl StaticFileProviderRW {
// This static file has been frozen, so we should
Err(ProviderError::FinalizedStaticFile(segment, block))
}
Err(e) => Err(e.into()),
Err(e) => Err(ProviderError::NippyJar(e.to_string())),
}?;

if let Some(metrics) = &metrics {
Expand All @@ -97,7 +101,7 @@ impl StaticFileProviderRW {
let start = Instant::now();

// Commits offsets and new user_header to disk
self.writer.commit()?;
self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?;

if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
Expand Down Expand Up @@ -128,7 +132,9 @@ impl StaticFileProviderRW {
let start = Instant::now();

// Commits offsets and new user_header to disk
self.writer.commit_without_sync_all()?;
self.writer
.commit_without_sync_all()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;

if let Some(metrics) = &self.metrics {
metrics.record_segment_operation(
Expand Down Expand Up @@ -249,11 +255,16 @@ impl StaticFileProviderRW {
self.writer = writer;
self.data_path = data_path;

NippyJar::<SegmentHeader>::load(&previous_snap)?.delete()?;
NippyJar::<SegmentHeader>::load(&previous_snap)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?
.delete()
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
} else {
// Update `SegmentHeader`
self.writer.user_header_mut().prune(len);
self.writer.prune_rows(len as usize)?;
self.writer
.prune_rows(len as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
break
}

Expand All @@ -263,7 +274,9 @@ impl StaticFileProviderRW {
self.writer.user_header_mut().prune(num_rows);

// Truncate data
self.writer.prune_rows(num_rows as usize)?;
self.writer
.prune_rows(num_rows as usize)
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
num_rows = 0;
}
}
Expand All @@ -285,7 +298,9 @@ impl StaticFileProviderRW {
self.buf.clear();
column.to_compact(&mut self.buf);

self.writer.append_column(Some(Ok(&self.buf)))?;
self.writer
.append_column(Some(Ok(&self.buf)))
.map_err(|e| ProviderError::NippyJar(e.to_string()))?;
Ok(())
}

Expand Down
Loading