diff --git a/Cargo.lock b/Cargo.lock index 0d600a87970c..9783f303de5e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5951,7 +5951,6 @@ dependencies = [ "rand 0.8.5", "reth-eth-wire", "reth-network-api", - "reth-nippy-jar", "reth-primitives", "reth-rpc-types", "secp256k1 0.27.0", diff --git a/crates/interfaces/Cargo.toml b/crates/interfaces/Cargo.toml index fafcdaf88e62..971e80ae0185 100644 --- a/crates/interfaces/Cargo.toml +++ b/crates/interfaces/Cargo.toml @@ -12,7 +12,6 @@ workspace = true [dependencies] reth-primitives.workspace = true -reth-nippy-jar.workspace = true reth-rpc-types.workspace = true reth-network-api.workspace = true # TODO(onbjerg): We only need this for [BlockBody] diff --git a/crates/interfaces/src/provider.rs b/crates/interfaces/src/provider.rs index b72c92304fbc..7e5af713f784 100644 --- a/crates/interfaces/src/provider.rs +++ b/crates/interfaces/src/provider.rs @@ -130,12 +130,6 @@ pub enum ProviderError { BlockNumberOverflow(U256), } -impl From for ProviderError { - fn from(err: reth_nippy_jar::NippyJarError) -> Self { - ProviderError::NippyJar(err.to_string()) - } -} - impl From for ProviderError { fn from(err: reth_primitives::fs::FsPathError) -> Self { ProviderError::FsPathError(err.to_string()) diff --git a/crates/static-file/src/segments/mod.rs b/crates/static-file/src/segments/mod.rs index 7cad895aed7b..be3ca0e716a0 100644 --- a/crates/static-file/src/segments/mod.rs +++ b/crates/static-file/src/segments/mod.rs @@ -21,7 +21,9 @@ use reth_primitives::{ }, BlockNumber, StaticFileSegment, }; -use reth_provider::{providers::StaticFileProvider, DatabaseProviderRO, TransactionsProviderExt}; +use reth_provider::{ + providers::StaticFileProvider, DatabaseProviderRO, ProviderError, TransactionsProviderExt, +}; use std::{ops::RangeInclusive, path::Path}; pub(crate) type Rows = [Vec>; COLUMNS]; @@ -82,7 +84,9 @@ pub(crate) fn prepare_jar( let dataset = prepare_compression()?; nippy_jar = nippy_jar.with_zstd(true, 5_000_000); - nippy_jar.prepare_compression(dataset.to_vec())?; + nippy_jar + .prepare_compression(dataset.to_vec()) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; nippy_jar } Compression::Uncompressed => nippy_jar, diff --git a/crates/storage/db/src/static_file/cursor.rs b/crates/storage/db/src/static_file/cursor.rs index 237cbe4518d8..89337b56e12e 100644 --- a/crates/storage/db/src/static_file/cursor.rs +++ b/crates/storage/db/src/static_file/cursor.rs @@ -1,7 +1,7 @@ use super::mask::{ColumnSelectorOne, ColumnSelectorThree, ColumnSelectorTwo}; use crate::table::Decompress; use derive_more::{Deref, DerefMut}; -use reth_interfaces::provider::ProviderResult; +use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::{DataReader, NippyJar, NippyJarCursor}; use reth_primitives::{static_file::SegmentHeader, B256}; use std::sync::Arc; @@ -13,7 +13,10 @@ pub struct StaticFileCursor<'a>(NippyJarCursor<'a, SegmentHeader>); impl<'a> StaticFileCursor<'a> { /// Returns a new [`StaticFileCursor`]. pub fn new(jar: &'a NippyJar, reader: Arc) -> ProviderResult { - Ok(Self(NippyJarCursor::with_reader(jar, reader)?)) + Ok(Self( + NippyJarCursor::with_reader(jar, reader) + .map_err(|err| ProviderError::NippyJar(err.to_string()))?, + )) } /// Returns the current `BlockNumber` or `TxNumber` of the cursor depending on the kind of @@ -43,7 +46,8 @@ impl<'a> StaticFileCursor<'a> { } None => Ok(None), }, - }?; + } + .map_or(None, |v| v); Ok(row) } diff --git a/crates/storage/db/src/static_file/generation.rs b/crates/storage/db/src/static_file/generation.rs index dd9df5c0e936..50db32adb2ae 100644 --- a/crates/storage/db/src/static_file/generation.rs +++ b/crates/storage/db/src/static_file/generation.rs @@ -5,7 +5,7 @@ use crate::{ RawKey, RawTable, }; -use reth_interfaces::provider::ProviderResult; +use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::{ColumnResult, NippyJar, NippyJarHeader, PHFKey}; use reth_tracing::tracing::*; use std::{error::Error as StdError, ops::RangeInclusive}; @@ -55,15 +55,28 @@ macro_rules! generate_static_file_func { // Create PHF and Filter if required if let Some(keys) = keys { debug!(target: "reth::static_file", "Calculating Filter, PHF and offset index list"); - nippy_jar.prepare_index(keys, row_count)?; - debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated."); + match nippy_jar.prepare_index(keys, row_count) { + Ok(_) => { + debug!(target: "reth::static_file", "Filter, PHF and offset index list calculated."); + }, + Err(e) => { + return Err(ProviderError::NippyJar(e.to_string())); + } + } } // Create compression dictionaries if required if let Some(data_sets) = dict_compression_set { debug!(target: "reth::static_file", "Creating compression dictionaries."); - nippy_jar.prepare_compression(data_sets)?; - debug!(target: "reth::static_file", "Compression dictionaries created."); + match nippy_jar.prepare_compression(data_sets){ + Ok(_) => { + debug!(target: "reth::static_file", "Compression dictionaries created."); + }, + Err(e) => { + return Err(ProviderError::NippyJar(e.to_string())); + } + } + } // Creates the cursors for the columns @@ -88,7 +101,7 @@ macro_rules! generate_static_file_func { debug!(target: "reth::static_file", jar=?nippy_jar, "Generating static file."); - let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64)?; + let nippy_jar = nippy_jar.freeze(col_iterators.into_iter().chain(additional).collect(), row_count as u64).map_err(|e| ProviderError::NippyJar(e.to_string())); debug!(target: "reth::static_file", jar=?nippy_jar, "Static file generated."); diff --git a/crates/storage/provider/src/providers/static_file/manager.rs b/crates/storage/provider/src/providers/static_file/manager.rs index 5a27a2a98646..65e61dc44742 100644 --- a/crates/storage/provider/src/providers/static_file/manager.rs +++ b/crates/storage/provider/src/providers/static_file/manager.rs @@ -118,7 +118,8 @@ impl StaticFileProvider { pub fn report_metrics(&self) -> ProviderResult<()> { let Some(metrics) = &self.metrics else { return Ok(()) }; - let static_files = iter_static_files(&self.path)?; + let static_files = + iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))?; for (segment, ranges) in static_files { let mut entries = 0; let mut size = 0; @@ -243,14 +244,15 @@ impl StaticFileProvider { } else { let mut jar = NippyJar::::load( &self.path.join(segment.filename(&fixed_block_range)), - )?; + ) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; if self.load_filters { - jar.load_filters()?; + jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?; } jar }; - jar.delete()?; + jar.delete().map_err(|e| ProviderError::NippyJar(e.to_string()))?; let mut segment_max_block = None; if fixed_block_range.start() > 0 { @@ -276,9 +278,10 @@ impl StaticFileProvider { jar.into() } else { let path = self.path.join(segment.filename(fixed_block_range)); - let mut jar = NippyJar::load(&path)?; + let mut jar = + NippyJar::load(&path).map_err(|e| ProviderError::NippyJar(e.to_string()))?; if self.load_filters { - jar.load_filters()?; + jar.load_filters().map_err(|e| ProviderError::NippyJar(e.to_string()))?; } self.map.entry(key).insert(LoadedJar::new(jar)?).downgrade().into() @@ -353,7 +356,8 @@ impl StaticFileProvider { let jar = NippyJar::::load( &self.path.join(segment.filename(&fixed_range)), - )?; + ) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; // Updates the tx index by first removing all entries which have a higher // block_start than our current static file. @@ -416,7 +420,9 @@ impl StaticFileProvider { tx_index.clear(); - for (segment, ranges) in iter_static_files(&self.path)? { + for (segment, ranges) in + iter_static_files(&self.path).map_err(|e| ProviderError::NippyJar(e.to_string()))? + { // Update last block for each segment if let Some((block_range, _)) = ranges.last() { max_block.insert(segment, block_range.end()); diff --git a/crates/storage/provider/src/providers/static_file/mod.rs b/crates/storage/provider/src/providers/static_file/mod.rs index be1db10b15bc..46a1b7453ec1 100644 --- a/crates/storage/provider/src/providers/static_file/mod.rs +++ b/crates/storage/provider/src/providers/static_file/mod.rs @@ -9,7 +9,7 @@ pub use writer::{StaticFileProviderRW, StaticFileProviderRWRefMut}; mod metrics; -use reth_interfaces::provider::ProviderResult; +use reth_interfaces::provider::{ProviderError, ProviderResult}; use reth_nippy_jar::NippyJar; use reth_primitives::{static_file::SegmentHeader, StaticFileSegment}; use std::{ops::Deref, sync::Arc}; @@ -28,8 +28,13 @@ pub struct LoadedJar { impl LoadedJar { fn new(jar: NippyJar) -> ProviderResult { - let mmap_handle = Arc::new(jar.open_data_reader()?); - Ok(Self { jar, mmap_handle }) + match jar.open_data_reader() { + Ok(data_reader) => { + let mmap_handle = Arc::new(data_reader); + Ok(Self { jar, mmap_handle }) + } + Err(e) => Err(ProviderError::NippyJar(e.to_string())), + } } /// Returns a clone of the mmap handle that can be used to instantiate a cursor. diff --git a/crates/storage/provider/src/providers/static_file/writer.rs b/crates/storage/provider/src/providers/static_file/writer.rs index a34a93317bf8..dd7ee2fc417d 100644 --- a/crates/storage/provider/src/providers/static_file/writer.rs +++ b/crates/storage/provider/src/providers/static_file/writer.rs @@ -64,7 +64,11 @@ impl StaticFileProviderRW { block_range.start(), None, ) { - Ok(provider) => (NippyJar::load(provider.data_path())?, provider.data_path().into()), + Ok(provider) => ( + NippyJar::load(provider.data_path()) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?, + provider.data_path().into(), + ), Err(ProviderError::MissingStaticFileBlock(_, _)) => { let path = static_file_provider.directory().join(segment.filename(&block_range)); (create_jar(segment, &path, block_range), path) @@ -78,7 +82,7 @@ impl StaticFileProviderRW { // This static file has been frozen, so we should Err(ProviderError::FinalizedStaticFile(segment, block)) } - Err(e) => Err(e.into()), + Err(e) => Err(ProviderError::NippyJar(e.to_string())), }?; if let Some(metrics) = &metrics { @@ -97,7 +101,7 @@ impl StaticFileProviderRW { let start = Instant::now(); // Commits offsets and new user_header to disk - self.writer.commit()?; + self.writer.commit().map_err(|e| ProviderError::NippyJar(e.to_string()))?; if let Some(metrics) = &self.metrics { metrics.record_segment_operation( @@ -128,7 +132,9 @@ impl StaticFileProviderRW { let start = Instant::now(); // Commits offsets and new user_header to disk - self.writer.commit_without_sync_all()?; + self.writer + .commit_without_sync_all() + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; if let Some(metrics) = &self.metrics { metrics.record_segment_operation( @@ -249,11 +255,16 @@ impl StaticFileProviderRW { self.writer = writer; self.data_path = data_path; - NippyJar::::load(&previous_snap)?.delete()?; + NippyJar::::load(&previous_snap) + .map_err(|e| ProviderError::NippyJar(e.to_string()))? + .delete() + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; } else { // Update `SegmentHeader` self.writer.user_header_mut().prune(len); - self.writer.prune_rows(len as usize)?; + self.writer + .prune_rows(len as usize) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; break } @@ -263,7 +274,9 @@ impl StaticFileProviderRW { self.writer.user_header_mut().prune(num_rows); // Truncate data - self.writer.prune_rows(num_rows as usize)?; + self.writer + .prune_rows(num_rows as usize) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; num_rows = 0; } } @@ -285,7 +298,9 @@ impl StaticFileProviderRW { self.buf.clear(); column.to_compact(&mut self.buf); - self.writer.append_column(Some(Ok(&self.buf)))?; + self.writer + .append_column(Some(Ok(&self.buf))) + .map_err(|e| ProviderError::NippyJar(e.to_string()))?; Ok(()) }