feat: write hybrid storage format (apache#185)
* add convert_to_hybrid

* add parquet encoder to keep schema consistent between writer and record

* use buffer to build list array

* fields support all primitive array

* support bitmap

* check tsid in try_new

* fix CR, add more docs

* fix clippy

* rename field column to non key column

* fix clippy

* fix wrong data type size

* fix reviews

* fix naming convention, add is_collapsible_column method

* fix clippy

* null bitmap init to 1 to avoid unnecessary operation
jiacai2050 authored Aug 19, 2022
1 parent 96a0dc9 commit 1cefda7
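Since the commit bullets are terse, here is a rough sketch of the collapsing idea behind the hybrid format: consecutive rows that share a tsid are folded into one row per series, and each non-key column becomes a list. This is an illustration only, not the project's code: the name collapse_by_tsid and the single f64 field column are hypothetical, the arrow crate is used directly instead of through arrow_deps, and the real convert_to_hybrid supports all primitive arrays, builds the list arrays from raw buffers, and initializes the null bitmap to ones to avoid redundant set operations, as the bullets above note.

use arrow::array::{ListArray, UInt64Array};
use arrow::datatypes::Float64Type;

/// Fold consecutive rows sharing a tsid into one row per series, turning the
/// field column into a list column. Input must already be sorted by tsid.
fn collapse_by_tsid(rows: &[(u64, f64)]) -> (UInt64Array, ListArray) {
    let mut tsids: Vec<u64> = Vec::new();
    let mut groups: Vec<Vec<Option<f64>>> = Vec::new();

    for &(tsid, value) in rows {
        if tsids.last() != Some(&tsid) {
            // A new series starts here: open a fresh group.
            tsids.push(tsid);
            groups.push(Vec::new());
        }
        groups.last_mut().unwrap().push(Some(value));
    }

    (
        UInt64Array::from(tsids),
        ListArray::from_iter_primitive::<Float64Type, _, _>(groups.into_iter().map(Some)),
    )
}

For example, collapse_by_tsid(&[(1, 10.0), (1, 11.0), (2, 7.0)]) yields the tsid column [1, 2] and the field column [[10.0, 11.0], [7.0]], which is what lets a whole series be stored in a single parquet row.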
Showing 9 changed files with 900 additions and 68 deletions.
69 changes: 9 additions & 60 deletions analytic_engine/src/sst/parquet/builder.rs
@@ -7,11 +7,7 @@ use std::sync::{
     Arc,
 };
 
-use arrow_deps::{
-    arrow::record_batch::RecordBatch as ArrowRecordBatch,
-    datafusion::parquet::basic::Compression,
-    parquet::{arrow::ArrowWriter, file::properties::WriterProperties},
-};
+use arrow_deps::datafusion::parquet::basic::Compression;
 use async_trait::async_trait;
 use common_types::request_id::RequestId;
 use futures::StreamExt;
@@ -23,7 +19,7 @@ use crate::sst::{
     builder::{RecordBatchStream, SstBuilder, *},
     factory::SstBuilderOptions,
     file::SstMetaData,
-    parquet::encoding,
+    parquet::encoding::ParquetEncoder,
 };
 
 /// The implementation of sst based on parquet and object storage.
@@ -54,7 +50,6 @@ impl<'a> ParquetSstBuilder<'a> {
 struct RecordBytesReader {
     request_id: RequestId,
     record_stream: RecordBatchStream,
-    encoding_buffer: Vec<u8>,
     num_rows_per_row_group: usize,
     compression: Compression,
     meta_data: SstMetaData,
@@ -63,37 +58,14 @@
     fetched_row_num: usize,
 }
 
-/// Build the write properties containing the sst meta data.
-fn build_write_properties(
-    num_rows_per_row_group: usize,
-    compression: Compression,
-    meta_data: &SstMetaData,
-) -> Result<WriterProperties> {
-    let meta_data_kv = encoding::encode_sst_meta_data(meta_data.clone())
-        .map_err(|e| Box::new(e) as _)
-        .context(EncodeMetaData)?;
-
-    Ok(WriterProperties::builder()
-        .set_key_value_metadata(Some(vec![meta_data_kv]))
-        .set_max_row_group_size(num_rows_per_row_group)
-        .set_compression(compression)
-        .build())
-}
 
 impl RecordBytesReader {
     async fn read_all(mut self) -> Result<Vec<u8>> {
         let mut arrow_record_batch_vec = Vec::new();
 
         // build writer
-        let write_props = build_write_properties(
+        let mut parquet_encoder = ParquetEncoder::try_new(
             self.num_rows_per_row_group,
             self.compression,
             &self.meta_data,
         )?;
-        let mut arrow_writer = ArrowWriter::try_new(
-            &mut self.encoding_buffer,
-            self.meta_data.schema.as_arrow_schema_ref().clone(),
-            Some(write_props),
-        )
-        .map_err(|e| Box::new(e) as _)
-        .context(EncodeRecordBatch)?;
@@ -114,7 +86,8 @@ impl RecordBytesReader {
             if self.fetched_row_num >= self.num_rows_per_row_group {
                 let buf_len = arrow_record_batch_vec.len();
                 self.fetched_row_num = 0;
-                let row_num = Self::encode_record_batch(&mut arrow_writer, arrow_record_batch_vec)
+                let row_num = parquet_encoder
+                    .encode_record_batch(arrow_record_batch_vec)
                     .map_err(|e| Box::new(e) as _)
                     .context(EncodeRecordBatch)?;
                 arrow_record_batch_vec = Vec::with_capacity(buf_len);
@@ -124,41 +97,18 @@
 
         // final check if there is any record batch left
         if self.fetched_row_num != 0 {
-            let row_num = Self::encode_record_batch(&mut arrow_writer, arrow_record_batch_vec)
+            let row_num = parquet_encoder
+                .encode_record_batch(arrow_record_batch_vec)
                 .map_err(|e| Box::new(e) as _)
                 .context(EncodeRecordBatch)?;
             self.total_row_num.fetch_add(row_num, Ordering::Relaxed);
         }
 
-        arrow_writer
+        let bytes = parquet_encoder
             .close()
             .map_err(|e| Box::new(e) as _)
             .context(EncodeMetaData)?;
 
-        Ok(self.encoding_buffer)
-    }
-
-    /// Encode the record batch with [ArrowWriter] and the encoded contents is
-    /// written to the buffer.
-    fn encode_record_batch<W: std::io::Write>(
-        arrow_writer: &mut ArrowWriter<W>,
-        arrow_record_batch_vec: Vec<ArrowRecordBatch>,
-    ) -> Result<usize> {
-        if arrow_record_batch_vec.is_empty() {
-            return Ok(0);
-        }
-
-        let arrow_schema = arrow_record_batch_vec[0].schema();
-        let record_batch = ArrowRecordBatch::concat(&arrow_schema, &arrow_record_batch_vec)
-            .map_err(|e| Box::new(e) as _)
-            .context(EncodeRecordBatch)?;
-
-        arrow_writer
-            .write(&record_batch)
-            .map_err(|e| Box::new(e) as _)
-            .context(EncodeRecordBatch)?;
-
-        Ok(record_batch.num_rows())
+        Ok(bytes)
     }
 }
@@ -179,7 +129,6 @@ impl<'a> SstBuilder for ParquetSstBuilder<'a> {
         let reader = RecordBytesReader {
             request_id,
             record_stream,
-            encoding_buffer: vec![],
             num_rows_per_row_group: self.num_rows_per_row_group,
             compression: self.compression,
             total_row_num: total_row_num.clone(),
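For orientation, the sketch below reconstructs the ParquetEncoder interface purely from its call sites in read_all above (try_new, encode_record_batch, close). The real type lives in analytic_engine/src/sst/parquet/encoding.rs and does more than this: it takes &SstMetaData rather than a bare arrow schema, attaches the encoded SST metadata as parquet key-value metadata the way the removed build_write_properties did, and rewrites batches into the hybrid layout. The SharedBuffer helper and the error mapping are assumptions made only to keep the sketch self-contained against the plain arrow and parquet crates of this era.

use std::io::Write;
use std::sync::{Arc, Mutex};

use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch as ArrowRecordBatch;
use parquet::{
    arrow::ArrowWriter,
    basic::Compression,
    errors::{ParquetError, Result},
    file::properties::WriterProperties,
};

/// A cloneable write target: the ArrowWriter owns one handle while the
/// encoder keeps another so the bytes can be taken out after close.
#[derive(Clone, Default)]
struct SharedBuffer(Arc<Mutex<Vec<u8>>>);

impl Write for SharedBuffer {
    fn write(&mut self, data: &[u8]) -> std::io::Result<usize> {
        self.0.lock().unwrap().write(data)
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

struct ParquetEncoder {
    buf: SharedBuffer,
    writer: ArrowWriter<SharedBuffer>,
}

impl ParquetEncoder {
    fn try_new(
        num_rows_per_row_group: usize,
        compression: Compression,
        schema: SchemaRef,
    ) -> Result<Self> {
        // The real encoder also calls set_key_value_metadata with the encoded
        // SstMetaData, as the removed build_write_properties did.
        let props = WriterProperties::builder()
            .set_max_row_group_size(num_rows_per_row_group)
            .set_compression(compression)
            .build();
        let buf = SharedBuffer::default();
        let writer = ArrowWriter::try_new(buf.clone(), schema, Some(props))?;
        Ok(Self { buf, writer })
    }

    /// Concatenate the buffered batches into one, write it, and return the
    /// row count, mirroring the removed encode_record_batch helper (newer
    /// arrow versions replace RecordBatch::concat with compute::concat_batches).
    fn encode_record_batch(&mut self, batches: Vec<ArrowRecordBatch>) -> Result<usize> {
        if batches.is_empty() {
            return Ok(0);
        }
        let schema = batches[0].schema();
        let batch = ArrowRecordBatch::concat(&schema, &batches)
            .map_err(|e| ParquetError::ArrowError(e.to_string()))?;
        self.writer.write(&batch)?;
        Ok(batch.num_rows())
    }

    /// Finish the parquet footer and hand back the encoded bytes.
    fn close(self) -> Result<Vec<u8>> {
        let ParquetEncoder { buf, writer } = self;
        writer.close()?;
        let bytes = std::mem::take(&mut *buf.0.lock().unwrap());
        Ok(bytes)
    }
}

Handing the ArrowWriter a cloneable buffer is one way to avoid the self-borrow that previously forced encoding_buffer to live on RecordBytesReader and be threaded through the writer.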
