Skip to content

Commit

Permalink
Merge pull request #4348 from b41sh/variant
Browse files Browse the repository at this point in the history
Support Semi-structured variant data type
  • Loading branch information
BohuTANG authored Mar 17, 2022
2 parents c11b7e2 + 8173f0a commit e0e704e
Show file tree
Hide file tree
Showing 42 changed files with 1,301 additions and 55 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/datavalues/src/columns/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,7 @@ where A: AsRef<dyn Array>
Array => Arc::new(ArrayColumn::from_arrow_array(self.as_ref())),
Struct => Arc::new(StructColumn::from_arrow_array(self.as_ref())),
String => Arc::new(StringColumn::from_arrow_array(self.as_ref())),
Variant => Arc::new(JsonColumn::from_arrow_array(self.as_ref())),
}
}

Expand Down
5 changes: 5 additions & 0 deletions common/datavalues/src/columns/eq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,12 @@ pub fn equal(lhs: &dyn Column, rhs: &dyn Column) -> bool {

lhs.values() == rhs.values()
}
Variant => {
let lhs: &JsonColumn = lhs.as_any().downcast_ref().unwrap();
let rhs: &JsonColumn = rhs.as_any().downcast_ref().unwrap();

lhs.values() == rhs.values()
}
other => with_match_physical_primitive_type_error!(other, |$T| {
let lhs: &PrimitiveColumn<$T> = lhs.as_any().downcast_ref().unwrap();
let rhs: &PrimitiveColumn<$T> = rhs.as_any().downcast_ref().unwrap();
Expand Down
3 changes: 3 additions & 0 deletions common/datavalues/src/columns/group_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,3 +293,6 @@ impl GroupHash for StringColumn {
Ok(())
}
}

// TODO(b41sh): implement GroupHash for JsonColumn
// The impl body is empty, so `JsonColumn` only gets whatever default method bodies the
// `GroupHash` trait itself provides.
// NOTE(review): presumably those defaults report the operation as unsupported, which would
// mean grouping by a variant column does not work yet — confirm against the trait definition.
impl GroupHash for JsonColumn {}
2 changes: 2 additions & 0 deletions common/datavalues/src/columns/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ mod group_hash;
mod mutable;
mod null;
mod nullable;
mod object;
mod primitive;
pub mod series;
mod string;
Expand All @@ -42,6 +43,7 @@ pub use group_hash::GroupHash;
pub use mutable::*;
pub use null::*;
pub use nullable::*;
pub use object::*;
pub use primitive::*;
pub use series::*;
pub use string::*;
Expand Down
68 changes: 68 additions & 0 deletions common/datavalues/src/columns/object/iterator.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::iter::TrustedLen;

use crate::prelude::*;

/// Iterator over the values stored in an [`ObjectColumn`].
///
/// It borrows the column for `'a` and remembers the position of the next value to yield.
#[derive(Debug, Clone)]
pub struct ObjectValueIter<'a, T: ObjectType> {
// Column whose values are being traversed.
column: &'a ObjectColumn<T>,
// Position of the next value to yield.
index: usize,
}

impl<'a, T: ObjectType> ObjectValueIter<'a, T> {
/// Creates a new [`ObjectValueIter`]
pub fn new(column: &'a ObjectColumn<T>) -> Self {
Self { column, index: 0 }
}
}

impl<'a, T> Iterator for ObjectValueIter<'a, T>
where T: Scalar + ObjectType
{
    type Item = T::RefType<'a>;

    /// Yields the scalar reference of the value at the current position and advances by one,
    /// or returns `None` (without advancing) once every value has been visited.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        // Copy the shared reference out so the yielded item borrows the column for `'a`
        // rather than borrowing the iterator itself.
        let column = self.column;
        // `get` is `None` exactly when the position has reached the number of stored values.
        let value = column.values.get(self.index)?;
        self.index += 1;
        Some(value.as_scalar_ref())
    }

    /// Reports the exact number of values left as both the lower and the upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.column.len() - self.index;
        (remaining, Some(remaining))
    }
}

/// The number of remaining values is always known exactly, so the iterator can report it.
impl<'a, T: ObjectType> ExactSizeIterator for ObjectValueIter<'a, T> {
/// Returns how many values have not been yielded yet: the column length minus the current
/// position.
fn len(&self) -> usize {
self.column.len() - self.index
}
}

// SAFETY: `size_hint` returns `column.len() - index` as both its lower and its upper bound, and
// `next` advances `index` by exactly one for every value it yields until `index` reaches
// `column.len()`, so the reported bounds equal the number of values still to be produced.
unsafe impl<T: ObjectType> TrustedLen for ObjectValueIter<'_, T> {}

impl<'a, T: ObjectType> ObjectColumn<T> {
pub fn iter(&'a self) -> ObjectValueIter<'a, T> {
ObjectValueIter::new(self)
}
}
242 changes: 242 additions & 0 deletions common/datavalues/src/columns/object/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,242 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod iterator;
mod mutable;

