Skip to content

Commit

Permalink
chore: merge main into breaking-changes (#514)
Browse files Browse the repository at this point in the history
* refactor: separate object store from parquet sst async reader (#503)

* refactor: separate object store from parquet sst async reader

* fix errors in unit tests

* use const FOOTER_LEN

* add GenericResult

* refactor: recovery in standalone mode (#414)

* refactor recovery in standalone mode.

* make getting table infos dynamically.

* define `LocalTablesRecoverer`.

* address CR.

* feat: support parse key partition (#506)

* add key partition parser

* refactor the codes

* add unit test

* fix show create table for partition table

* address CR issues

* supplement the error msg when fail to parse partition key

* fix unit tests

Co-authored-by: CooooolFrog <zuliangwanghust@gmail.com>

* chore: define remote_engine grpc service (#505)

Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com>

* feat: impl key partition rule (#507)

* impl key partition rule(draft).

* refactor to more extensible and testable version.

* add tests for key partition.

* add df adapter and partition rule's building.

* address CR.

Co-authored-by: WEI Xikai <ShiKaiWi@users.noreply.github.com>
Co-authored-by: kamille <34352236+Rachelint@users.noreply.github.com>
Co-authored-by: CooooolFrog <zuliangwanghust@gmail.com>
Co-authored-by: chunshao.rcs <chunshao.rcs@antgroup.com>
  • Loading branch information
5 people authored Dec 27, 2022
1 parent 28a2ae5 commit 4701a57
Show file tree
Hide file tree
Showing 31 changed files with 1,801 additions and 228 deletions.
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

76 changes: 71 additions & 5 deletions analytic_engine/src/sst/factory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,30 @@

//! Factory for different kinds sst builder and reader.

use std::{fmt::Debug, sync::Arc};
use std::{
fmt::Debug,
ops::Range,
sync::{Arc, RwLock},
};

use async_trait::async_trait;
use bytes::Bytes;
use common_types::projected_schema::ProjectedSchema;
use common_util::runtime::Runtime;
use common_util::{
error::{GenericError, GenericResult},
runtime::Runtime,
};
use object_store::{ObjectStoreRef, Path};
use table_engine::predicate::PredicateRef;

use crate::{
sst::{
builder::SstBuilder,
meta_cache::MetaCacheRef,
parquet::{builder::ParquetSstBuilder, AsyncParquetReader, ThreadedReader},
parquet::{
async_reader::AsyncFileReader, builder::ParquetSstBuilder, AsyncParquetReader,
ThreadedReader,
},
reader::SstReader,
},
table_options::Compression,
Expand Down Expand Up @@ -96,6 +108,58 @@ pub struct SstBuilderOptions {
pub compression: Compression,
}

pub struct FileReaderOnObjectStore {
path: Path,
store: ObjectStoreRef,
cached_file_size: RwLock<Option<usize>>,
}

impl FileReaderOnObjectStore {
pub fn new(path: Path, store: ObjectStoreRef) -> Self {
Self {
path,
store,
cached_file_size: RwLock::new(None),
}
}
}

#[async_trait]
impl AsyncFileReader for FileReaderOnObjectStore {
async fn file_size(&self) -> GenericResult<usize> {
// check cached filed_size first
{
let file_size = self.cached_file_size.read().unwrap();
if let Some(s) = file_size.as_ref() {
return Ok(*s);
}
}

// fetch the size from the underlying store
let head = self
.store
.head(&self.path)
.await
.map_err(|e| Box::new(e) as GenericError)?;
*self.cached_file_size.write().unwrap() = Some(head.size);
Ok(head.size)
}

async fn get_byte_range(&self, range: Range<usize>) -> GenericResult<Bytes> {
self.store
.get_range(&self.path, range)
.await
.map_err(|e| Box::new(e) as _)
}

async fn get_byte_ranges(&self, ranges: &[Range<usize>]) -> GenericResult<Vec<Bytes>> {
self.store
.get_ranges(&self.path, ranges)
.await
.map_err(|e| Box::new(e) as _)
}
}

#[derive(Debug, Default)]
pub struct FactoryImpl;

Expand All @@ -106,11 +170,13 @@ impl Factory for FactoryImpl {
path: &'a Path,
store_picker: &'a ObjectStorePickerRef,
) -> Option<Box<dyn SstReader + Send + 'a>> {
let store = store_picker.pick_by_freq(options.frequency).clone();
let file_reader = FileReaderOnObjectStore::new(path.clone(), store);
let parquet_reader = AsyncParquetReader::new(path, Arc::new(file_reader), options);
// TODO: Currently, we only have one sst format, and we have to choose right
// reader for sst according to its real format in the future.
let reader = AsyncParquetReader::new(path, store_picker, options);
let reader = ThreadedReader::new(
reader,
parquet_reader,
options.runtime.clone(),
options.background_read_parallelism,
);
Expand Down
142 changes: 97 additions & 45 deletions analytic_engine/src/sst/parquet/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,29 @@ use common_types::{
projected_schema::{ProjectedSchema, RowProjector},
record_batch::{ArrowRecordBatchProjector, RecordBatchWithKey},
};
use common_util::{runtime::Runtime, time::InstantExt};
use datafusion::datasource::file_format;
use common_util::{error::GenericResult, runtime::Runtime, time::InstantExt};
use datafusion::error::DataFusionError as DfError;
use futures::{future::BoxFuture, FutureExt, Stream, StreamExt, TryFutureExt};
use log::{debug, error, info, warn};
use object_store::{ObjectMeta, ObjectStoreRef, Path};
use object_store::Path;
use parquet::{
arrow::{async_reader::AsyncFileReader, ParquetRecordBatchStreamBuilder, ProjectionMask},
file::metadata::RowGroupMetaData,
arrow::{
async_reader::AsyncFileReader as AsyncParquetFileReader, ParquetRecordBatchStreamBuilder,
ProjectionMask,
},
file::{
footer,
metadata::{ParquetMetaData, RowGroupMetaData},
},
};
use parquet_ext::ParquetMetaDataRef;
use prometheus::local::LocalHistogram;
use snafu::ResultExt;
use table_engine::predicate::PredicateRef;
use tokio::sync::mpsc::{self, Receiver, Sender};

use crate::{
sst::{
factory::{ObjectStorePickerRef, ReadFrequency, SstReaderOptions},
factory::{ReadFrequency, SstReaderOptions},
file::{BloomFilter, SstMetaData},
meta_cache::{MetaCacheRef, MetaData},
metrics,
Expand All @@ -46,11 +51,67 @@ use crate::{

type SendableRecordBatchStream = Pin<Box<dyn Stream<Item = Result<ArrowRecordBatch>> + Send>>;

pub type AsyncFileReaderRef = Arc<dyn AsyncFileReader>;

#[async_trait]
pub trait AsyncFileReader: Send + Sync {
async fn file_size(&self) -> GenericResult<usize>;

async fn get_byte_range(&self, range: Range<usize>) -> GenericResult<Bytes>;

async fn get_byte_ranges(&self, ranges: &[Range<usize>]) -> GenericResult<Vec<Bytes>>;
}

/// Fetch and parse [`ParquetMetadata`] from the file reader.
///
/// Referring to: https://github.com/apache/arrow-datafusion/blob/ac2e5d15e5452e83c835d793a95335e87bf35569/datafusion/core/src/datasource/file_format/parquet.rs#L390-L449
async fn fetch_parquet_metadata_from_file_reader(
file_reader: &dyn AsyncFileReader,
) -> std::result::Result<ParquetMetaData, DfError> {
const FOOTER_LEN: usize = 8;

let file_size = file_reader.file_size().await?;

if file_size < FOOTER_LEN {
let err_msg = format!("file size of {} is less than footer", file_size);
return Err(DfError::Execution(err_msg));
}

let footer_start = file_size - FOOTER_LEN;

let footer_bytes = file_reader
.get_byte_range(footer_start..file_size)
.await
.map_err(|e| DfError::External(e))?;

assert_eq!(footer_bytes.len(), FOOTER_LEN);
let mut footer = [0; FOOTER_LEN];
footer.copy_from_slice(&footer_bytes);

let metadata_len = footer::decode_footer(&footer)?;

if file_size < metadata_len + FOOTER_LEN {
let err_msg = format!(
"file size of {} is smaller than footer + metadata {}",
file_size,
metadata_len + FOOTER_LEN
);
return Err(DfError::Execution(err_msg));
}

let metadata_start = file_size - metadata_len - FOOTER_LEN;
let metadata_bytes = file_reader
.get_byte_range(metadata_start..footer_start)
.await?;

Ok(footer::decode_metadata(&metadata_bytes)?)
}

pub struct Reader<'a> {
/// The path where the data is persisted.
path: &'a Path,
/// The storage where the data is persist.
store: &'a ObjectStoreRef,
file_reader: AsyncFileReaderRef,
projected_schema: ProjectedSchema,
meta_cache: Option<MetaCacheRef>,
predicate: PredicateRef,
Expand All @@ -69,17 +130,16 @@ pub struct Reader<'a> {
impl<'a> Reader<'a> {
pub fn new(
path: &'a Path,
store_picker: &'a ObjectStorePickerRef,
file_reader: AsyncFileReaderRef,
options: &SstReaderOptions,
) -> Self {
let batch_size = options.read_batch_row_num;
let parallelism_options =
ParallelismOptions::new(options.read_batch_row_num, options.num_rows_per_row_group);
let store = store_picker.pick_by_freq(options.frequency);

Self {
path,
store,
file_reader,
projected_schema: options.projected_schema.clone(),
meta_cache: options.meta_cache.clone(),
predicate: options.predicate.clone(),
Expand Down Expand Up @@ -160,7 +220,7 @@ impl<'a> Reader<'a> {
let meta_data = self.meta_data.as_ref().unwrap();
let row_projector = self.row_projector.as_ref().unwrap();
let object_store_reader =
ObjectStoreReader::new(self.store.clone(), self.path.clone(), meta_data.clone());
ParquetFileReaderAdapter::new(self.file_reader.clone(), meta_data.clone());

// Get target row groups.
let filtered_row_groups = self.filter_row_groups(
Expand Down Expand Up @@ -245,17 +305,11 @@ impl<'a> Reader<'a> {
Ok(())
}

async fn load_meta_data_from_storage(
&self,
object_meta: &ObjectMeta,
) -> Result<ParquetMetaDataRef> {
let meta_data =
file_format::parquet::fetch_parquet_metadata(self.store.as_ref(), object_meta, None)
.await
.map_err(|e| Box::new(e) as _)
.context(DecodeSstMeta)?;

Ok(Arc::new(meta_data))
async fn load_meta_data_from_storage(&self) -> Result<ParquetMetaData> {
fetch_parquet_metadata_from_file_reader(self.file_reader.as_ref())
.await
.map_err(|e| Box::new(e) as _)
.context(DecodeSstMeta)
}

fn need_update_cache(&self) -> bool {
Expand All @@ -278,15 +332,11 @@ impl<'a> Reader<'a> {
let empty_predicate = self.predicate.exprs().is_empty();

let meta_data = {
let object_meta = self
.store
.head(self.path)
.await
.context(ObjectStoreError {})?;
let parquet_meta_data = self.load_meta_data_from_storage(&object_meta).await?;
let parquet_meta_data = self.load_meta_data_from_storage().await?;

let ignore_bloom_filter = avoid_update_cache && empty_predicate;
MetaData::try_new(&parquet_meta_data, object_meta.size, ignore_bloom_filter)
let file_size = self.file_reader.file_size().await.context(DecodeSstMeta)?;
MetaData::try_new(&parquet_meta_data, file_size, ignore_bloom_filter)
.map_err(|e| Box::new(e) as _)
.context(DecodeSstMeta)?
};
Expand Down Expand Up @@ -352,18 +402,16 @@ struct ReaderMetrics {
}

#[derive(Clone)]
struct ObjectStoreReader {
storage: ObjectStoreRef,
path: Path,
struct ParquetFileReaderAdapter {
file_reader: AsyncFileReaderRef,
meta_data: MetaData,
metrics: ReaderMetrics,
}

impl ObjectStoreReader {
fn new(storage: ObjectStoreRef, path: Path, meta_data: MetaData) -> Self {
impl ParquetFileReaderAdapter {
fn new(file_reader: AsyncFileReaderRef, meta_data: MetaData) -> Self {
Self {
storage,
path,
file_reader,
meta_data,
metrics: ReaderMetrics {
bytes_scanned: 0,
Expand All @@ -373,20 +421,24 @@ impl ObjectStoreReader {
}
}

impl Drop for ObjectStoreReader {
impl Drop for ParquetFileReaderAdapter {
fn drop(&mut self) {
info!("ObjectStoreReader dropped, metrics:{:?}", self.metrics);
info!(
"ParquetFileReaderAdapter is dropped, metrics:{:?}",
self.metrics
);
}
}

impl AsyncFileReader for ObjectStoreReader {
impl AsyncParquetFileReader for ParquetFileReaderAdapter {
fn get_bytes(&mut self, range: Range<usize>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
self.metrics.bytes_scanned += range.end - range.start;
self.metrics
.sst_get_range_length_histogram
.observe((range.end - range.start) as f64);
self.storage
.get_range(&self.path, range)

self.file_reader
.get_byte_range(range)
.map_err(|e| {
parquet::errors::ParquetError::General(format!(
"Failed to fetch range from object store, err:{}",
Expand All @@ -406,11 +458,11 @@ impl AsyncFileReader for ObjectStoreReader {
.observe((range.end - range.start) as f64);
}
async move {
self.storage
.get_ranges(&self.path, &ranges)
self.file_reader
.get_byte_ranges(&ranges)
.map_err(|e| {
parquet::errors::ParquetError::General(format!(
"Failed to fetch ranges from object store, err:{}",
"Failed to fetch ranges from underlying reader, err:{}",
e
))
})
Expand Down
Loading

0 comments on commit 4701a57

Please sign in to comment.