fix reviews
jiacai2050 committed Aug 18, 2022
1 parent 7cd4f9a commit 93cfa2d
Showing 4 changed files with 65 additions and 58 deletions.
10 changes: 0 additions & 10 deletions analytic_engine/src/sst/builder.rs
@@ -35,16 +35,6 @@ pub mod error {
backtrace: Backtrace,
},

#[snafu(display(
"Hybrid format don't support variable length type, type:{}.\nBacktrace:\n{}",
type_str,
backtrace
))]
VariableLengthType {
type_str: String,
backtrace: Backtrace,
},

#[snafu(display("Failed to poll record batch, err:{}", source))]
PollRecordBatch {
source: Box<dyn std::error::Error + Send + Sync>,
65 changes: 41 additions & 24 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -24,8 +24,10 @@ use proto::sst::SstMetaData as SstMetaDataPb;
use protobuf::Message;
use snafu::{ensure, Backtrace, OptionExt, ResultExt, Snafu};

use super::hybrid::IndexedType;
use crate::sst::{builder::VariableLengthType, file::SstMetaData, parquet::hybrid};
use crate::sst::{
file::SstMetaData,
parquet::hybrid::{self, IndexedType},
};

#[derive(Debug, Snafu)]
pub enum Error {
@@ -124,11 +126,21 @@ pub enum Error {

#[snafu(display(
"Key column must be string type. type:{}\nBacktrace:\n{}",
type_str,
type_name,
backtrace
))]
StringKeyColumnRequired {
type_str: String,
type_name: String,
backtrace: Backtrace,
},

#[snafu(display(
"Hybrid format doesn't support variable length type, type:{}.\nBacktrace:\n{}",
type_name,
backtrace
))]
VariableLengthType {
type_name: String,
backtrace: Backtrace,
},
}
@@ -190,7 +202,7 @@ pub fn decode_sst_meta_data(kv: &KeyValue) -> Result<SstMetaData> {
SstMetaData::try_from(meta_data_pb).context(ConvertSstMetaData)
}

/// RecordEncoder is used for encoding ArrowBatch
/// RecordEncoder is used for encoding ArrowBatch.
///
/// TODO: allow pre-allocate buffer
trait RecordEncoder {
@@ -204,7 +216,7 @@ trait RecordEncoder {
}

/// EncodingWriter implements `Write` trait, useful when Writer need shared
/// ownership
/// ownership.
///
/// TODO: This is a temp workaround for [ArrowWriter](https://docs.rs/parquet/20.0.0/parquet/arrow/arrow_writer/struct.ArrowWriter.html), since it has no method to get underlying Writer
/// We can fix this by add `into_inner` method to it, or just replace it with
@@ -226,14 +238,13 @@ impl Write for EncodingWriter {
}

fn flush(&mut self) -> std::io::Result<()> {
let mut inner = self.0.lock().unwrap();
inner.flush()
Ok(())
}
}

struct ColumnarRecordEncoder {
buf: EncodingWriter,
// wrap in Option so ownership can be take out behind `&mut self`
// wrap in Option so ownership can be taken out behind `&mut self`
arrow_writer: Option<ArrowWriter<EncodingWriter>>,
arrow_schema: ArrowSchemaRef,
}
@@ -289,7 +300,7 @@ impl RecordEncoder for ColumnarRecordEncoder {

struct HybridRecordEncoder {
buf: EncodingWriter,
// wrap in Option so ownership can be take out behind `&mut self`
// wrap in Option so ownership can be taken out behind `&mut self`
arrow_writer: Option<ArrowWriter<EncodingWriter>>,
arrow_schema: ArrowSchemaRef,
tsid_type: IndexedType,
@@ -301,6 +312,8 @@ struct HybridRecordEncoder {

impl HybridRecordEncoder {
fn try_new(write_props: WriterProperties, schema: &Schema) -> Result<Self> {
// TODO: What we really want here is a unique ID, tsid is one case
// Maybe support other cases later.
let tsid_idx = schema.index_of_tsid().context(TsidRequired)?;
let timestamp_type = IndexedType {
idx: schema.timestamp_index(),
@@ -314,27 +327,31 @@ impl HybridRecordEncoder {
let mut key_types = Vec::new();
let mut non_key_types = Vec::new();
for (idx, col) in schema.columns().iter().enumerate() {
if idx != timestamp_type.idx && idx != tsid_idx {
continue;
}

if schema.non_key_column(idx) {
let _ = col
.data_type
.size()
.context(VariableLengthType {
type_str: col.data_type.to_string(),
})
.map_err(|e| Box::new(e) as _)
.context(EncodeRecordBatch)?;
// TODO: support variable length type
ensure!(
col.data_type.size().is_some(),
VariableLengthType {
type_name: col.data_type.to_string(),
}
);

non_key_types.push(IndexedType {
idx,
data_type: schema.column(idx).data_type,
});
} else if idx != timestamp_type.idx && idx != tsid_idx {
if !matches!(col.data_type, DatumKind::String) {
return StringKeyColumnRequired {
type_str: col.data_type.to_string(),
} else {
// TODO: support non-string key columns
ensure!(
matches!(col.data_type, DatumKind::String),
StringKeyColumnRequired {
type_name: col.data_type.to_string(),
}
.fail();
}
);
key_types.push(IndexedType {
idx,
data_type: col.data_type,
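Aside (not from this commit): the encoder changes above replace the manual `.context(...)`/`.fail()` error construction with snafu's `ensure!` macro plus a context selector. A minimal, self-contained sketch of that pattern, assuming snafu 0.6-style selectors (where the selector struct shares the variant's name); the `check_fixed_size` helper is invented for illustration:

```rust
// Sketch only: standalone illustration of the `ensure!` + context-selector pattern.
use snafu::{ensure, Backtrace, Snafu};

#[derive(Debug, Snafu)]
pub enum Error {
    #[snafu(display("Hybrid format doesn't support variable length type, type:{}", type_name))]
    VariableLengthType {
        type_name: String,
        backtrace: Backtrace,
    },
}

pub type Result<T> = std::result::Result<T, Error>;

// Mirrors the fixed-size check in `HybridRecordEncoder::try_new`: return the
// `VariableLengthType` error when the datum kind has no fixed size.
fn check_fixed_size(type_name: &str, size: Option<usize>) -> Result<usize> {
    ensure!(
        size.is_some(),
        VariableLengthType {
            type_name: type_name.to_string(),
        }
    );
    Ok(size.unwrap())
}
```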
46 changes: 23 additions & 23 deletions analytic_engine/src/sst/parquet/hybrid.rs
@@ -15,15 +15,15 @@ use common_types::{
schema::{ArrowSchemaRef, DataType, Field, Schema},
};
use log::debug;
use snafu::{OptionExt, ResultExt};
use snafu::ResultExt;

use crate::sst::builder::{EncodeRecordBatch, Result, VariableLengthType};
use crate::sst::builder::{EncodeRecordBatch, Result};

// hard coded in https://github.com/apache/arrow-rs/blob/20.0.0/arrow/src/array/array_list.rs#L185
const LIST_ITEM_NAME: &str = "item";

/// ArrayHandle is used to keep different offsets of array, which can be concat
/// together.
/// ArrayHandle is used to keep different offsets of array, which can be
/// concatenated together.
///
/// Note:
/// 1. Array.slice(offset, length) don't work as expected, since the
@@ -101,6 +101,10 @@ struct IndexedArray {
array: ArrayRef,
}

fn is_collapsable_column(idx: usize, timestamp_idx: usize, non_key_column_idxes: &[usize]) -> bool {
idx == timestamp_idx || non_key_column_idxes.contains(&idx)
}

/// Convert timestamp/non key columns to list type
pub fn build_hybrid_arrow_schema(
timestamp_idx: usize,
@@ -113,16 +117,13 @@ pub fn build_hybrid_arrow_schema(
.iter()
.enumerate()
.map(|(idx, field)| {
if idx == timestamp_idx || non_key_column_idxes.contains(&idx) {
Field::new(
field.name(),
DataType::List(Box::new(Field::new(
LIST_ITEM_NAME,
field.data_type().clone(),
true,
))),
if is_collapsable_column(idx, timestamp_idx, &non_key_column_idxes) {
let field_type = DataType::List(Box::new(Field::new(
LIST_ITEM_NAME,
field.data_type().clone(),
true,
)
)));
Field::new(field.name(), field_type, true)
} else {
field.clone()
}
@@ -150,9 +151,10 @@ impl ListArrayBuilder {
}

fn build_child_data(&self, offsets: &mut MutableBuffer) -> Result<ArrayData> {
let data_type_size = self.datum_kind.size().context(VariableLengthType {
type_str: self.datum_kind.to_string(),
})?;
let data_type_size = self
.datum_kind
.size()
.expect("checked in HybridRecordEncoder::try_new");
let values_num = self.list_of_arrays.iter().map(|handle| handle.len()).sum();
let mut values = MutableBuffer::new(values_num * data_type_size);
let mut null_buffer = MutableBuffer::new_null(values_num);
@@ -200,10 +202,8 @@ impl ListArrayBuilder {
Ok(values_array_data)
}

/// This function is an translation of [GenericListArray.from_iter_primitive](https://docs.rs/arrow/20.0.0/src/arrow/array/array_list.rs.html#151)
/// This function is a translation of [GenericListArray.from_iter_primitive](https://docs.rs/arrow/20.0.0/src/arrow/array/array_list.rs.html#151)
fn build(self) -> Result<ListArray> {
assert!(!self.list_of_arrays.is_empty());

let array_len = self.list_of_arrays.len();
let mut offsets =
MutableBuffer::new(self.list_of_arrays.len() * std::mem::size_of::<i32>());
@@ -280,7 +280,7 @@ fn build_hybrid_record(
})
})
.collect::<Result<Vec<_>>>()?;
let all_columns = vec![
let all_columns = [
vec![tsid_array, timestamp_array],
key_column_arrays,
non_key_column_arrays,
Expand All @@ -293,7 +293,7 @@ fn build_hybrid_record(
.collect::<Vec<_>>();

ArrowRecordBatch::try_new(arrow_schema, all_columns)
.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)
.map_err(|e| Box::new(e) as _)
.context(EncodeRecordBatch)
}

@@ -305,11 +305,11 @@ pub fn convert_to_hybrid_record(
key_types: &[IndexedType],
non_key_types: &[IndexedType],
hybrid_arrow_schema: ArrowSchemaRef,
arrow_record_batch_vec: Vec<ArrowRecordBatch>,
arrow_record_batchs: Vec<ArrowRecordBatch>,
) -> Result<ArrowRecordBatch> {
// TODO: should keep tsid ordering here?
let mut batch_by_tsid = BTreeMap::new();
for record_batch in arrow_record_batch_vec {
for record_batch in arrow_record_batchs {
let tsid_array = record_batch
.column(tsid_type.idx)
.as_any()
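Aside (not from this commit): `build_hybrid_arrow_schema` collapses the timestamp and non-key columns into Arrow `List` fields, so every column of a tsid's rows can later be packed into a single list cell. A minimal sketch of that field rewrite against the arrow 20.0 API; the schema and column names below are invented for illustration:

```rust
// Sketch only: wrap selected fields of a schema in List types, as the hybrid format does.
use arrow::datatypes::{DataType, Field, Schema};

const LIST_ITEM_NAME: &str = "item";

// Wrap a field's type in List(item), keeping the original field name.
fn collapse_to_list(field: &Field) -> Field {
    let item = Field::new(LIST_ITEM_NAME, field.data_type().clone(), true);
    Field::new(field.name(), DataType::List(Box::new(item)), true)
}

fn main() {
    let original = Schema::new(vec![
        Field::new("tsid", DataType::UInt64, false),
        Field::new("timestamp", DataType::Int64, false),
        Field::new("value", DataType::Float64, true),
    ]);

    // Keep the key column (tsid here) as-is; collapse everything else.
    let hybrid_fields: Vec<Field> = original
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, f)| if idx == 0 { f.clone() } else { collapse_to_list(f) })
        .collect();

    println!("{:?}", Schema::new(hybrid_fields));
}
```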
2 changes: 1 addition & 1 deletion analytic_engine/src/sst/parquet/reader.rs
@@ -336,7 +336,7 @@ impl ProjectAndFilterReader {
let arrow_schema = record_batch.schema();
let schema = Schema::try_from(arrow_schema).context(InvalidSchema)?;
let record_batch = match schema.storage_format() {
StorageFormat::Hybrid => todo!(),
StorageFormat::Hybrid => todo!("Will implement this in PR 207"),
StorageFormat::Columnar => record_batch,
};

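Aside (not from this commit): the reader keeps the hybrid branch as an explicit `todo!` until decoding lands in a follow-up change. A tiny sketch of the dispatch shape, with stand-in types instead of the real `Schema`/`StorageFormat`:

```rust
// Sketch only: stand-in types; the real reader matches on schema.storage_format().
enum StorageFormat {
    Columnar,
    Hybrid,
}

struct ArrowRecordBatch;

fn maybe_convert(format: StorageFormat, record_batch: ArrowRecordBatch) -> ArrowRecordBatch {
    match format {
        // Hybrid batches must first be expanded back to one row per sample;
        // that conversion is not implemented yet.
        StorageFormat::Hybrid => todo!("hybrid decoding not implemented yet"),
        StorageFormat::Columnar => record_batch,
    }
}
```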
