chore(query): zero copy conversion with arrow-rs (#10818)
* zero-copy conversion with arrow-rs

* support metadata in arrow's schema

* fix string type mapping to arrow's LargeBinary
sundy-li authored Mar 29, 2023
1 parent 940cd59 commit 8b6c32b
Showing 5 changed files with 187 additions and 115 deletions.
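For context on "zero-copy" above: the conversion now hands an existing allocation to arrow-rs as a Buffer instead of collecting values into a fresh Vec (the removed try_take_buffer path). Below is a minimal sketch of that idea using only public arrow-rs APIs; the commit itself reuses the crate's internal arrow2 buffers through From impls, which this sketch does not reproduce.

// Sketch: build an Int32Array by wrapping an existing Vec<i32> allocation.
// Buffer::from_vec takes ownership of the Vec, so the values are not copied.
use std::sync::Arc;

use arrow_array::Array;
use arrow_array::Int32Array;
use arrow_buffer::Buffer;
use arrow_data::ArrayData;
use arrow_schema::ArrowError;
use arrow_schema::DataType;

fn vec_into_int32_array(v: Vec<i32>) -> Result<Arc<dyn Array>, ArrowError> {
    let len = v.len();
    let buf = Buffer::from_vec(v); // zero-copy: reuses the Vec's allocation
    let data = ArrayData::builder(DataType::Int32)
        .len(len)
        .add_buffer(buf)
        .build()?;
    Ok(Arc::new(Int32Array::from(data)))
}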
225 changes: 130 additions & 95 deletions src/query/expression/src/convert_arrow_rs/array.rs
@@ -18,52 +18,42 @@ use arrow_array::make_array
use arrow_array::Array;
use arrow_array::ArrowPrimitiveType;
use arrow_array::BooleanArray;
use arrow_array::Int16Array;
use arrow_array::Int32Array;
use arrow_array::Int64Array;
use arrow_array::Int8Array;
use arrow_array::LargeStringArray;
use arrow_array::LargeBinaryArray;
use arrow_array::NullArray;
use arrow_array::PrimitiveArray;
use arrow_array::StringArray;
use arrow_array::UInt16Array;
use arrow_array::UInt32Array;
use arrow_array::UInt64Array;
use arrow_array::UInt8Array;
use arrow_buffer::i256;
use arrow_buffer::buffer::BooleanBuffer;
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::ArrowNativeType;
use arrow_buffer::Buffer;
use arrow_data::ArrayData;
use arrow_data::ArrayDataBuilder;
use arrow_schema::ArrowError;
use arrow_schema::DataType;
use arrow_schema::Field;
use arrow_schema::TimeUnit;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::buffer::Buffer as Buffer2;
use common_arrow::arrow::Either;
use common_arrow::arrow::types::NativeType;
use ordered_float::OrderedFloat;

use crate::types::decimal::DecimalColumn;
use crate::types::nullable::NullableColumn;
use crate::types::number::NumberColumn;
use crate::types::string::StringColumn;
use crate::types::F32;
use crate::types::F64;
use crate::Column;
use crate::ARROW_EXT_TYPE_EMPTY_ARRAY;
use crate::ARROW_EXT_TYPE_EMPTY_MAP;
use crate::ARROW_EXT_TYPE_VARIANT;
use crate::EXTENSION_KEY;

fn try_take_buffer<T: Clone>(buffer: Buffer2<T>) -> Vec<T> {
// currently need a copy if buffer has more then 1 reference
match buffer.into_mut() {
Either::Left(b) => b.as_slice().to_vec(),
Either::Right(v) => v,
}
}

fn numbers_into<TN: ArrowNativeType, TA: ArrowPrimitiveType>(
fn numbers_into<TN: ArrowNativeType + NativeType, TA: ArrowPrimitiveType>(
buf: Buffer2<TN>,
data_type: DataType,
) -> Result<Arc<dyn Array>, ArrowError> {
let v: Vec<TN> = try_take_buffer(buf);
let len = v.len();
let buf = Buffer::from_vec(v);
let len = buf.len();
let buf = Buffer::from(buf);
let data = ArrayData::builder(data_type)
.len(len)
.offset(0)
@@ -79,24 +69,35 @@ impl Column {
Column::EmptyArray { len } => Arc::new(NullArray::new(len)),
Column::EmptyMap { len } => Arc::new(NullArray::new(len)),
Column::Boolean(bitmap) => {
let buf = bitmap.as_slice().0;
let len = bitmap.len();
let null_buffer = NullBuffer::from(bitmap);
let array_data = ArrayData::builder(DataType::Boolean)
.len(bitmap.len())
.add_buffer(buf.into())
.len(len)
.add_buffer(null_buffer.buffer().clone())
.build()?;
Arc::new(BooleanArray::from(array_data))
}
Column::String(col) => {
let len = col.len();
let values: Vec<u8> = try_take_buffer(col.data);
let offsets: Vec<u64> = try_take_buffer(col.offsets);
let offsets = Buffer::from_vec(offsets);
let array_data = ArrayData::builder(DataType::LargeUtf8)
let values = Buffer::from(col.data);
let offsets = Buffer::from(col.offsets);
let array_data = ArrayData::builder(DataType::LargeBinary)
.len(len)
.add_buffer(offsets)
.add_buffer(values);
let array_data = unsafe { array_data.build_unchecked() };
Arc::new(LargeBinaryArray::from(array_data))
}
Column::Variant(col) => {
let len = col.len();
let values = Buffer::from(col.data);
let offsets = Buffer::from(col.offsets);
let array_data = ArrayData::builder(DataType::LargeBinary)
.len(len)
.add_buffer(offsets)
.add_buffer(values.into());
.add_buffer(values);
let array_data = unsafe { array_data.build_unchecked() };
Arc::new(LargeStringArray::from(array_data))
Arc::new(LargeBinaryArray::from(array_data))
}
Column::Number(NumberColumn::UInt8(buf)) => {
numbers_into::<u8, arrow_array::types::UInt8Type>(buf, DataType::UInt8)?
@@ -139,17 +140,12 @@
)?
}
Column::Decimal(DecimalColumn::Decimal256(buf, size)) => {
let v: Vec<ethnum::i256> = try_take_buffer(buf);
// todo(youngsofun): arrow_rs use u128 for lo while arrow2 use i128, recheck it later.
let v: Vec<i256> = v
.into_iter()
.map(|i| {
let (hi, lo) = i.into_words();
i256::from_parts(lo as u128, hi)
})
.collect();
let len = v.len();
let buf = Buffer::from_vec(v);
let buf = unsafe {
std::mem::transmute::<_, Buffer2<common_arrow::arrow::types::i256>>(buf)
};
let len = buf.len();
let buf = Buffer::from(buf);
let data =
ArrayData::builder(DataType::Decimal256(size.precision, size.scale as i8))
.len(len)
@@ -170,12 +166,13 @@ impl Column {
numbers_into::<i32, arrow_array::types::Date32Type>(buf, DataType::Date32)?
}
Column::Nullable(col) => {
let arrow_array = col.column.into_arrow_rs()?;
let data = arrow_array.into_data();
let buf = col.validity.as_slice().0;
let builder = ArrayDataBuilder::from(data);
// bitmap copied here
let data = builder.null_bit_buffer(Some(buf.into())).build()?;
let null_buffer = NullBuffer::from(col.validity);
let inner = col.column.into_arrow_rs()?;
let builder = ArrayDataBuilder::from(inner.data().clone());

let data = builder
.null_bit_buffer(Some(null_buffer.buffer().clone()))
.build()?;
make_array(data)
}
_ => {
@@ -188,97 +185,135 @@
Ok(array)
}

pub fn from_arrow_rs(array: Arc<dyn Array>) -> Result<Self, ArrowError> {
pub fn from_arrow_rs(array: Arc<dyn Array>, field: &Field) -> Result<Self, ArrowError> {
if let Some(extent) = field.metadata().get(EXTENSION_KEY).map(|v| v.as_str()) {
match extent {
ARROW_EXT_TYPE_EMPTY_ARRAY => return Ok(Column::EmptyArray { len: array.len() }),
ARROW_EXT_TYPE_EMPTY_MAP => return Ok(Column::EmptyMap { len: array.len() }),
_ => {}
}
}

let data_type = array.data_type();
let data = array.data_ref();
let column = match data_type {
DataType::Null => Column::Null { len: array.len() },
DataType::LargeUtf8 => {
let array = array.as_any().downcast_ref::<LargeStringArray>().unwrap();
let offsets = array.value_offsets().to_vec();
let offsets = unsafe { std::mem::transmute::<Vec<i64>, Vec<u64>>(offsets) };
let offsets =
unsafe { std::mem::transmute::<_, Buffer2<u64>>(data.buffers()[0].clone()) };
let values =
unsafe { std::mem::transmute::<_, Buffer2<u8>>(data.buffers()[1].clone()) };

Column::Variant(StringColumn {
offsets,
data: values,
})
}

DataType::LargeBinary => {
let offsets =
unsafe { std::mem::transmute::<_, Buffer2<u64>>(data.buffers()[0].clone()) };
let values =
unsafe { std::mem::transmute::<_, Buffer2<u8>>(data.buffers()[1].clone()) };

match field.metadata().get(EXTENSION_KEY).map(|v| v.as_str()) {
Some(ARROW_EXT_TYPE_VARIANT) => Column::Variant(StringColumn {
offsets,
data: values,
}),
_ => Column::String(StringColumn {
offsets,
data: values,
}),
}
}

DataType::Utf8 => {
let offsets =
unsafe { std::mem::transmute::<_, Buffer2<i32>>(data.buffers()[0].clone()) };
let offsets: Vec<u64> = offsets.iter().map(|x| *x as u64).collect::<Vec<_>>();
let values =
unsafe { std::mem::transmute::<_, Buffer2<u8>>(data.buffers()[1].clone()) };

Column::String(StringColumn {
offsets: offsets.into(),
data: array.value_data().to_vec().into(),
data: values,
})
}
DataType::Utf8 => {
let array = array.as_any().downcast_ref::<StringArray>().unwrap();
let offsets = array
.value_offsets()
.iter()
.map(|x| *x as u64)
.collect::<Vec<_>>();

DataType::Binary => {
let offsets =
unsafe { std::mem::transmute::<_, Buffer2<i32>>(data.buffers()[0].clone()) };
let offsets: Vec<u64> = offsets.iter().map(|x| *x as u64).collect::<Vec<_>>();
let values =
unsafe { std::mem::transmute::<_, Buffer2<u8>>(data.buffers()[1].clone()) };

Column::String(StringColumn {
offsets: offsets.into(),
data: array.value_data().to_vec().into(),
data: values,
})
}
DataType::Boolean => {
let array = array.as_any().downcast_ref::<BooleanArray>().unwrap();
let bytes = array.values().clone().into_vec().map_err(|_| {
ArrowError::CastError(
"can not covert Buffer of BooleanArray to Vec".to_string(),
)
})?;
let bitmap = Bitmap::try_new(bytes, array.len()).map_err(|e| {
ArrowError::CastError(format!(
"can not covert BooleanArray to Column::Boolean: {e:?}"
))
})?;
let boolean_buffer =
BooleanBuffer::new(data.buffers()[0].clone(), data.offset(), data.len());

let null_buffer = NullBuffer::new(boolean_buffer);

let bitmap = Bitmap::from_null_buffer(null_buffer);
Column::Boolean(bitmap)
}
DataType::Int8 => {
let array = array.as_any().downcast_ref::<Int8Array>().unwrap();
let buffer2: Buffer2<i8> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::Int8(buffer2))
}
DataType::UInt8 => {
let array = array.as_any().downcast_ref::<UInt8Array>().unwrap();
let buffer2: Buffer2<u8> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::UInt8(buffer2))
}
DataType::Int16 => {
let array = array.as_any().downcast_ref::<Int16Array>().unwrap();
let buffer2: Buffer2<i16> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::Int16(buffer2))
}
DataType::UInt16 => {
let array = array.as_any().downcast_ref::<UInt16Array>().unwrap();
let buffer2: Buffer2<u16> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::UInt16(buffer2))
}
DataType::Int32 => {
let array = array.as_any().downcast_ref::<Int32Array>().unwrap();
let buffer2: Buffer2<i32> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::Int32(buffer2))
}
DataType::UInt32 => {
let array = array.as_any().downcast_ref::<UInt32Array>().unwrap();
let buffer2: Buffer2<u32> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::UInt32(buffer2))
}
DataType::Int64 => {
let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
let buffer2: Buffer2<i64> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::Int64(buffer2))
}
DataType::UInt64 => {
let array = array.as_any().downcast_ref::<UInt64Array>().unwrap();
let buffer2: Buffer2<u64> = array.values().to_vec().into();
let buffer2 = Buffer2::from(data.buffers()[0].clone());
Column::Number(NumberColumn::UInt64(buffer2))
}

DataType::Float32 => {
let buffer2: Buffer2<f32> = Buffer2::from(data.buffers()[0].clone());
let buffer = unsafe { std::mem::transmute::<_, Buffer2<F32>>(buffer2) };
Column::Number(NumberColumn::Float32(buffer))
}

DataType::Float64 => {
let buffer2: Buffer2<f32> = Buffer2::from(data.buffers()[0].clone());
let buffer = unsafe { std::mem::transmute::<_, Buffer2<F64>>(buffer2) };

Column::Number(NumberColumn::Float64(buffer))
}

_ => Err(ArrowError::NotYetImplemented(format!(
"Column::from_arrow_rs() for {data_type} not implemented yet"
)))?,
};
if let Some(nulls) = array.into_data().nulls() {
let validity =
Bitmap::try_new(nulls.buffer().to_vec(), nulls.offset()).map_err(|e| {
ArrowError::CastError(format!(
"fail to cast arrow_rs::NullBuffer to arrow2::Bitmap: {e}"
))
})?;
let validity = Bitmap::from_null_buffer(nulls.clone());
let column = NullableColumn { column, validity };
Ok(Column::Nullable(Box::new(column)))
} else {
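The Boolean and Nullable branches above rest on the same principle for validity bits: a NullBuffer's packed bit buffer is reference-counted, so it can be attached to ArrayData with a cheap clone rather than a byte copy. Below is a standalone restatement of the Nullable branch using only arrow-rs types; obtaining the NullBuffer from the crate's internal Bitmap (via the From impl used above) is not shown.

// Sketch: attach a validity buffer to already-built ArrayData, mirroring the
// Column::Nullable branch above. Cloning the bit Buffer only bumps a refcount.
use std::sync::Arc;

use arrow_array::make_array;
use arrow_array::Array;
use arrow_array::Int32Array;
use arrow_buffer::NullBuffer;
use arrow_data::ArrayDataBuilder;
use arrow_schema::ArrowError;

fn with_validity(values: Int32Array, validity: &NullBuffer) -> Result<Arc<dyn Array>, ArrowError> {
    let builder = ArrayDataBuilder::from(values.into_data());
    let data = builder
        .null_bit_buffer(Some(validity.buffer().clone()))
        .build()?;
    Ok(make_array(data))
}

// Hypothetical usage:
// let arr = with_validity(Int32Array::from(vec![1, 2, 3]), &NullBuffer::from(vec![true, false, true]))?;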
4 changes: 2 additions & 2 deletions src/query/expression/src/convert_arrow_rs/record_batch.rs
@@ -34,8 +34,8 @@ impl DataBlock {

pub fn from_record_batch(batch: &RecordBatch) -> Result<(Self, DataSchema), ArrowError> {
let mut columns = Vec::with_capacity(batch.columns().len());
for array in batch.columns() {
columns.push(Column::from_arrow_rs(array.clone())?)
for (array, field) in batch.columns().iter().zip(batch.schema().fields().iter()) {
columns.push(Column::from_arrow_rs(array.clone(), field)?)
}
let schema = DataSchema::try_from(&(*batch.schema()))?;
Ok((DataBlock::new_from_columns(columns), schema))
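Because from_arrow_rs now receives the Field, extension types are chosen from the field metadata rather than inferred from the physical Arrow type. Below is a hypothetical call site; the crate name common_expression and the visibility of EXTENSION_KEY / ARROW_EXT_TYPE_VARIANT are assumptions based on the use crate::... imports in the diff.

use std::collections::HashMap;
use std::sync::Arc;

use arrow_array::Array;
use arrow_array::LargeBinaryArray;
use arrow_schema::ArrowError;
use arrow_schema::DataType;
use arrow_schema::Field;
// Assumed crate name and re-exports; adjust to the actual paths.
use common_expression::Column;
use common_expression::ARROW_EXT_TYPE_VARIANT;
use common_expression::EXTENSION_KEY;

fn demo() -> Result<(), ArrowError> {
    let array: Arc<dyn Array> = Arc::new(LargeBinaryArray::from_vec(vec![b"abc".as_ref()]));

    // Plain LargeBinary (no extension metadata) maps to Column::String.
    let plain = Field::new("v", DataType::LargeBinary, false);
    assert!(matches!(Column::from_arrow_rs(array.clone(), &plain)?, Column::String(_)));

    // The Variant extension tag in the field metadata switches the mapping.
    let mut meta = HashMap::new();
    meta.insert(EXTENSION_KEY.to_string(), ARROW_EXT_TYPE_VARIANT.to_string());
    let variant = plain.with_metadata(meta);
    assert!(matches!(Column::from_arrow_rs(array, &variant)?, Column::Variant(_)));
    Ok(())
}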