use std::sync::Arc;

use common_arrow::arrow::array::Array;
use common_arrow::arrow::bitmap::Bitmap;
use common_arrow::arrow::buffer::Buffer;
pub use iterator::*;
pub use mutable::*;
use serde_json::Value as JsonValue;

use crate::prelude::*;

/// `ObjectColumn` is a generic column that wraps values of any structure or enumeration type,
/// such as `JsonValue` or a bitmap.
///
/// The values are kept fully materialized, one `T` per row.
#[derive(Clone)]
pub struct ObjectColumn<T: ObjectType> {
// One entry per row, in row order.
values: Vec<T>,
}

impl<T: ObjectType> From<LargeBinaryArray> for ObjectColumn<T> {
fn from(array: LargeBinaryArray) -> Self {
Self::new(array)
}
}

impl<T: ObjectType> ObjectColumn<T> {
    /// Builds a column by parsing every entry of `array` into a `T`.
    ///
    /// Each entry's bytes are read as UTF-8 text and parsed with `T::from_str`. An entry is
    /// stored as `T::default()` when any of the following holds:
    ///
    /// * the validity bitmap marks the entry as unset,
    /// * the bytes are not valid UTF-8,
    /// * `T::from_str` returns an error.
    ///
    /// # Panics
    ///
    /// Panics if an offset pair does not describe a valid range inside the values buffer.
    pub fn new(array: LargeBinaryArray) -> Self {
        let offsets = array.offsets().as_slice();
        let array_values = array.values().as_slice();
        // Looked up once instead of once per row.
        let validity = array.validity();

        // Consecutive offsets delimit one entry each; `windows(2)` yields nothing when there
        // are fewer than two offsets, so an empty offsets slice cannot underflow.
        let values: Vec<T> = offsets
            .windows(2)
            .enumerate()
            .map(|(row, bounds)| {
                let is_null = match validity {
                    Some(bitmap) => matches!(bitmap.get(row), Some(false)),
                    None => false,
                };
                if is_null {
                    return T::default();
                }

                let start = bounds[0] as usize;
                let end = bounds[1] as usize;
                // Invalid UTF-8 is handled like a parse failure rather than panicking.
                std::str::from_utf8(&array_values[start..end])
                    .ok()
                    .and_then(|text| T::from_str(text).ok())
                    .unwrap_or_default()
            })
            .collect();

        Self { values }
    }

    /// Builds a column from a type-erased arrow array.
    ///
    /// # Panics
    ///
    /// Panics if `array` is not a `LargeBinaryArray`.
    pub fn from_arrow_array(array: &dyn Array) -> Self {
        let array = array
            .as_any()
            .downcast_ref::<LargeBinaryArray>()
            .expect("object cast should be ok");

        Self::new(array.clone())
    }

    /// Returns a clone of the value at position `i` without bounds checking.
    ///
    /// # Safety
    /// Assumes that the `i < self.len`.
    #[inline]
    pub unsafe fn value_unchecked(&self, i: usize) -> T {
        // soundness: the invariant of the function
        self.values.get_unchecked(i).clone()
    }

    /// Returns all values of the column as a slice, in row order.
    pub fn values(&self) -> &[T] {
        self.values.as_slice()
    }

    /// Creates a new column by taking ownership of `values`; the vector is moved, not copied.
    pub fn new_from_vec(values: Vec<T>) -> Self {
        Self { values }
    }
}

impl<T: ObjectType> Column for ObjectColumn<T> {
    /// Exposes the column as `Any` so callers can downcast to the concrete type.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Returns the logical data type declared by `T`.
    fn data_type(&self) -> DataTypePtr {
        T::data_type()
    }

    /// Returns the number of rows.
    fn len(&self) -> usize {
        self.values.len()
    }

    /// This column never carries a validity bitmap.
    fn validity(&self) -> (bool, Option<&Bitmap>) {
        (false, None)
    }

    /// Returns `len * size_of::<T>()`. Only the inline size of each value is counted; memory
    /// a value owns behind a pointer is not included.
    fn memory_size(&self) -> usize {
        self.values.len() * std::mem::size_of::<T>()
    }

    /// Serializes every value with `to_string` and packs the resulting bytes into a
    /// `LargeBinaryArray` without a validity bitmap.
    fn as_arrow_array(&self) -> common_arrow::arrow::array::ArrayRef {
        // There is one more offset than there are values: the leading 0 plus one end offset
        // per value.
        let mut offsets: Vec<i64> = Vec::with_capacity(self.values.len() + 1);
        let mut values: Vec<u8> = Vec::with_capacity(self.values.len());

        let mut offset: i64 = 0;
        offsets.push(offset);
        for val in &self.values {
            let v = val.to_string();
            // Append the bytes directly; no temporary `Vec<u8>` is needed.
            values.extend_from_slice(v.as_bytes());
            offset += v.len() as i64;
            offsets.push(offset);
        }

        Arc::new(LargeBinaryArray::from_data(
            self.data_type().arrow_type(),
            Buffer::from_slice(offsets),
            Buffer::from_slice(values),
            None,
        ))
    }

