Skip to content

Commit

Permalink
feat: impl parallel get_byte_ranges for ObjectStoreReader (#450)
Browse files Browse the repository at this point in the history
* feat: impl parallel get_byte_ranges for ObjectStoreReader

* chore: add sst_get_range_length_histogram in ObjectStoreReader

* fix ci

* refactor by CR
  • Loading branch information
chunshao90 authored Dec 7, 2022
1 parent be10d88 commit 380a327
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 3 deletions.
14 changes: 14 additions & 0 deletions analytic_engine/src/sst/metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// Copyright 2022 CeresDB Project Authors. Licensed under Apache-2.0.

use lazy_static::lazy_static;
use prometheus::{exponential_buckets, register_histogram, Histogram};

lazy_static! {
// Histogram:
// Buckets: 100B,200B,400B,...,2KB
pub static ref SST_GET_RANGE_HISTOGRAM: Histogram = register_histogram!(
"sst_get_range_length",
"Histogram for sst get range length",
exponential_buckets(100.0, 2.0, 5).unwrap()
).unwrap();
}
1 change: 1 addition & 0 deletions analytic_engine/src/sst/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ pub mod factory;
pub mod file;
pub mod manager;
pub mod meta_cache;
pub mod metrics;
pub mod parquet;
pub mod reader;
38 changes: 35 additions & 3 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use parquet::{
file::metadata::RowGroupMetaData,
};
use parquet_ext::ParquetMetaDataRef;
use prometheus::local::LocalHistogram;
use snafu::ResultExt;
use table_engine::predicate::PredicateRef;
use tokio::sync::mpsc::{self, Receiver, Sender};
Expand All @@ -36,6 +37,7 @@ use crate::{
factory::SstReaderOptions,
file::{BloomFilter, SstMetaData},
meta_cache::{MetaCacheRef, MetaData},
metrics,
parquet::{encoding::ParquetDecoder, row_group_filter::RowGroupFilter},
reader::{error::*, Result, SstReader},
},
Expand Down Expand Up @@ -196,9 +198,10 @@ impl<'a> Reader<'a> {
}
}

#[derive(Debug, Default)]
#[derive(Debug)]
struct ReaderMetrics {
bytes_scanned: usize,
sst_get_range_length_histogram: LocalHistogram,
}

struct ObjectStoreReader {
Expand All @@ -214,7 +217,10 @@ impl ObjectStoreReader {
storage,
path,
meta_data,
metrics: Default::default(),
metrics: ReaderMetrics {
bytes_scanned: 0,
sst_get_range_length_histogram: metrics::SST_GET_RANGE_HISTOGRAM.local(),
},
}
}
}
Expand All @@ -228,17 +234,43 @@ impl Drop for ObjectStoreReader {
impl AsyncFileReader for ObjectStoreReader {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.metrics.bytes_scanned += range.end - range.start;
self.metrics
.sst_get_range_length_histogram
.observe((range.end - range.start) as f64);
self.storage
.get_range(&self.path, range)
.map_err(|e| {
parquet::errors::ParquetError::General(format!(
"ObjectStoreReader::get_bytes error: {}",
"Failed to fetch range from object store, err:{}",
e
))
})
.boxed()
}

fn get_byte_ranges(
&mut self,
ranges: Vec<Range<usize>>,
) -> BoxFuture<'_, parquet::errors::Result<Vec<Bytes>>> {
for range in &ranges {
self.metrics
.sst_get_range_length_histogram
.observe((range.end - range.start) as f64);
}
async move {
self.storage
.get_ranges(&self.path, &ranges)
.map_err(|e| {
parquet::errors::ParquetError::General(format!(
"Failed to fetch ranges from object store, err:{}",
e
))
})
.await
}
.boxed()
}

fn get_metadata(
&mut self,
) -> BoxFuture<'_, parquet::errors::Result<Arc<parquet::file::metadata::ParquetMetaData>>> {
Expand Down

0 comments on commit 380a327

Please sign in to comment.