Skip to content

Commit

Permalink
pass file size from file handler
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 committed Dec 27, 2022
1 parent 30a1d7b commit b4bd39e
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 36 deletions.
3 changes: 2 additions & 1 deletion analytic_engine/src/row_iter/record_batch_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -291,8 +291,9 @@ pub async fn stream_from_sst_file(
) -> Result<SequencedRecordBatchStream> {
sst_file.read_meter().mark();
let path = sst_util::new_sst_file_path(space_id, table_id, sst_file.id());

let mut sst_reader = sst_factory
.new_sst_reader(sst_reader_options, &path, store_picker)
.new_sst_reader(sst_reader_options, &path, store_picker, sst_file.size())
.with_context(|| SstReaderNotFound {
options: sst_reader_options.clone(),
})?;
Expand Down
38 changes: 9 additions & 29 deletions analytic_engine/src/sst/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,12 @@

//! Factory for different kinds sst builder and reader.
use std::{
fmt::Debug,
ops::Range,
sync::{Arc, RwLock},
};
use std::{fmt::Debug, ops::Range, sync::Arc};

use async_trait::async_trait;
use bytes::Bytes;
use common_types::projected_schema::ProjectedSchema;
use common_util::{
error::{GenericError, GenericResult},
runtime::Runtime,
};
use common_util::{error::GenericResult, runtime::Runtime};
use object_store::{ObjectStoreRef, Path};
use table_engine::predicate::PredicateRef;

Expand Down Expand Up @@ -60,6 +53,7 @@ pub trait Factory: Send + Sync + Debug {
options: &SstReaderOptions,
path: &'a Path,
store_picker: &'a ObjectStorePickerRef,
file_size: u64,
) -> Option<Box<dyn SstReader + Send + 'a>>;

fn new_sst_builder<'a>(
Expand Down Expand Up @@ -111,38 +105,23 @@ pub struct SstBuilderOptions {
pub struct FileReaderOnObjectStore {
path: Path,
store: ObjectStoreRef,
cached_file_size: RwLock<Option<usize>>,
file_size: u64,
}

impl FileReaderOnObjectStore {
pub fn new(path: Path, store: ObjectStoreRef) -> Self {
pub fn new(path: Path, store: ObjectStoreRef, file_size: u64) -> Self {
Self {
path,
store,
cached_file_size: RwLock::new(None),
file_size,
}
}
}

#[async_trait]
impl AsyncFileReader for FileReaderOnObjectStore {
async fn file_size(&self) -> GenericResult<usize> {
// check cached filed_size first
{
let file_size = self.cached_file_size.read().unwrap();
if let Some(s) = file_size.as_ref() {
return Ok(*s);
}
}

// fetch the size from the underlying store
let head = self
.store
.head(&self.path)
.await
.map_err(|e| Box::new(e) as GenericError)?;
*self.cached_file_size.write().unwrap() = Some(head.size);
Ok(head.size)
usize::try_from(self.file_size).map_err(|e| Box::new(e) as _)
}

async fn get_byte_range(&self, range: Range<usize>) -> GenericResult<Bytes> {
Expand All @@ -169,9 +148,10 @@ impl Factory for FactoryImpl {
options: &SstReaderOptions,
path: &'a Path,
store_picker: &'a ObjectStorePickerRef,
file_size: u64,
) -> Option<Box<dyn SstReader + Send + 'a>> {
let store = store_picker.pick_by_freq(options.frequency).clone();
let file_reader = FileReaderOnObjectStore::new(path.clone(), store);
let file_reader = FileReaderOnObjectStore::new(path.clone(), store, file_size);
let parquet_reader = AsyncParquetReader::new(path, Arc::new(file_reader), options);
// TODO: Currently, we only have one sst format, and we have to choose right
// reader for sst according to its real format in the future.
Expand Down
7 changes: 7 additions & 0 deletions analytic_engine/src/sst/parquet/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -357,9 +357,16 @@ mod tests {
};

let mut reader: Box<dyn SstReader + Send> = {
let meta = store_picker
.default_store()
.head(&sst_file_path)
.await
.unwrap();

let file_reader = FileReaderOnObjectStore::new(
sst_file_path.clone(),
store_picker.default_store().clone(),
meta.size as u64,
);
let mut reader = AsyncParquetReader::new(
&sst_file_path,
Expand Down
13 changes: 10 additions & 3 deletions benchmarks/src/sst_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,18 @@ impl SstBench {

let sst_factory = FactoryImpl;
let store_picker: ObjectStorePickerRef = Arc::new(self.store.clone());
let mut sst_reader = sst_factory
.new_sst_reader(&self.sst_reader_options, &sst_path, &store_picker)
.unwrap();

self.runtime.block_on(async {
let meta = store_picker.default_store().head(&sst_path).await.unwrap();

let mut sst_reader = sst_factory
.new_sst_reader(
&self.sst_reader_options,
&sst_path,
&store_picker,
meta.size as u64,
)
.unwrap();
let begin_instant = Instant::now();
let mut sst_stream = sst_reader.read().await.unwrap();

Expand Down
8 changes: 7 additions & 1 deletion benchmarks/src/sst_tools.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,14 @@ async fn sst_to_record_batch_stream(
) -> RecordBatchStream {
let sst_factory = FactoryImpl;
let store_picker: ObjectStorePickerRef = Arc::new(store.clone());
let meta = store_picker.default_store().head(input_path).await.unwrap();
let mut sst_reader = sst_factory
.new_sst_reader(sst_reader_options, input_path, &store_picker)
.new_sst_reader(
sst_reader_options,
input_path,
&store_picker,
meta.size as u64,
)
.unwrap();

let sst_stream = sst_reader.read().await.unwrap();
Expand Down
8 changes: 7 additions & 1 deletion benchmarks/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,14 @@ pub async fn load_sst_to_memtable(
};
let sst_factory = FactoryImpl;
let store_picker: ObjectStorePickerRef = Arc::new(store.clone());
let meta = store_picker.default_store().head(sst_path).await.unwrap();
let mut sst_reader = sst_factory
.new_sst_reader(&sst_reader_options, sst_path, &store_picker)
.new_sst_reader(
&sst_reader_options,
sst_path,
&store_picker,
meta.size as u64,
)
.unwrap();

let mut sst_stream = sst_reader.read().await.unwrap();
Expand Down
8 changes: 7 additions & 1 deletion tools/src/bin/sst-convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,14 @@ async fn run(args: Args, runtime: Arc<Runtime>) -> Result<()> {
num_rows_per_row_group: 8192,
};
let store_picker: ObjectStorePickerRef = Arc::new(store);
let meta = store_picker
.default_store()
.head(&input_path)
.await
.unwrap();

let mut reader = factory
.new_sst_reader(&reader_opts, &input_path, &store_picker)
.new_sst_reader(&reader_opts, &input_path, &store_picker, meta.size as u64)
.expect("no sst reader found");

let builder_opts = SstBuilderOptions {
Expand Down

0 comments on commit b4bd39e

Please sign in to comment.