    /// Returns a reference-counted copy of this column.
    fn arc(&self) -> ColumnRef {
        Arc::new(self.clone())
    }

    /// Returns a new column holding copies of the rows `offset..offset + length`.
    ///
    /// # Panics
    ///
    /// Panics if the range is out of bounds.
    fn slice(&self, offset: usize, length: usize) -> ColumnRef {
        // Copy only the requested range rather than cloning the whole vector first.
        Arc::new(Self {
            values: self.values[offset..offset + length].to_vec(),
        })
    }

    /// Returns a new column with the rows whose corresponding `filter` bit is set.
    fn filter(&self, filter: &BooleanColumn) -> ColumnRef {
        // NOTE(review): `null_count()` on the filter bitmap is presumably the number of unset
        // bits, making `length` the number of selected rows — confirm against the bitmap API.
        let length = filter.values().len() - filter.values().null_count();
        // Everything is selected: no per-row work is needed.
        if length == self.len() {
            return Arc::new(self.clone());
        }
        let iter = self
            .values()
            .iter()
            .zip(filter.values().iter())
            .filter(|(_, f)| *f)
            .map(|(v, _)| v.clone());

        let values: Vec<T> = iter.collect();
        let col = ObjectColumn { values };

        Arc::new(col)
    }

    /// Splits the column into `scattered_size` columns; row `i` goes to output column
    /// `indices[i]`.
    ///
    /// # Panics
    ///
    /// Panics if an entry of `indices` is not smaller than `scattered_size`.
    fn scatter(&self, indices: &[usize], scattered_size: usize) -> Vec<ColumnRef> {
        let mut builders = Vec::with_capacity(scattered_size);
        for _i in 0..scattered_size {
            builders.push(MutableObjectColumn::<T>::with_capacity(self.len()));
        }

        indices
            .iter()
            .zip(self.values())
            .for_each(|(index, value)| {
                builders[*index].append_value(value.clone());
            });

        builders.iter_mut().map(|b| b.to_column()).collect()
    }

    /// Repeats each row according to cumulative `offsets`: row `i` appears
    /// `offsets[i] - offsets[i - 1]` times in the result (with an implicit leading 0).
    ///
    /// # Panics
    ///
    /// Panics if `offsets` is non-empty but shorter than the column.
    fn replicate(&self, offsets: &[usize]) -> ColumnRef {
        debug_assert!(
            offsets.len() == self.len(),
            "Size of offsets must match size of column"
        );

        if offsets.is_empty() {
            return self.slice(0, 0);
        }

        // The last cumulative offset is the total number of output rows.
        let mut builder = MutableObjectColumn::<T>::with_capacity(*offsets.last().unwrap());

        let mut previous_offset: usize = 0;

        for (i, value) in self.values.iter().enumerate() {
            let offset: usize = offsets[i];
            // Clone once per emitted copy; a row repeated zero times is not cloned at all.
            builder
                .values
                .extend(std::iter::repeat(value).take(offset - previous_offset).cloned());
            previous_offset = offset;
        }
        builder.to_column()
    }

    /// The column is already fully materialized, so this returns a copy of itself.
    fn convert_full_column(&self) -> ColumnRef {
        Arc::new(self.clone())
    }

    /// Returns the value at `index` converted into a `DataValue`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is out of bounds.
    fn get(&self, index: usize) -> DataValue {
        self.values[index].clone().into()
    }
}

impl<T> ScalarColumn for ObjectColumn<T>
where T: Scalar<ColumnType = Self> + ObjectType
{
    type Builder = MutableObjectColumn<T>;
    type OwnedItem = T;
    type RefItem<'a> = <T as Scalar>::RefType<'a>;
    type Iterator<'a> = ObjectValueIter<'a, T>;

    /// Returns the scalar reference of the value at `idx`; panics when `idx` is out of bounds.
    #[inline]
    fn get_data(&self, idx: usize) -> Self::RefItem<'_> {
        let value = &self.values[idx];
        value.as_scalar_ref()
    }

    /// Returns an iterator over the scalar references of all values, starting at the first.
    fn scalar_iter(&self) -> Self::Iterator<'_> {
        let column = self;
        ObjectValueIter::new(column)
    }
}

/// Column whose values are `serde_json::Value`s.
pub type JsonColumn = ObjectColumn<JsonValue>;

impl<T: ObjectType> std::fmt::Debug for ObjectColumn<T> {
    /// Writes the fixed type name only; the stored values are not printed yet.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // TODO(b41sh): implement display_fmt
        f.write_str("ObjectColumn")
    }
}
Loading

1 comment on commit e0e704e

@vercel
Copy link

@vercel vercel bot commented on e0e704e Mar 17, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please sign in to comment.