Commit

use buffer to build list array
jiacai2050 committed Aug 12, 2022
1 parent a6e1010 commit a6449be
Showing 3 changed files with 139 additions and 35 deletions.
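The main change in hybrid.rs replaces the per-row `Vec<Option<i64>>` timestamp collection with `(offset, length)` slices into the source timestamp array, then assembles the final `ListArray` directly from an offsets buffer plus the raw child bytes. The standalone sketch below illustrates that buffer-based construction; it reuses the `arrow_deps::arrow` paths from this repository, but the helper name `list_from_slices` and the `Int64Array` input are illustrative only, not part of the commit.

use arrow_deps::arrow::array::{Array, ArrayData, Int64Array, ListArray};
use arrow_deps::arrow::buffer::MutableBuffer;
use arrow_deps::arrow::datatypes::{DataType, Field};

// Build a ListArray whose i-th entry holds source[offset..offset + len] for the
// i-th (offset, len) slice, copying raw bytes instead of boxing each value.
fn list_from_slices(source: &Int64Array, slices: &[(usize, usize)]) -> ListArray {
    const ITEM_SIZE: usize = std::mem::size_of::<i64>();

    let value_len: usize = slices.iter().map(|(_, len)| *len).sum();
    let mut values = MutableBuffer::new(value_len * ITEM_SIZE);
    let mut offsets = MutableBuffer::new((slices.len() + 1) * std::mem::size_of::<i32>());

    let mut length_so_far = 0i32;
    offsets.push(length_so_far);
    let raw = source.data().buffers()[0].as_slice();
    for &(offset, len) in slices {
        length_so_far += len as i32;
        values.extend_from_slice(&raw[offset * ITEM_SIZE..(offset + len) * ITEM_SIZE]);
        offsets.push(length_so_far);
    }

    // Child values array: one flat buffer holding `value_len` items.
    let child = ArrayData::builder(DataType::Int64)
        .len(value_len)
        .add_buffer(values.into())
        .build()
        .unwrap();
    // Wrap it in the List layout: offsets buffer + child data.
    let field = Box::new(Field::new("item", DataType::Int64, true));
    let list_data = ArrayData::builder(DataType::List(field))
        .len(slices.len())
        .add_buffer(offsets.into())
        .add_child_data(child)
        .build()
        .unwrap();
    ListArray::from(list_data)
}

For example, `list_from_slices(&Int64Array::from(vec![1, 2, 3, 4, 5]), &[(0, 2), (3, 2)])` yields the two-row list `[[1, 2], [4, 5]]`.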
9 changes: 7 additions & 2 deletions analytic_engine/src/sst/builder.rs
@@ -10,7 +10,7 @@ use crate::sst::file::SstMetaData

pub mod error {
use common_util::define_result;
use snafu::Snafu;
use snafu::{Backtrace, Snafu};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
@@ -25,9 +25,14 @@ pub mod error {
source: Box<dyn std::error::Error + Send + Sync>,
},

#[snafu(display("Failed to encode record batch into sst, err:{}.", source,))]
#[snafu(display(
"Failed to encode record batch into sst, err:{}.\nBacktrace:\n{}",
source,
backtrace
))]
EncodeRecordBatch {
source: Box<dyn std::error::Error + Send + Sync>,
backtrace: Backtrace,
},

#[snafu(display("Failed to poll record batch, err:{}", source))]
143 changes: 110 additions & 33 deletions analytic_engine/src/sst/parquet/hybrid.rs
@@ -2,14 +2,21 @@ use std::{collections::BTreeMap, sync::Arc};

use arrow_deps::arrow::{
array::{
Array, ArrayRef, Float64Array, ListArray, StringArray, TimestampMillisecondArray,
UInt64Array,
Array, ArrayData, ArrayRef, Float64Array, ListArray, StringArray,
TimestampMillisecondArray, UInt64Array,
},
buffer::MutableBuffer,
datatypes::{
DataType as ArrowDataType, Float64Type, Schema as ArrowSchema, TimeUnit,
TimestampMillisecondType,
},
datatypes::{Float64Type, Schema as ArrowSchema, TimeUnit, TimestampMillisecondType},
record_batch::RecordBatch as ArrowRecordBatch,
};
use common_types::schema::{ArrowSchemaRef, DataType, Field, Schema};
use log::debug;
use common_types::{
datum::DatumKind,
schema::{ArrowSchemaRef, DataType, Field, Schema},
};
use log::{debug, info};
use snafu::ResultExt;

use crate::sst::builder::{EncodeRecordBatch, Result};
@@ -20,29 +27,34 @@ const LIST_ITEM_NAME: &str = "item";
/// `TsidBatch` is used to collect column data for the same TSID
/// timestamps.len == each field len
/// NOTE: only f64 fields are supported for now
#[derive(Debug)]
struct TsidBatch {
timestamps: Vec<Option<i64>>,
tag_values: Vec<String>,
timestamp_array: ArrayRef,
timestamp_pos: Vec<(usize, usize)>, // (offset, length)
fields: Vec<Vec<Option<f64>>>,
}

impl TsidBatch {
fn new(tag_values: Vec<String>, field_num: usize) -> Self {
fn new(tag_values: Vec<String>, timestamp: ArrayRef, field_num: usize) -> Self {
Self {
timestamps: Vec::new(),
tag_values,
timestamp_array: timestamp,
timestamp_pos: Vec::new(),
fields: vec![Vec::new(); field_num],
}
}

fn append_timestamp(&mut self, ts: impl IntoIterator<Item = Option<i64>>) {
self.timestamps.extend(ts);
fn append_timestamp(&mut self, offset: usize, length: usize) {
self.timestamp_pos.push((offset, length));
}

fn append_fields(&mut self, fields: Vec<ArrayRef>) {
assert_eq!(self.fields.len(), fields.len());

for (idx, fields) in fields.into_iter().enumerate() {
let fields_in_one_tsid = fields
.as_any()
.downcast_ref::<Float64Array>()
@@ -109,6 +121,57 @@ pub fn build_hybrid_arrow_schema(schema: &Schema) -> ArrowSchemaRef {
))
}

fn merge_array_vec_to_list(list_of_arrays: Vec<(ArrayRef, Vec<(usize, usize)>)>) -> ListArray {
assert!(!list_of_arrays.is_empty());

let array_len = list_of_arrays.len();
let data_type = list_of_arrays[0].0.data_type().clone();
let data_type_size = DatumKind::from_data_type(&data_type)
.expect("unsupported datatype")
.size()
.unwrap();
// `value_len` is the total number of child values, i.e. the sum of the slice
// lengths (not the number of slices).
let value_len: usize = list_of_arrays
.iter()
.flat_map(|(_, positions)| positions.iter().map(|(_, len)| *len))
.sum();
let value_total_bytes = value_len * data_type_size;
let mut values = MutableBuffer::new(value_total_bytes);
let mut offsets = MutableBuffer::new((list_of_arrays.len() + 1) * std::mem::size_of::<i32>());
let mut length_so_far: i32 = 0;
offsets.push(length_so_far);

for (array, lst) in list_of_arrays {
let shared_buffer = array.data().buffers()[0].as_slice();
for (offset, length) in lst {
length_so_far += length as i32;
values.extend_from_slice(
&shared_buffer[offset * data_type_size..(offset + length) * data_type_size],
);
}
offsets.push(length_so_far);
}
debug!(
"merge_array_vec_to_list offsets:{:?}, values:{:?}",
offsets.as_slice(),
values.as_slice()
);

let values_array_data = ArrayData::builder(data_type.clone())
.len(value_len)
.add_buffer(values.into())
.build()
.unwrap();
let field = Box::new(Field::new(LIST_ITEM_NAME, data_type, true));
let array_data = ArrayData::builder(DataType::List(field))
.len(array_len)
.add_buffer(offsets.into())
.add_child_data(values_array_data);

// NOTE: `build_unchecked` could skip validation here once the layout is
// known to be valid.
let array_data = array_data.build().unwrap();
ListArray::from(array_data)
}
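For reference, the merged list can be sanity-checked by slicing the child array back out through the offsets. The helper below is hypothetical (not part of this commit) and assumes the child was built from a millisecond-timestamp array, as in the test at the bottom of this file.

use arrow_deps::arrow::array::{Array, ListArray, TimestampMillisecondArray};

// Print each list row by letting ListArray::value(i) slice the child values
// according to the offsets buffer built above.
fn dump_list_entries(list: &ListArray) {
    for i in 0..list.len() {
        let entry = list.value(i);
        let entry = entry
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .expect("child built from a timestamp array");
        println!("row {}: {:?}", i, entry.values());
    }
}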

fn build_hybrid_record(
arrow_schema: ArrowSchemaRef,
tsid_name: IndexedName,
@@ -117,13 +180,13 @@ fn build_hybrid_record(
field_names: Vec<IndexedName>,
batch_by_tsid: BTreeMap<u64, TsidBatch>,
) -> Result<ArrowRecordBatch> {
let tsid_col = UInt64Array::from_iter_values(batch_by_tsid.keys().cloned().into_iter());
let tsid_col = UInt64Array::from_iter_values(batch_by_tsid.keys().cloned());
let mut ts_col = Vec::new();
let mut field_cols = vec![Vec::new(); field_names.len()];
let mut tag_cols = vec![Vec::new(); tag_names.len()];

for batch in batch_by_tsid.into_values() {
ts_col.push(Some(batch.timestamps.clone()));
ts_col.push((batch.timestamp_array, batch.timestamp_pos));
for (idx, field) in batch.fields.into_iter().enumerate() {
field_cols[idx].push(Some(field));
}
@@ -137,11 +200,7 @@ };
};
let ts_array = IndexedArray {
idx: timestamp_name.idx,
array: Arc::new(ListArray::from_iter_primitive::<
TimestampMillisecondType,
_,
_,
>(ts_col)),
array: Arc::new(merge_array_vec_to_list(ts_col)),
};
let tag_arrays = tag_cols
.into_iter()
@@ -208,13 +267,11 @@ pub fn convert_to_hybrid(
idx,
name: col.name.clone(),
});
} else {
if idx != timestamp_name.idx && idx != tsid_name.idx {
field_names.push(IndexedName {
idx,
name: col.name.clone(),
});
}
} else if idx != timestamp_name.idx && idx != tsid_name.idx {
field_names.push(IndexedName {
idx,
name: col.name.clone(),
});
}
}
debug!(
@@ -263,14 +320,6 @@ pub fn convert_to_hybrid(
} else {
duplicated_tsids[i + 1].1 - offset
};
// collect timestamps
let timestamps_in_one_tsid = record_batch
.column(timestamp_name.idx)
.slice(offset, length);
let timestamps_in_one_tsid = timestamps_in_one_tsid
.as_any()
.downcast_ref::<TimestampMillisecondArray>()
.expect("checked in plan build");

// collect fields
let mut field_columns = Vec::with_capacity(field_names.len());
@@ -285,10 +334,11 @@ pub fn convert_to_hybrid(
.iter()
.map(|col| col.value(offset).to_string())
.collect::<Vec<_>>(),
record_batch.column(timestamp_name.idx).clone(),
field_names.len(),
)
});
batch.append_timestamp(timestamps_in_one_tsid.into_iter());
batch.append_timestamp(offset, length);
batch.append_fields(field_columns);
}
}
@@ -302,3 +352,30 @@ pub fn convert_to_hybrid(
batch_by_tsid,
)
}

#[cfg(test)]
mod tests {
use super::*;

fn timestamp_array(start: i64, end: i64) -> ArrayRef {
Arc::new(TimestampMillisecondArray::from_iter_values(start..end))
}

#[test]
fn merge_timestamp_array_list() {
let list_of_arrays = vec![
(timestamp_array(1, 20), vec![(1, 2), (10, 3)]),
(timestamp_array(1, 20), vec![(1, 2), (10, 3)]),
];

let data = vec![
Some(vec![Some(1), Some(2), Some(10), Some(11), Some(12)]),
Some(vec![Some(1), Some(2), Some(10), Some(11), Some(12)]),
];
let expected = ListArray::from_iter_primitive::<TimestampMillisecondType, _, _>(data);
let list_array = merge_array_vec_to_list(list_of_arrays);

// TODO: null bitmaps are not equal yet
assert_eq!(list_array.data().buffers(), expected.data().buffers());
}
}
22 changes: 22 additions & 0 deletions common_types/src/datum.rs
@@ -176,6 +176,28 @@ impl DatumKind {
pub fn into_u8(self) -> u8 {
self as u8
}

/// Returns `None` for variable-length types
pub fn size(&self) -> Option<usize> {
let size = match self {
DatumKind::Null => 1,
DatumKind::Timestamp => 8,
DatumKind::Double => 8,
DatumKind::Float => 4,
DatumKind::Varbinary => return None,
DatumKind::String => return None,
DatumKind::UInt64 => 8,
DatumKind::UInt32 => 4,
DatumKind::UInt16 => 2,
DatumKind::UInt8 => 1,
DatumKind::Int64 => 8,
DatumKind::Int32 => 4,
DatumKind::Int16 => 2,
DatumKind::Int8 => 1,
DatumKind::Boolean => 1,
};
Some(size)
}
}
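The new `DatumKind::size` helper is what lets `merge_array_vec_to_list` above pre-size its value buffer and compute byte ranges for fixed-width columns. A minimal illustration follows; the helper name `preallocate_values` is hypothetical, assuming the crate paths used in this commit.

use arrow_deps::arrow::buffer::MutableBuffer;
use common_types::datum::DatumKind;

// Reserve room for `value_len` items of a fixed-width kind; variable-length
// kinds (String, Varbinary) have no fixed size, so this returns None.
fn preallocate_values(kind: &DatumKind, value_len: usize) -> Option<MutableBuffer> {
    let item_size = kind.size()?;
    Some(MutableBuffer::new(value_len * item_size))
}

For example, `preallocate_values(&DatumKind::Timestamp, 5)` reserves 40 bytes, matching the `value_total_bytes` computation in hybrid.rs.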

impl TryFrom<&SqlDataType> for DatumKind {
