From 8b6c32b625f6198c035d819a25109ed3527dc2b8 Mon Sep 17 00:00:00 2001 From: sundyli <543950155@qq.com> Date: Wed, 29 Mar 2023 07:09:20 -0700 Subject: [PATCH] chore(query): zero copy conversion with arrow-rs (#10818) * zero-copy conversion with arrow-rs * support metadata to arrow's schema * fix string type mapping to arrow's LargeBinary --- .../expression/src/convert_arrow_rs/array.rs | 225 ++++++++++-------- .../src/convert_arrow_rs/record_batch.rs | 4 +- .../expression/src/convert_arrow_rs/schema.rs | 62 ++++- src/query/expression/src/values.rs | 1 + .../it/servers/flight_sql/testdata/query.txt | 10 +- 5 files changed, 187 insertions(+), 115 deletions(-) diff --git a/src/query/expression/src/convert_arrow_rs/array.rs b/src/query/expression/src/convert_arrow_rs/array.rs index 549224ea343d5..b85aeedd74481 100644 --- a/src/query/expression/src/convert_arrow_rs/array.rs +++ b/src/query/expression/src/convert_arrow_rs/array.rs @@ -18,52 +18,42 @@ use arrow_array::make_array; use arrow_array::Array; use arrow_array::ArrowPrimitiveType; use arrow_array::BooleanArray; -use arrow_array::Int16Array; -use arrow_array::Int32Array; -use arrow_array::Int64Array; -use arrow_array::Int8Array; -use arrow_array::LargeStringArray; +use arrow_array::LargeBinaryArray; use arrow_array::NullArray; use arrow_array::PrimitiveArray; -use arrow_array::StringArray; -use arrow_array::UInt16Array; -use arrow_array::UInt32Array; -use arrow_array::UInt64Array; -use arrow_array::UInt8Array; -use arrow_buffer::i256; +use arrow_buffer::buffer::BooleanBuffer; +use arrow_buffer::buffer::NullBuffer; use arrow_buffer::ArrowNativeType; use arrow_buffer::Buffer; use arrow_data::ArrayData; use arrow_data::ArrayDataBuilder; use arrow_schema::ArrowError; use arrow_schema::DataType; +use arrow_schema::Field; use arrow_schema::TimeUnit; use common_arrow::arrow::bitmap::Bitmap; use common_arrow::arrow::buffer::Buffer as Buffer2; -use common_arrow::arrow::Either; +use common_arrow::arrow::types::NativeType; use ordered_float::OrderedFloat; use crate::types::decimal::DecimalColumn; use crate::types::nullable::NullableColumn; use crate::types::number::NumberColumn; use crate::types::string::StringColumn; +use crate::types::F32; +use crate::types::F64; use crate::Column; +use crate::ARROW_EXT_TYPE_EMPTY_ARRAY; +use crate::ARROW_EXT_TYPE_EMPTY_MAP; +use crate::ARROW_EXT_TYPE_VARIANT; +use crate::EXTENSION_KEY; -fn try_take_buffer(buffer: Buffer2) -> Vec { - // currently need a copy if buffer has more then 1 reference - match buffer.into_mut() { - Either::Left(b) => b.as_slice().to_vec(), - Either::Right(v) => v, - } -} - -fn numbers_into( +fn numbers_into( buf: Buffer2, data_type: DataType, ) -> Result, ArrowError> { - let v: Vec = try_take_buffer(buf); - let len = v.len(); - let buf = Buffer::from_vec(v); + let len = buf.len(); + let buf = Buffer::from(buf); let data = ArrayData::builder(data_type) .len(len) .offset(0) @@ -79,24 +69,35 @@ impl Column { Column::EmptyArray { len } => Arc::new(NullArray::new(len)), Column::EmptyMap { len } => Arc::new(NullArray::new(len)), Column::Boolean(bitmap) => { - let buf = bitmap.as_slice().0; + let len = bitmap.len(); + let null_buffer = NullBuffer::from(bitmap); let array_data = ArrayData::builder(DataType::Boolean) - .len(bitmap.len()) - .add_buffer(buf.into()) + .len(len) + .add_buffer(null_buffer.buffer().clone()) .build()?; Arc::new(BooleanArray::from(array_data)) } Column::String(col) => { let len = col.len(); - let values: Vec = try_take_buffer(col.data); - let offsets: Vec = try_take_buffer(col.offsets); - let offsets = Buffer::from_vec(offsets); - let array_data = ArrayData::builder(DataType::LargeUtf8) + let values = Buffer::from(col.data); + let offsets = Buffer::from(col.offsets); + let array_data = ArrayData::builder(DataType::LargeBinary) + .len(len) + .add_buffer(offsets) + .add_buffer(values); + let array_data = unsafe { array_data.build_unchecked() }; + Arc::new(LargeBinaryArray::from(array_data)) + } + Column::Variant(col) => { + let len = col.len(); + let values = Buffer::from(col.data); + let offsets = Buffer::from(col.offsets); + let array_data = ArrayData::builder(DataType::LargeBinary) .len(len) .add_buffer(offsets) - .add_buffer(values.into()); + .add_buffer(values); let array_data = unsafe { array_data.build_unchecked() }; - Arc::new(LargeStringArray::from(array_data)) + Arc::new(LargeBinaryArray::from(array_data)) } Column::Number(NumberColumn::UInt8(buf)) => { numbers_into::(buf, DataType::UInt8)? @@ -139,17 +140,12 @@ impl Column { )? } Column::Decimal(DecimalColumn::Decimal256(buf, size)) => { - let v: Vec = try_take_buffer(buf); // todo(youngsofun): arrow_rs use u128 for lo while arrow2 use i128, recheck it later. - let v: Vec = v - .into_iter() - .map(|i| { - let (hi, lo) = i.into_words(); - i256::from_parts(lo as u128, hi) - }) - .collect(); - let len = v.len(); - let buf = Buffer::from_vec(v); + let buf = unsafe { + std::mem::transmute::<_, Buffer2>(buf) + }; + let len = buf.len(); + let buf = Buffer::from(buf); let data = ArrayData::builder(DataType::Decimal256(size.precision, size.scale as i8)) .len(len) @@ -170,12 +166,13 @@ impl Column { numbers_into::(buf, DataType::Date32)? } Column::Nullable(col) => { - let arrow_array = col.column.into_arrow_rs()?; - let data = arrow_array.into_data(); - let buf = col.validity.as_slice().0; - let builder = ArrayDataBuilder::from(data); - // bitmap copied here - let data = builder.null_bit_buffer(Some(buf.into())).build()?; + let null_buffer = NullBuffer::from(col.validity); + let inner = col.column.into_arrow_rs()?; + let builder = ArrayDataBuilder::from(inner.data().clone()); + + let data = builder + .null_bit_buffer(Some(null_buffer.buffer().clone())) + .build()?; make_array(data) } _ => { @@ -188,97 +185,135 @@ impl Column { Ok(array) } - pub fn from_arrow_rs(array: Arc) -> Result { + pub fn from_arrow_rs(array: Arc, field: &Field) -> Result { + if let Some(extent) = field.metadata().get(EXTENSION_KEY).map(|v| v.as_str()) { + match extent { + ARROW_EXT_TYPE_EMPTY_ARRAY => return Ok(Column::EmptyArray { len: array.len() }), + ARROW_EXT_TYPE_EMPTY_MAP => return Ok(Column::EmptyMap { len: array.len() }), + _ => {} + } + } + let data_type = array.data_type(); + let data = array.data_ref(); let column = match data_type { DataType::Null => Column::Null { len: array.len() }, DataType::LargeUtf8 => { - let array = array.as_any().downcast_ref::().unwrap(); - let offsets = array.value_offsets().to_vec(); - let offsets = unsafe { std::mem::transmute::, Vec>(offsets) }; + let offsets = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[0].clone()) }; + let values = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[1].clone()) }; + + Column::Variant(StringColumn { + offsets, + data: values, + }) + } + + DataType::LargeBinary => { + let offsets = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[0].clone()) }; + let values = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[1].clone()) }; + + match field.metadata().get(EXTENSION_KEY).map(|v| v.as_str()) { + Some(ARROW_EXT_TYPE_VARIANT) => Column::Variant(StringColumn { + offsets, + data: values, + }), + _ => Column::String(StringColumn { + offsets, + data: values, + }), + } + } + + DataType::Utf8 => { + let offsets = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[0].clone()) }; + let offsets: Vec = offsets.iter().map(|x| *x as u64).collect::>(); + let values = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[1].clone()) }; + Column::String(StringColumn { offsets: offsets.into(), - data: array.value_data().to_vec().into(), + data: values, }) } - DataType::Utf8 => { - let array = array.as_any().downcast_ref::().unwrap(); - let offsets = array - .value_offsets() - .iter() - .map(|x| *x as u64) - .collect::>(); + + DataType::Binary => { + let offsets = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[0].clone()) }; + let offsets: Vec = offsets.iter().map(|x| *x as u64).collect::>(); + let values = + unsafe { std::mem::transmute::<_, Buffer2>(data.buffers()[1].clone()) }; + Column::String(StringColumn { offsets: offsets.into(), - data: array.value_data().to_vec().into(), + data: values, }) } DataType::Boolean => { - let array = array.as_any().downcast_ref::().unwrap(); - let bytes = array.values().clone().into_vec().map_err(|_| { - ArrowError::CastError( - "can not covert Buffer of BooleanArray to Vec".to_string(), - ) - })?; - let bitmap = Bitmap::try_new(bytes, array.len()).map_err(|e| { - ArrowError::CastError(format!( - "can not covert BooleanArray to Column::Boolean: {e:?}" - )) - })?; + let boolean_buffer = + BooleanBuffer::new(data.buffers()[0].clone(), data.offset(), data.len()); + + let null_buffer = NullBuffer::new(boolean_buffer); + + let bitmap = Bitmap::from_null_buffer(null_buffer); Column::Boolean(bitmap) } DataType::Int8 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::Int8(buffer2)) } DataType::UInt8 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::UInt8(buffer2)) } DataType::Int16 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::Int16(buffer2)) } DataType::UInt16 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::UInt16(buffer2)) } DataType::Int32 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::Int32(buffer2)) } DataType::UInt32 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::UInt32(buffer2)) } DataType::Int64 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::Int64(buffer2)) } DataType::UInt64 => { - let array = array.as_any().downcast_ref::().unwrap(); - let buffer2: Buffer2 = array.values().to_vec().into(); + let buffer2 = Buffer2::from(data.buffers()[0].clone()); Column::Number(NumberColumn::UInt64(buffer2)) } + DataType::Float32 => { + let buffer2: Buffer2 = Buffer2::from(data.buffers()[0].clone()); + let buffer = unsafe { std::mem::transmute::<_, Buffer2>(buffer2) }; + Column::Number(NumberColumn::Float32(buffer)) + } + + DataType::Float64 => { + let buffer2: Buffer2 = Buffer2::from(data.buffers()[0].clone()); + let buffer = unsafe { std::mem::transmute::<_, Buffer2>(buffer2) }; + + Column::Number(NumberColumn::Float64(buffer)) + } + _ => Err(ArrowError::NotYetImplemented(format!( "Column::from_arrow_rs() for {data_type} not implemented yet" )))?, }; if let Some(nulls) = array.into_data().nulls() { - let validity = - Bitmap::try_new(nulls.buffer().to_vec(), nulls.offset()).map_err(|e| { - ArrowError::CastError(format!( - "fail to cast arrow_rs::NullBuffer to arrow2::Bitmap: {e}" - )) - })?; + let validity = Bitmap::from_null_buffer(nulls.clone()); let column = NullableColumn { column, validity }; Ok(Column::Nullable(Box::new(column))) } else { diff --git a/src/query/expression/src/convert_arrow_rs/record_batch.rs b/src/query/expression/src/convert_arrow_rs/record_batch.rs index 0cd2d90d5a943..9f85dac86206d 100644 --- a/src/query/expression/src/convert_arrow_rs/record_batch.rs +++ b/src/query/expression/src/convert_arrow_rs/record_batch.rs @@ -34,8 +34,8 @@ impl DataBlock { pub fn from_record_batch(batch: &RecordBatch) -> Result<(Self, DataSchema), ArrowError> { let mut columns = Vec::with_capacity(batch.columns().len()); - for array in batch.columns() { - columns.push(Column::from_arrow_rs(array.clone())?) + for (array, field) in batch.columns().iter().zip(batch.schema().fields().iter()) { + columns.push(Column::from_arrow_rs(array.clone(), field)?) } let schema = DataSchema::try_from(&(*batch.schema()))?; Ok((DataBlock::new_from_columns(columns), schema)) diff --git a/src/query/expression/src/convert_arrow_rs/schema.rs b/src/query/expression/src/convert_arrow_rs/schema.rs index 23bf87cac93df..bf2627e65d0ca 100644 --- a/src/query/expression/src/convert_arrow_rs/schema.rs +++ b/src/query/expression/src/convert_arrow_rs/schema.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; + use arrow_schema::ArrowError; use arrow_schema::DataType as ArrowDataType; use arrow_schema::Field as ArrowField; @@ -25,14 +27,17 @@ use crate::types::NumberDataType; use crate::with_number_type; use crate::DataField; use crate::DataSchema; +use crate::ARROW_EXT_TYPE_EMPTY_ARRAY; +use crate::ARROW_EXT_TYPE_EMPTY_MAP; +use crate::ARROW_EXT_TYPE_VARIANT; +use crate::EXTENSION_KEY; impl From<&DataType> for ArrowDataType { fn from(ty: &DataType) -> Self { match ty { DataType::Null => ArrowDataType::Null, - DataType::Boolean => ArrowDataType::Boolean, - DataType::String => ArrowDataType::LargeUtf8, + DataType::String => ArrowDataType::LargeBinary, DataType::Number(ty) => with_number_type!(|TYPE| match ty { NumberDataType::TYPE => ArrowDataType::TYPE, }), @@ -111,12 +116,36 @@ fn set_nullable(ty: &ArrowDataType) -> ArrowDataType { impl From<&DataField> for ArrowField { fn from(f: &DataField) -> Self { let ty = f.data_type().into(); + + // TODO Nested metadata + let mut metadata = HashMap::new(); + match f.data_type() { + DataType::EmptyArray => { + metadata.insert( + EXTENSION_KEY.to_string(), + ARROW_EXT_TYPE_EMPTY_ARRAY.to_string(), + ); + } + DataType::EmptyMap => { + metadata.insert( + EXTENSION_KEY.to_string(), + ARROW_EXT_TYPE_EMPTY_MAP.to_string(), + ); + } + DataType::Variant => { + metadata.insert( + EXTENSION_KEY.to_string(), + ARROW_EXT_TYPE_VARIANT.to_string(), + ); + } + _ => Default::default(), + }; match ty { ArrowDataType::Struct(_) if f.is_nullable() => { let ty = set_nullable(&ty); - ArrowField::new(f.name(), ty, f.is_nullable()) + ArrowField::new(f.name(), ty, f.is_nullable()).with_metadata(metadata) } - _ => ArrowField::new(f.name(), ty, f.is_nullable()), + _ => ArrowField::new(f.name(), ty, f.is_nullable()).with_metadata(metadata), } } } @@ -134,12 +163,8 @@ impl TryFrom<&ArrowField> for DataField { type Error = ArrowError; fn try_from(f: &ArrowField) -> Result { - let ty = f.data_type().try_into()?; - if f.is_nullable() { - Ok(DataField::new_nullable(f.name().as_str(), ty)) - } else { - Ok(DataField::new(f.name(), ty)) - } + let ty = f.try_into()?; + Ok(DataField::new(f.name(), ty)) } } @@ -158,10 +183,18 @@ impl TryFrom<&ArrowSchema> for DataSchema { } } -impl TryFrom<&ArrowDataType> for DataType { +impl TryFrom<&ArrowField> for DataType { type Error = ArrowError; - fn try_from(ty: &ArrowDataType) -> Result { + fn try_from(f: &ArrowField) -> Result { + match f.metadata().get("Extension").map(|v| v.as_str()) { + Some(ARROW_EXT_TYPE_EMPTY_ARRAY) => return Ok(DataType::EmptyArray), + Some(ARROW_EXT_TYPE_EMPTY_MAP) => return Ok(DataType::EmptyMap), + Some(ARROW_EXT_TYPE_VARIANT) => return Ok(DataType::Variant), + _ => {} + } + + let ty = f.data_type(); let data_type = match ty { ArrowDataType::Null => DataType::Null, ArrowDataType::Boolean => DataType::Boolean, @@ -179,7 +212,10 @@ impl TryFrom<&ArrowDataType> for DataType { ArrowDataType::Float64 => DataType::Number(NumberDataType::Float64), ArrowDataType::Timestamp(_unit, _tz) => DataType::Timestamp, ArrowDataType::Date32 | ArrowDataType::Date64 => DataType::Date, - ArrowDataType::Utf8 | ArrowDataType::LargeUtf8 => DataType::String, + ArrowDataType::Utf8 + | ArrowDataType::LargeUtf8 + | ArrowDataType::Binary + | ArrowDataType::LargeBinary => DataType::String, ArrowDataType::Decimal128(p, s) => { DataType::Decimal(DecimalDataType::Decimal128(DecimalSize { precision: *p, diff --git a/src/query/expression/src/values.rs b/src/query/expression/src/values.rs index 32c0d96f14801..5934ecdaa4368 100755 --- a/src/query/expression/src/values.rs +++ b/src/query/expression/src/values.rs @@ -645,6 +645,7 @@ impl PartialEq for Column { } } +pub const EXTENSION_KEY: &str = "Extension"; pub const ARROW_EXT_TYPE_EMPTY_ARRAY: &str = "EmptyArray"; pub const ARROW_EXT_TYPE_EMPTY_MAP: &str = "EmptyMap"; pub const ARROW_EXT_TYPE_VARIANT: &str = "Variant"; diff --git a/src/query/service/tests/it/servers/flight_sql/testdata/query.txt b/src/query/service/tests/it/servers/flight_sql/testdata/query.txt index cf7728ac1730e..b29bee9542f72 100644 --- a/src/query/service/tests/it/servers/flight_sql/testdata/query.txt +++ b/src/query/service/tests/it/servers/flight_sql/testdata/query.txt @@ -1,8 +1,8 @@ ---------- Input ---------- select 1, 'abc', 1.1, 1.1::float32, 1::nullable(int) ---------- Output --------- -+---+-------+-----+--------------+---------------+ -| 1 | 'abc' | 1.1 | 1.1::float32 | 1::int32 null | -+---+-------+-----+--------------+---------------+ -| 1 | abc | 1.1 | 1.1 | 1 | -+---+-------+-----+--------------+---------------+ ++---+--------+-----+--------------+---------------+ +| 1 | 'abc' | 1.1 | 1.1::float32 | 1::int32 null | ++---+--------+-----+--------------+---------------+ +| 1 | 616263 | 1.1 | 1.1 | 1 | ++---+--------+-----+--------------+---------------+