
feat: write hybrid storage format #185

Merged · 15 commits · Aug 19, 2022
analytic_engine/src/sst/parquet/builder.rs — 69 changes: 9 additions & 60 deletions
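This diff moves the parquet-writing plumbing out of the sst builder: the hand-rolled `build_write_properties` helper, the directly managed `ArrowWriter`, and the builder-owned `encoding_buffer` are replaced by the new `ParquetEncoder` from `parquet/encoding.rs`, which hides the write properties, the byte buffer, and the batch concatenation behind `try_new` / `encode_record_batch` / `close`.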
@@ -7,11 +7,7 @@ use std::sync::{
     Arc,
 };
 
-use arrow_deps::{
-    arrow::record_batch::RecordBatch as ArrowRecordBatch,
-    datafusion::parquet::basic::Compression,
-    parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
-};
+use arrow_deps::datafusion::parquet::basic::Compression;
 use async_trait::async_trait;
 use common_types::request_id::RequestId;
 use futures::StreamExt;
@@ -23,7 +19,7 @@ use crate::sst::{
     builder::{RecordBatchStream, SstBuilder, *},
     factory::SstBuilderOptions,
     file::SstMetaData,
-    parquet::encoding,
+    parquet::encoding::ParquetEncoder,
 };

@@ -54,7 +50,6 @@ impl<'a> ParquetSstBuilder<'a> {
 struct RecordBytesReader {
     request_id: RequestId,
     record_stream: RecordBatchStream,
-    encoding_buffer: Vec<u8>,
     num_rows_per_row_group: usize,
     compression: Compression,
     meta_data: SstMetaData,
@@ -63,37 +58,14 @@ struct RecordBytesReader {
     fetched_row_num: usize,
 }
 
-/// Build the write properties containing the sst meta data.
-fn build_write_properties(
-    num_rows_per_row_group: usize,
-    compression: Compression,
-    meta_data: &SstMetaData,
-) -> Result<WriterProperties> {
-    let meta_data_kv = encoding::encode_sst_meta_data(meta_data.clone())
-        .map_err(|e| Box::new(e) as _)
-        .context(EncodeMetaData)?;
-
-    Ok(WriterProperties::builder()
-        .set_key_value_metadata(Some(vec![meta_data_kv]))
-        .set_max_row_group_size(num_rows_per_row_group)
-        .set_compression(compression)
-        .build())
-}
 
 impl RecordBytesReader {
     async fn read_all(mut self) -> Result<Vec<u8>> {
         let mut arrow_record_batch_vec = Vec::new();
-
         // build writer
-        let write_props = build_write_properties(
+        let mut parquet_encoder = ParquetEncoder::try_new(
             self.num_rows_per_row_group,
             self.compression,
             &self.meta_data,
-        )?;
-        let mut arrow_writer = ArrowWriter::try_new(
-            &mut self.encoding_buffer,
-            self.meta_data.schema.as_arrow_schema_ref().clone(),
-            Some(write_props),
         )
         .map_err(|e| Box::new(e) as _)
         .context(EncodeRecordBatch)?;
@@ -114,7 +86,8 @@ impl RecordBytesReader {
             if self.fetched_row_num >= self.num_rows_per_row_group {
                 let buf_len = arrow_record_batch_vec.len();
                 self.fetched_row_num = 0;
-                let row_num = Self::encode_record_batch(&mut arrow_writer, arrow_record_batch_vec)
+                let row_num = parquet_encoder
+                    .encode_record_batch(arrow_record_batch_vec)
                     .map_err(|e| Box::new(e) as _)
                     .context(EncodeRecordBatch)?;
                 arrow_record_batch_vec = Vec::with_capacity(buf_len);
@@ -124,41 +97,18 @@ impl RecordBytesReader {
 
         // final check if there is any record batch left
        if self.fetched_row_num != 0 {
-            let row_num = Self::encode_record_batch(&mut arrow_writer, arrow_record_batch_vec)
+            let row_num = parquet_encoder
+                .encode_record_batch(arrow_record_batch_vec)
                 .map_err(|e| Box::new(e) as _)
                 .context(EncodeRecordBatch)?;
             self.total_row_num.fetch_add(row_num, Ordering::Relaxed);
         }
 
-        arrow_writer
+        let bytes = parquet_encoder
             .close()
             .map_err(|e| Box::new(e) as _)
             .context(EncodeMetaData)?;
 
-        Ok(self.encoding_buffer)
-    }
-
-    /// Encode the record batch with [ArrowWriter] and the encoded contents is
-    /// written to the buffer.
-    fn encode_record_batch<W: std::io::Write>(
-        arrow_writer: &mut ArrowWriter<W>,
-        arrow_record_batch_vec: Vec<ArrowRecordBatch>,
-    ) -> Result<usize> {
-        if arrow_record_batch_vec.is_empty() {
-            return Ok(0);
-        }
-
-        let arrow_schema = arrow_record_batch_vec[0].schema();
-        let record_batch = ArrowRecordBatch::concat(&arrow_schema, &arrow_record_batch_vec)
-            .map_err(|e| Box::new(e) as _)
-            .context(EncodeRecordBatch)?;
-
-        arrow_writer
-            .write(&record_batch)
-            .map_err(|e| Box::new(e) as _)
-            .context(EncodeRecordBatch)?;
-
-        Ok(record_batch.num_rows())
+        Ok(bytes)
     }
 }

@@ -179,7 +129,6 @@ impl<'a> SstBuilder for ParquetSstBuilder<'a> {
         let reader = RecordBytesReader {
             request_id,
             record_stream,
-            encoding_buffer: vec![],
            num_rows_per_row_group: self.num_rows_per_row_group,
             compression: self.compression,
             total_row_num: total_row_num.clone(),
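The `ParquetEncoder` definition itself lives in `analytic_engine/src/sst/parquet/encoding.rs` and is not part of this hunk. For orientation, here is a minimal sketch of the interface the call sites above imply, written against the stock `arrow`/`parquet` crates rather than the project's `arrow_deps` re-exports. The constructor taking a `SchemaRef` directly, the omission of the `SstMetaData` key/value entry, and the error types are all assumptions for illustration, not the PR's actual code.

```rust
// Hypothetical sketch of the ParquetEncoder interface implied by the diff.
// The real type also embeds the encoded SstMetaData as parquet key/value
// metadata (see the removed build_write_properties helper); that part is
// omitted here to keep the example self-contained.
use arrow::{compute::concat_batches, datatypes::SchemaRef, record_batch::RecordBatch};
use parquet::{
    arrow::ArrowWriter,
    basic::Compression,
    errors::{ParquetError, Result},
    file::properties::WriterProperties,
};

pub struct ParquetEncoder {
    // Writing into an owned Vec<u8> removes the builder's `encoding_buffer`
    // field and the `&mut` borrow the old code had to thread through.
    writer: ArrowWriter<Vec<u8>>,
}

impl ParquetEncoder {
    pub fn try_new(
        num_rows_per_row_group: usize,
        compression: Compression,
        schema: SchemaRef, // assumed; the real code derives this from &SstMetaData
    ) -> Result<Self> {
        let props = WriterProperties::builder()
            .set_max_row_group_size(num_rows_per_row_group)
            .set_compression(compression)
            .build();
        let writer = ArrowWriter::try_new(Vec::new(), schema, Some(props))?;
        Ok(Self { writer })
    }

    /// Concatenates the buffered batches into one batch, writes it, and
    /// returns the number of rows written (0 for empty input), mirroring
    /// the removed encode_record_batch helper.
    pub fn encode_record_batch(&mut self, batches: Vec<RecordBatch>) -> Result<usize> {
        if batches.is_empty() {
            return Ok(0);
        }
        let schema = batches[0].schema();
        let batch = concat_batches(&schema, &batches)
            .map_err(|e| ParquetError::General(e.to_string()))?;
        self.writer.write(&batch)?;
        Ok(batch.num_rows())
    }

    /// Finishes the parquet file and hands the encoded bytes back to the
    /// caller, replacing the old `Ok(self.encoding_buffer)` return.
    pub fn close(self) -> Result<Vec<u8>> {
        self.writer.into_inner()
    }
}
```

With this shape, `read_all` only holds one value across its loop, and the question of who owns the output bytes is answered by `close` consuming the encoder instead of the reader keeping a buffer field alive for the writer to borrow.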