support variable length decode
jiacai2050 committed Aug 24, 2022
1 parent bc21ede commit 0ace912
Showing 1 changed file with 121 additions and 29 deletions.
150 changes: 121 additions & 29 deletions analytic_engine/src/sst/parquet/encoding.rs
@@ -8,7 +8,7 @@ use std::{

use arrow_deps::{
arrow::{
array::{Array, ArrayData},
array::{Array, ArrayData, ArrayRef},
buffer::{Buffer, MutableBuffer},
record_batch::RecordBatch as ArrowRecordBatch,
},
@@ -34,6 +34,8 @@ use crate::sst::{
parquet::hybrid::{self, IndexedType},
};

const OFFSET_SIZE: usize = std::mem::size_of::<i32>();

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display(
@@ -126,6 +128,22 @@ pub enum Error {
backtrace: Backtrace,
},

#[snafu(display(
"Failed to decode hybrid record batch, err:{}.\nBacktrace:\n{}",
source,
backtrace
))]
DecodeRecordBatch {
source: Box<dyn std::error::Error + Send + Sync>,
backtrace: Backtrace,
},

#[snafu(display(
"At least one ListArray(such as Timestamp) is required to decode hybrid record batch.\nBacktrace:\n{}",
backtrace
))]
ListArrayRequired { backtrace: Backtrace },

#[snafu(display("Tsid is required for hybrid format.\nBacktrace:\n{}", backtrace))]
TsidRequired { backtrace: Backtrace },

@@ -491,63 +509,137 @@ impl ParquetDecoder {
))
}

/// Stretch a hybrid collapsed column into a columnar column.
/// `value_offsets` specifies the offset window each value occupies, which
/// means the repeat count of `value[n]` is
/// `value_offsets[n + 1] - value_offsets[n]`.
/// Ex:
///
/// `array_ref` is `a b c` and `value_offsets` is `[0, 3, 5, 6]`, then the
/// output array is `a a a b b c`
fn stretch_variable_length_column(
array_ref: &ArrayRef,
value_offsets: &[i32],
values_num: usize,
) -> Result<ArrayRef> {
let offset_slices = array_ref.data().buffers()[0].as_slice();
let value_slices = array_ref.data().buffers()[1].as_slice();
debug!(
"raw buffer slice, offset:{:#02x?}, value:{:#02x?}",
offset_slices, value_slices
);

let mut offsets = Vec::with_capacity(offset_slices.len() / OFFSET_SIZE);
for i in (0..offset_slices.len()).step_by(OFFSET_SIZE) {
let offset = i32::from_le_bytes(offset_slices[i..i + OFFSET_SIZE].try_into().unwrap());
offsets.push(offset);
}

let mut value_bytes = 0;
for (idx, (current, prev)) in offsets[1..].iter().zip(&offsets).enumerate() {
let value_len = current - prev;
let value_num = value_offsets[idx + 1] - value_offsets[idx];
value_bytes += value_len * value_num;
}
let mut new_offsets = MutableBuffer::new(OFFSET_SIZE * values_num);
let mut new_values = MutableBuffer::new(value_bytes as usize);
let mut length_so_far: i32 = 0;
new_offsets.push(length_so_far);

for (idx, (current, prev)) in offsets[1..].iter().zip(&offsets).enumerate() {
let value_len = current - prev;
let value_num = value_offsets[idx + 1] - value_offsets[idx];
new_values
.extend(value_slices[*prev as usize..*current as usize].repeat(value_num as usize));
for _ in 0..value_num {
length_so_far += value_len;
new_offsets.push(length_so_far);
}
}
let array_data = ArrayData::builder(array_ref.data_type().clone())
.len(values_num)
.add_buffer(new_offsets.into())
.add_buffer(new_values.into())
.build()
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
}
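A minimal standalone sketch of this stretch logic, assuming the same offset convention but using plain `Vec`s instead of Arrow buffers (the `stretch` helper and its `main` driver are illustrative only):

```rust
// Repeat each value according to its offset window: value[idx] is
// emitted (value_offsets[idx + 1] - value_offsets[idx]) times.
fn stretch<T: Clone>(values: &[T], value_offsets: &[i32]) -> Vec<T> {
    let mut out = Vec::with_capacity(*value_offsets.last().unwrap_or(&0) as usize);
    for (idx, value) in values.iter().enumerate() {
        let repeat = (value_offsets[idx + 1] - value_offsets[idx]) as usize;
        out.extend(std::iter::repeat(value.clone()).take(repeat));
    }
    out
}

fn main() {
    // `a b c` with offsets `[0, 3, 5, 6]` stretches to `a a a b b c`.
    assert_eq!(
        stretch(&["a", "b", "c"], &[0, 3, 5, 6]),
        ["a", "a", "a", "b", "b", "c"]
    );
}
```

The implementation above performs the same expansion directly on the raw offset and value buffers, so the repeated values land in a single `MutableBuffer` without intermediate allocations.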

/// Like `stretch_variable_length_column`, but for fixed-size values
fn stretch_fixed_length_column(
array_ref: &ArrayRef,
value_size: usize,
value_offsets: &[i32],
values_num: usize,
) -> Result<ArrayRef> {
let mut new_buffer = MutableBuffer::new(value_size * values_num);
let raw_buffer = array_ref.data().buffers()[0].as_slice();
for (idx, offset) in (0..raw_buffer.len()).step_by(value_size).enumerate() {
let value_num = value_offsets[idx + 1] - value_offsets[idx];
new_buffer.extend(raw_buffer[offset..offset + value_size].repeat(value_num as usize))
}
let array_data = ArrayData::builder(array_ref.data_type().clone())
.add_buffer(new_buffer.into())
.len(values_num)
.build()
.map_err(|e| Box::new(e) as _)
.context(DecodeRecordBatch)?;

Ok(array_data.into())
}
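The fixed-size path admits the same kind of standalone sketch (plain bytes in place of Arrow buffers; `stretch_fixed_bytes` is an illustrative name): each value is a `value_size`-byte chunk, repeated according to its offset window.

```rust
fn stretch_fixed_bytes(raw: &[u8], value_size: usize, value_offsets: &[i32]) -> Vec<u8> {
    let values_num = *value_offsets.last().unwrap_or(&0) as usize;
    let mut out = Vec::with_capacity(value_size * values_num);
    // Walk the buffer one fixed-size value at a time.
    for (idx, chunk) in raw.chunks_exact(value_size).enumerate() {
        let repeat = (value_offsets[idx + 1] - value_offsets[idx]) as usize;
        out.extend_from_slice(&chunk.repeat(repeat));
    }
    out
}

fn main() {
    // Two 1-byte values with offsets [0, 3, 5] stretch to [1, 1, 1, 2, 2].
    assert_eq!(stretch_fixed_bytes(&[1, 2], 1, &[0, 3, 5]), [1, 1, 1, 2, 2]);
}
```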

/// Decode records from hybrid to columnar format
pub fn decode(self) -> Result<ArrowRecordBatch> {
let new_arrow_schema = Self::convert_schema(self.record_batch.schema());
let columns = self.record_batch.columns();
let mut offsets = None;
let mut num_values = 0;
let mut value_offsets = None;
let mut values_num = 0;
// Find value offsets from the first `ListArray` column
for col in columns {
if matches!(col.data_type(), DataType::List(_)) {
let offset_slices = col.data().buffers()[0].as_slice();
num_values = col.data().child_data()[0].len();
values_num = col.data().child_data()[0].len();

let mut i32_offsets = Vec::with_capacity(offset_slices.len() / 4);
for i in (0..offset_slices.len()).step_by(4) {
let offset = i32::from_le_bytes(offset_slices[i..i + 4].try_into().unwrap());
i32_offsets.push(offset);
}
offsets = Some(i32_offsets);
value_offsets = Some(i32_offsets);
break;
}
}
debug!("offsets:{:?}, num_values:{}", offsets, num_values);

let offsets = offsets.unwrap();
ensure!(value_offsets.is_some(), ListArrayRequired {});

let value_offsets = value_offsets.unwrap();
let new_cols = columns
.iter()
.map(|col| {
let data_type = col.data_type();
match data_type {
DataType::List(_nested_field) => col.data().child_data()[0].clone().into(),
DataType::List(_nested_field) => Ok(col.data().child_data()[0].clone().into()),
_ => {
let datum_kind = DatumKind::from_data_type(data_type).unwrap();
let value_size = datum_kind.size().unwrap();
let mut new_buffer = MutableBuffer::new(value_size * num_values);
let raw_buffer = col.data().buffers()[0].as_slice();
for (idx, offset) in (0..raw_buffer.len()).step_by(value_size).enumerate() {
debug!("xx {idx}-{offset}-{}-{value_size}", offsets[idx]);
let value_num = offsets[idx + 1] - offsets[idx];
new_buffer.extend(
raw_buffer[offset..offset + value_size].repeat(value_num as usize),
)
match datum_kind.size() {
None => Self::stretch_variable_length_column(
col,
&value_offsets,
values_num,
),
Some(value_size) => Self::stretch_fixed_length_column(
col,
value_size,
&value_offsets,
values_num,
),
}
let buf: Buffer = new_buffer.into();
debug!("11_{:?}, new_buffer:{:#02x?}", data_type, buf.as_slice());

let array_data = ArrayData::builder(data_type.clone())
.add_buffer(buf)
.len(num_values)
.build()
.unwrap();

array_data.into()
}
}
})
.collect::<Vec<_>>();
.collect::<Result<Vec<_>>>()?;

println!("try_new:{:?}, cols:{:?}", new_arrow_schema, new_cols);
ArrowRecordBatch::try_new(new_arrow_schema, new_cols)
.map_err(|e| Box::new(e) as _)
.context(EncodeRecordBatch)
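For reference, the little-endian offset decoding that appears both in `stretch_variable_length_column` and in `decode` can be sketched as one shared helper (the name is illustrative):

```rust
// Reinterpret a ListArray's raw offsets buffer as i32 list offsets.
fn read_offsets(offset_slices: &[u8]) -> Vec<i32> {
    offset_slices
        .chunks_exact(std::mem::size_of::<i32>())
        .map(|bytes| i32::from_le_bytes(bytes.try_into().unwrap()))
        .collect()
}
```

Note that the loop in `decode` hard-codes `4` where `stretch_variable_length_column` uses the `OFFSET_SIZE` constant; both stand for `size_of::<i32>()`.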