chore: merge main into breaking-changes #514

Merged 5 commits on Dec 27, 2022
Changes from all commits
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

76 changes: 71 additions & 5 deletions analytic_engine/src/sst/factory.rs
@@ -2,18 +2,30 @@

//! Factory for different kinds of sst builders and readers.

use std::{fmt::Debug, sync::Arc};
use std::{
    fmt::Debug,
    ops::Range,
    sync::{Arc, RwLock},
};

use async_trait::async_trait;
use bytes::Bytes;
use common_types::projected_schema::ProjectedSchema;
use common_util::runtime::Runtime;
use common_util::{
    error::{GenericError, GenericResult},
    runtime::Runtime,
};
use object_store::{ObjectStoreRef, Path};
use table_engine::predicate::PredicateRef;

use crate::{
    sst::{
        builder::SstBuilder,
        meta_cache::MetaCacheRef,
        parquet::{builder::ParquetSstBuilder, AsyncParquetReader, ThreadedReader},
        parquet::{
            async_reader::AsyncFileReader, builder::ParquetSstBuilder, AsyncParquetReader,
            ThreadedReader,
        },
        reader::SstReader,
    },
    table_options::Compression,
@@ -96,6 +108,58 @@ pub struct SstBuilderOptions {
    pub compression: Compression,
}

pub struct FileReaderOnObjectStore {
    path: Path,
    store: ObjectStoreRef,
    cached_file_size: RwLock<Option<usize>>,
}

impl FileReaderOnObjectStore {
    pub fn new(path: Path, store: ObjectStoreRef) -> Self {
        Self {
            path,
            store,
            cached_file_size: RwLock::new(None),
        }
    }
}

#[async_trait]
impl AsyncFileReader for FileReaderOnObjectStore {
    async fn file_size(&self) -> GenericResult<usize> {
        // Check the cached file size first.
        {
            let file_size = self.cached_file_size.read().unwrap();
            if let Some(s) = file_size.as_ref() {
                return Ok(*s);
            }
        }

        // Fetch the size from the underlying store and cache it.
        let head = self
            .store
            .head(&self.path)
            .await
            .map_err(|e| Box::new(e) as GenericError)?;
        *self.cached_file_size.write().unwrap() = Some(head.size);
        Ok(head.size)
    }

    async fn get_byte_range(&self, range: Range<usize>) -> GenericResult<Bytes> {
        self.store
            .get_range(&self.path, range)
            .await
            .map_err(|e| Box::new(e) as _)
    }

    async fn get_byte_ranges(&self, ranges: &[Range<usize>]) -> GenericResult<Vec<Bytes>> {
        self.store
            .get_ranges(&self.path, ranges)
            .await
            .map_err(|e| Box::new(e) as _)
    }
}

#[derive(Debug, Default)]
pub struct FactoryImpl;

@@ -106,11 +170,13 @@ impl Factory for FactoryImpl {
        path: &'a Path,
        store_picker: &'a ObjectStorePickerRef,
    ) -> Option<Box<dyn SstReader + Send + 'a>> {
        let store = store_picker.pick_by_freq(options.frequency).clone();
        let file_reader = FileReaderOnObjectStore::new(path.clone(), store);
        let parquet_reader = AsyncParquetReader::new(path, Arc::new(file_reader), options);
        // TODO: Currently, we only have one sst format, and we have to choose the right
        // reader for the sst according to its real format in the future.
        let reader = AsyncParquetReader::new(path, store_picker, options);
        let reader = ThreadedReader::new(
            reader,
            parquet_reader,
            options.runtime.clone(),
            options.background_read_parallelism,
        );
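For context, a minimal sketch of how the new `FileReaderOnObjectStore` behaves on its own (illustrative only, not part of this diff; it assumes an existing `ObjectStoreRef` named `store` and a hypothetical sst path). The first `file_size` call issues a `head` request against the store and caches the result, so the second call is served from the `RwLock` cache:

let file_reader = FileReaderOnObjectStore::new(Path::from("1/2.sst"), store.clone());
let size = file_reader.file_size().await?;   // issues store.head() and caches the size
let cached = file_reader.file_size().await?; // served from cached_file_size, no extra request
assert_eq!(size, cached);
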
142 changes: 97 additions & 45 deletions analytic_engine/src/sst/parquet/async_reader.rs
@@ -17,24 +17,29 @@ use common_types::{
    projected_schema::{ProjectedSchema, RowProjector},
    record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey},
};
use common_util::{runtime::Runtime, time::InstantExt};
use datafusion::datasource::file_format;
use common_util::{error::GenericResult, runtime::Runtime, time::InstantExt};
use datafusion::error::DataFusionError as DfError;
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
use log::{debug, error, info, warn};
use object_store::{ObjectMeta, ObjectStoreRef, Path};
use object_store::Path;
use parquet::{
    arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask},
    file::metadata::RowGroupMetaData,
    arrow::{
        async_reader::AsyncFileReader as AsyncParquetFileReader, ParquetRecordBatchStreamBuilder,
        ProjectionMask,
    },
    file::{
        footer,
        metadata::{ParquetMetaData, RowGroupMetaData},
    },
};
use parquet_ext::ParquetMetaDataRef;
use prometheus::local::LocalHistogram;
use snafu::ResultExt;
use table_engine::predicate::PredicateRef;
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::{
    sst::{
        factory::{ObjectStorePickerRef, ReadFrequency, SstReaderOptions},
        factory::{ReadFrequency, SstReaderOptions},
        file::{BloomFilter, SstMetaData},
        meta_cache::{MetaCacheRef, MetaData},
        metrics,
@@ -46,11 +51,67 @@ use crate::{

type SendableRecordBatchStream = Pin<Box<dyn Stream<Item = Result<ArrowRecordBatch>> + Send>>;

pub type AsyncFileReaderRef = Arc<dyn AsyncFileReader>;

#[async_trait]
pub trait AsyncFileReader: Send + Sync {
    async fn file_size(&self) -> GenericResult<usize>;

    async fn get_byte_range(&self, range: Range<usize>) -> GenericResult<Bytes>;

    async fn get_byte_ranges(&self, ranges: &[Range<usize>]) -> GenericResult<Vec<Bytes>>;
}
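
// Illustrative only, not part of this PR: a minimal in-memory implementation of the
// `AsyncFileReader` trait above (e.g. for unit tests), assuming a hypothetical
// `InMemoryFileReader` that keeps the whole file in a `Bytes` buffer.
struct InMemoryFileReader {
    data: Bytes,
}

#[async_trait]
impl AsyncFileReader for InMemoryFileReader {
    async fn file_size(&self) -> GenericResult<usize> {
        Ok(self.data.len())
    }

    async fn get_byte_range(&self, range: Range<usize>) -> GenericResult<Bytes> {
        Ok(self.data.slice(range))
    }

    async fn get_byte_ranges(&self, ranges: &[Range<usize>]) -> GenericResult<Vec<Bytes>> {
        Ok(ranges.iter().map(|r| self.data.slice(r.clone())).collect())
    }
}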

/// Fetch and parse [`ParquetMetaData`] from the file reader.
///
/// Referring to: https://github.com/apache/arrow-datafusion/blob/ac2e5d15e5452e83c835d793a95335e87bf35569/datafusion/core/src/datasource/file_format/parquet.rs#L390-L449
async fn fetch_parquet_metadata_from_file_reader(
    file_reader: &dyn AsyncFileReader,
) -> std::result::Result<ParquetMetaData, DfError> {
    const FOOTER_LEN: usize = 8;

    let file_size = file_reader.file_size().await?;

    if file_size < FOOTER_LEN {
        let err_msg = format!("file size of {} is less than footer", file_size);
        return Err(DfError::Execution(err_msg));
    }

    let footer_start = file_size - FOOTER_LEN;

    let footer_bytes = file_reader
        .get_byte_range(footer_start..file_size)
        .await
        .map_err(DfError::External)?;

    assert_eq!(footer_bytes.len(), FOOTER_LEN);
    let mut footer = [0; FOOTER_LEN];
    footer.copy_from_slice(&footer_bytes);

    let metadata_len = footer::decode_footer(&footer)?;

    if file_size < metadata_len + FOOTER_LEN {
        let err_msg = format!(
            "file size of {} is smaller than footer + metadata {}",
            file_size,
            metadata_len + FOOTER_LEN
        );
        return Err(DfError::Execution(err_msg));
    }

    let metadata_start = file_size - metadata_len - FOOTER_LEN;
    let metadata_bytes = file_reader
        .get_byte_range(metadata_start..footer_start)
        .await?;

    Ok(footer::decode_metadata(&metadata_bytes)?)
}

pub struct Reader<'a> {
    /// The path where the data is persisted.
    path: &'a Path,
    /// The storage where the data is persisted.
    store: &'a ObjectStoreRef,
    file_reader: AsyncFileReaderRef,
    projected_schema: ProjectedSchema,
    meta_cache: Option<MetaCacheRef>,
    predicate: PredicateRef,
Expand All @@ -69,17 +130,16 @@ pub struct Reader<'a> {
impl<'a> Reader<'a> {
    pub fn new(
        path: &'a Path,
        store_picker: &'a ObjectStorePickerRef,
        file_reader: AsyncFileReaderRef,
        options: &SstReaderOptions,
    ) -> Self {
        let batch_size = options.read_batch_row_num;
        let parallelism_options =
            ParallelismOptions::new(options.read_batch_row_num, options.num_rows_per_row_group);
        let store = store_picker.pick_by_freq(options.frequency);

        Self {
            path,
            store,
            file_reader,
            projected_schema: options.projected_schema.clone(),
            meta_cache: options.meta_cache.clone(),
            predicate: options.predicate.clone(),
@@ -160,7 +220,7 @@ impl<'a> Reader<'a> {
        let meta_data = self.meta_data.as_ref().unwrap();
        let row_projector = self.row_projector.as_ref().unwrap();
        let object_store_reader =
            ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
            ParquetFileReaderAdapter::new(self.file_reader.clone(), meta_data.clone());

        // Get target row groups.
        let filtered_row_groups = self.filter_row_groups(
@@ -245,17 +305,11 @@ impl<'a> Reader<'a> {
        Ok(())
    }

    async fn load_meta_data_from_storage(
        &self,
        object_meta: &ObjectMeta,
    ) -> Result<ParquetMetaDataRef> {
        let meta_data =
            file_format::parquet::fetch_parquet_metadata(self.store.as_ref(), object_meta, None)
                .await
                .map_err(|e| Box::new(e) as _)
                .context(DecodeSstMeta)?;

        Ok(Arc::new(meta_data))
    async fn load_meta_data_from_storage(&self) -> Result<ParquetMetaData> {
        fetch_parquet_metadata_from_file_reader(self.file_reader.as_ref())
            .await
            .map_err(|e| Box::new(e) as _)
            .context(DecodeSstMeta)
    }

    fn need_update_cache(&self) -> bool {
Expand All @@ -278,15 +332,11 @@ impl<'a> Reader<'a> {
        let empty_predicate = self.predicate.exprs().is_empty();

        let meta_data = {
            let object_meta = self
                .store
                .head(self.path)
                .await
                .context(ObjectStoreError {})?;
            let parquet_meta_data = self.load_meta_data_from_storage(&object_meta).await?;
            let parquet_meta_data = self.load_meta_data_from_storage().await?;

            let ignore_bloom_filter = avoid_update_cache && empty_predicate;
            MetaData::try_new(&parquet_meta_data, object_meta.size, ignore_bloom_filter)
            let file_size = self.file_reader.file_size().await.context(DecodeSstMeta)?;
            MetaData::try_new(&parquet_meta_data, file_size, ignore_bloom_filter)
                .map_err(|e| Box::new(e) as _)
                .context(DecodeSstMeta)?
        };
@@ -352,18 +402,16 @@ struct ReaderMetrics {
}

#[derive(Clone)]
struct ObjectStoreReader {
    storage: ObjectStoreRef,
    path: Path,
struct ParquetFileReaderAdapter {
    file_reader: AsyncFileReaderRef,
    meta_data: MetaData,
    metrics: ReaderMetrics,
}

impl ObjectStoreReader {
    fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
impl ParquetFileReaderAdapter {
    fn new(file_reader: AsyncFileReaderRef, meta_data: MetaData) -> Self {
        Self {
            storage,
            path,
            file_reader,
            meta_data,
            metrics: ReaderMetrics {
                bytes_scanned: 0,
@@ -373,20 +421,24 @@ impl ObjectStoreReader {
    }
}

impl Drop for ObjectStoreReader {
impl Drop for ParquetFileReaderAdapter {
    fn drop(&mut self) {
        info!("ObjectStoreReader dropped, metrics:{:?}", self.metrics);
        info!(
            "ParquetFileReaderAdapter is dropped, metrics:{:?}",
            self.metrics
        );
    }
}

impl AsyncFileReader for ObjectStoreReader {
impl AsyncParquetFileReader for ParquetFileReaderAdapter {
    fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
        self.metrics.bytes_scanned += range.end - range.start;
        self.metrics
            .sst_get_range_length_histogram
            .observe((range.end - range.start) as f64);
        self.storage
            .get_range(&self.path, range)

        self.file_reader
            .get_byte_range(range)
            .map_err(|e| {
                parquet::errors::ParquetError::General(format!(
                    "Failed to fetch range from object store, err:{}",
@@ -406,11 +458,11 @@ impl AsyncFileReader for ObjectStoreReader {
                .observe((range.end - range.start) as f64);
        }
        async move {
            self.storage
                .get_ranges(&self.path, &ranges)
            self.file_reader
                .get_byte_ranges(&ranges)
                .map_err(|e| {
                    parquet::errors::ParquetError::General(format!(
                        "Failed to fetch ranges from object store, err:{}",
                        "Failed to fetch ranges from underlying reader, err:{}",
                        e
                    ))
                })
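
Since `ParquetFileReaderAdapter` implements the parquet crate's `AsyncFileReader` trait (imported above as `AsyncParquetFileReader`), it can be handed straight to the async record-batch API. A rough usage sketch, not taken from this diff (variable names and the batch size are assumed):

let adapter = ParquetFileReaderAdapter::new(file_reader.clone(), meta_data.clone());
let builder = ParquetRecordBatchStreamBuilder::new(adapter).await?;
let stream = builder.with_batch_size(1024).build()?;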