From bcef2ec18b72d4487272b88d2a23769378bb61fb Mon Sep 17 00:00:00 2001 From: "Jorge C. Leitao" Date: Sun, 26 Sep 2021 19:35:52 +0000 Subject: [PATCH] Added support for MapArray. --- README.md | 3 +- .../tests/test_sql.py | 17 ++ integration-testing/unskip.patch | 29 ++- src/array/display.rs | 1 + src/array/equal/map.rs | 5 + src/array/equal/mod.rs | 6 + src/array/ffi.rs | 1 + src/array/growable/mod.rs | 2 +- src/array/list/mod.rs | 21 +- src/array/map/ffi.rs | 41 ++++ src/array/map/iterator.rs | 85 +++++++++ src/array/map/mod.rs | 180 ++++++++++++++++++ src/array/mod.rs | 6 + src/compute/aggregate/memory.rs | 1 + src/datatypes/field.rs | 1 + src/datatypes/mod.rs | 27 +++ src/datatypes/physical_type.rs | 2 + src/ffi/array.rs | 1 + src/ffi/ffi.rs | 21 +- src/ffi/schema.rs | 17 +- src/io/ipc/convert.rs | 19 ++ src/io/ipc/read/array/map.rs | 77 ++++++++ src/io/ipc/read/array/mod.rs | 2 + src/io/ipc/read/deserialize.rs | 12 ++ src/io/ipc/write/serialize.rs | 44 +++++ src/io/json_integration/read.rs | 22 +++ src/io/json_integration/schema.rs | 11 +- src/scalar/mod.rs | 2 +- tests/it/io/ipc/read/file.rs | 15 ++ tests/it/io/ipc/write/file.rs | 6 + 30 files changed, 643 insertions(+), 34 deletions(-) create mode 100644 src/array/equal/map.rs create mode 100644 src/array/map/ffi.rs create mode 100644 src/array/map/iterator.rs create mode 100644 src/array/map/mod.rs create mode 100644 src/io/ipc/read/array/map.rs diff --git a/README.md b/README.md index a7099f3fd29..7d52d0e8ffa 100644 --- a/README.md +++ b/README.md @@ -77,7 +77,6 @@ we also use the `0.x.y` versioning, since we are iterating over the API. ## Features in the original not available in this crate * Parquet read and write of struct and nested lists. -* Map type ## Features in this crate not in pyarrow @@ -86,7 +85,7 @@ we also use the `0.x.y` versioning, since we are iterating over the API. ## Features in pyarrow not in this crate -Too many to enumerate; e.g. nested dictionary arrays, map, nested parquet. +Too many to enumerate; e.g. nested dictionary arrays, nested parquet. 
## FAQ diff --git a/arrow-pyarrow-integration-testing/tests/test_sql.py b/arrow-pyarrow-integration-testing/tests/test_sql.py index 2998e27a0b0..8aef60832cf 100644 --- a/arrow-pyarrow-integration-testing/tests/test_sql.py +++ b/arrow-pyarrow-integration-testing/tests/test_sql.py @@ -128,6 +128,23 @@ def test_dict(self): assert a.to_pylist() == b.to_pylist() assert a.type == b.type + def test_map(self): + """ + Python -> Rust -> Python + """ + offsets = [0, None, 2, 6] + pykeys = [b"a", b"b", b"c", b"d", b"e", b"f"] + pyitems = [1, 2, 3, None, 4, 5] + keys = pyarrow.array(pykeys, type="binary") + items = pyarrow.array(pyitems, type="i4") + + a = pyarrow.MapArray.from_arrays(offsets, keys, items) + b = arrow_pyarrow_integration_testing.round_trip_array(a) + + b.validate(full=True) + assert a.to_pylist() == b.to_pylist() + assert a.type == b.type + def test_sparse_union(self): """ Python -> Rust -> Python diff --git a/integration-testing/unskip.patch b/integration-testing/unskip.patch index 674bea2b154..225638a6481 100644 --- a/integration-testing/unskip.patch +++ b/integration-testing/unskip.patch @@ -1,5 +1,5 @@ diff --git a/dev/archery/archery/integration/datagen.py b/dev/archery/archery/integration/datagen.py -index d0c4b3d6c..936351c80 100644 +index d0c4b3d6c..a99b1a42e 100644 --- a/dev/archery/archery/integration/datagen.py +++ b/dev/archery/archery/integration/datagen.py @@ -1568,8 +1568,7 @@ def get_generated_json_files(tempdir=None): @@ -12,7 +12,7 @@ index d0c4b3d6c..936351c80 100644 generate_decimal256_case() .skip_category('Go') # TODO(ARROW-7948): Decimal + Go -@@ -1579,13 +1578,11 @@ def get_generated_json_files(tempdir=None): +@@ -1579,22 +1578,18 @@ def get_generated_json_files(tempdir=None): generate_datetime_case(), generate_interval_case() @@ -27,8 +27,19 @@ index d0c4b3d6c..936351c80 100644 + .skip_category('JS'), - generate_map_case() -@@ -1603,13 +1600,11 @@ def get_generated_json_files(tempdir=None): +- generate_map_case() +- .skip_category('Rust'), ++ generate_map_case(), + + generate_non_canonical_map_case() + .skip_category('Java') # TODO(ARROW-8715) +- .skip_category('JS') # TODO(ARROW-8716) +- .skip_category('Rust'), ++ .skip_category('JS'), # TODO(ARROW-8716) + + generate_nested_case(), + +@@ -1603,13 +1598,11 @@ def get_generated_json_files(tempdir=None): generate_nested_large_offsets_case() .skip_category('Go') @@ -44,3 +55,13 @@ index d0c4b3d6c..936351c80 100644 generate_custom_metadata_case() .skip_category('JS'), +@@ -1634,8 +1627,7 @@ def get_generated_json_files(tempdir=None): + + generate_extension_case() + .skip_category('Go') # TODO(ARROW-3039): requires dictionaries +- .skip_category('JS') +- .skip_category('Rust'), ++ .skip_category('JS'), + ] + + generated_paths = [] diff --git a/src/array/display.rs b/src/array/display.rs index c73e2b8d17f..915c02ea7f6 100644 --- a/src/array/display.rs +++ b/src/array/display.rs @@ -181,6 +181,7 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box Strin DataType::UInt64 => dyn_dict!(array, u64), _ => unreachable!(), }, + Map(_, _) => todo!(), Struct(_) => { let a = array.as_any().downcast_ref::().unwrap(); let displays = a diff --git a/src/array/equal/map.rs b/src/array/equal/map.rs new file mode 100644 index 00000000000..e150fb4a4b4 --- /dev/null +++ b/src/array/equal/map.rs @@ -0,0 +1,5 @@ +use crate::array::{Array, MapArray}; + +pub(super) fn equal(lhs: &MapArray, rhs: &MapArray) -> bool { + lhs.data_type() == rhs.data_type() && lhs.len() == rhs.len() && lhs.iter().eq(rhs.iter()) +} diff --git 
a/src/array/equal/mod.rs b/src/array/equal/mod.rs
index c4acef75e8c..d85a450ccb6 100644
--- a/src/array/equal/mod.rs
+++ b/src/array/equal/mod.rs
@@ -8,6 +8,7 @@ mod dictionary;
 mod fixed_size_binary;
 mod fixed_size_list;
 mod list;
+mod map;
 mod null;
 mod primitive;
 mod struct_;
@@ -235,5 +236,10 @@ pub fn equal(lhs: &dyn Array, rhs: &dyn Array) -> bool {
             let rhs = rhs.as_any().downcast_ref().unwrap();
             union::equal(lhs, rhs)
         }
+        Map => {
+            let lhs = lhs.as_any().downcast_ref().unwrap();
+            let rhs = rhs.as_any().downcast_ref().unwrap();
+            map::equal(lhs, rhs)
+        }
     }
 }
diff --git a/src/array/ffi.rs b/src/array/ffi.rs
index aeab1469313..0b32adaf4e1 100644
--- a/src/array/ffi.rs
+++ b/src/array/ffi.rs
@@ -61,6 +61,7 @@ pub fn buffers_children_dictionary(array: &dyn Array) -> BuffersChildren {
         FixedSizeList => ffi_dyn!(array, FixedSizeListArray),
         Struct => ffi_dyn!(array, StructArray),
         Union => ffi_dyn!(array, UnionArray),
+        Map => ffi_dyn!(array, MapArray),
         Dictionary(key_type) => {
             with_match_physical_dictionary_key_type!(key_type, |$T| {
                 let array = array.as_any().downcast_ref::<DictionaryArray<$T>>().unwrap();
diff --git a/src/array/growable/mod.rs b/src/array/growable/mod.rs
index f9c3bf6ce25..dc21b231818 100644
--- a/src/array/growable/mod.rs
+++ b/src/array/growable/mod.rs
@@ -202,7 +202,7 @@ pub fn make_growable<'a>(
         ))
     }
     FixedSizeList => todo!(),
-    Union => todo!(),
+    Union | Map => todo!(),
     Dictionary(key_type) => {
         with_match_physical_dictionary_key_type!(key_type, |$T| {
             dyn_dict_growable!($T, arrays, use_validity, capacity)
diff --git a/src/array/list/mod.rs b/src/array/list/mod.rs
index cb6440de33b..ecf21f454f0 100644
--- a/src/array/list/mod.rs
+++ b/src/array/list/mod.rs
@@ -120,19 +120,14 @@ impl<O: Offset> ListArray<O> {
     /// Returns the element at index `i`
     #[inline]
     pub fn value(&self, i: usize) -> Box<dyn Array> {
-        if self.is_null(i) {
-            new_empty_array(self.values.data_type().clone())
-        } else {
-            let offsets = self.offsets.as_slice();
-            let offset = offsets[i];
-            let offset_1 = offsets[i + 1];
-            let length = (offset_1 - offset).to_usize();
-
-            // Safety:
-            // One of the invariants of the struct
-            // is that offsets are in bounds
-            unsafe { self.values.slice_unchecked(offset.to_usize(), length) }
-        }
+        let offset = self.offsets[i];
+        let offset_1 = self.offsets[i + 1];
+        let length = (offset_1 - offset).to_usize();
+
+        // Safety:
+        // One of the invariants of the struct
+        // is that offsets are in bounds
+        unsafe { self.values.slice_unchecked(offset.to_usize(), length) }
     }
 
     /// Returns the element at index `i` as &str
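The `ListArray::value` change above removes the special case for null slots: `value(i)` now always slices `values` by the offsets, even when slot `i` is null. A minimal caller-side sketch of the pattern this implies (hypothetical helper, using only `Array::is_null` and `value` from this crate):

```rust
use arrow2::array::{Array, ListArray};

// After this change, a null slot no longer yields an empty array from
// `value`; callers that care about nulls must consult the validity bitmap.
fn get_slot(list: &ListArray<i32>, i: usize) -> Option<Box<dyn Array>> {
    if list.is_null(i) {
        None
    } else {
        Some(list.value(i)) // slices `values` by the slot's offsets
    }
}
```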
diff --git a/src/array/map/ffi.rs b/src/array/map/ffi.rs
new file mode 100644
index 00000000000..50b9039c351
--- /dev/null
+++ b/src/array/map/ffi.rs
@@ -0,0 +1,41 @@
+use std::sync::Arc;
+
+use crate::{array::FromFfi, error::Result, ffi};
+
+use super::super::{ffi::ToFfi, Array};
+use super::MapArray;
+
+unsafe impl ToFfi for MapArray {
+    fn buffers(&self) -> Vec<Option<std::ptr::NonNull<u8>>> {
+        vec![
+            self.validity.as_ref().map(|x| x.as_ptr()),
+            std::ptr::NonNull::new(self.offsets.as_ptr() as *mut u8),
+        ]
+    }
+
+    fn offset(&self) -> usize {
+        self.offset
+    }
+
+    fn children(&self) -> Vec<Arc<dyn Array>> {
+        vec![self.field.clone()]
+    }
+}
+
+impl<A: ffi::ArrowArrayRef> FromFfi<A> for MapArray {
+    unsafe fn try_from_ffi(array: A) -> Result<Self> {
+        let data_type = array.field().data_type().clone();
+        let length = array.array().len();
+        let offset = array.array().offset();
+        let mut validity = unsafe { array.validity() }?;
+        let mut offsets = unsafe { array.buffer::<i32>(0) }?;
+        let child = array.child(0)?;
+        let values = ffi::try_from(child)?.into();
+
+        if offset > 0 {
+            offsets = offsets.slice(offset, length);
+            validity = validity.map(|x| x.slice(offset, length))
+        }
+        Ok(Self::from_data(data_type, offsets, values, validity))
+    }
+}
diff --git a/src/array/map/iterator.rs b/src/array/map/iterator.rs
new file mode 100644
index 00000000000..ea8612597b9
--- /dev/null
+++ b/src/array/map/iterator.rs
@@ -0,0 +1,85 @@
+use crate::array::Array;
+use crate::bitmap::utils::{zip_validity, ZipValidity};
+use crate::trusted_len::TrustedLen;
+
+use super::MapArray;
+
+/// Iterator of values of a [`MapArray`].
+#[derive(Clone, Debug)]
+pub struct MapValuesIter<'a> {
+    array: &'a MapArray,
+    index: usize,
+    end: usize,
+}
+
+impl<'a> MapValuesIter<'a> {
+    #[inline]
+    pub fn new(array: &'a MapArray) -> Self {
+        Self {
+            array,
+            index: 0,
+            end: array.len(),
+        }
+    }
+}
+
+impl<'a> Iterator for MapValuesIter<'a> {
+    type Item = Box<dyn Array>;
+
+    #[inline]
+    fn next(&mut self) -> Option<Self::Item> {
+        if self.index == self.end {
+            return None;
+        }
+        let old = self.index;
+        self.index += 1;
+        // Safety:
+        // self.end is maximized by the length of the array
+        Some(unsafe { self.array.value_unchecked(old) })
+    }
+
+    #[inline]
+    fn size_hint(&self) -> (usize, Option<usize>) {
+        (self.end - self.index, Some(self.end - self.index))
+    }
+}
+
+unsafe impl<'a> TrustedLen for MapValuesIter<'a> {}
+
+impl<'a> DoubleEndedIterator for MapValuesIter<'a> {
+    #[inline]
+    fn next_back(&mut self) -> Option<Self::Item> {
+        if self.index == self.end {
+            None
+        } else {
+            self.end -= 1;
+            // Safety:
+            // self.end is maximized by the length of the array
+            Some(unsafe { self.array.value_unchecked(self.end) })
+        }
+    }
+}
+
+impl<'a> IntoIterator for &'a MapArray {
+    type Item = Option<Box<dyn Array>>;
+    type IntoIter = ZipValidity<'a, Box<dyn Array>, MapValuesIter<'a>>;
+
+    fn into_iter(self) -> Self::IntoIter {
+        self.iter()
+    }
+}
+
+impl<'a> MapArray {
+    /// Returns an iterator of `Option<Box<dyn Array>>`
+    pub fn iter(&'a self) -> ZipValidity<'a, Box<dyn Array>, MapValuesIter<'a>> {
+        zip_validity(
+            MapValuesIter::new(self),
+            self.validity.as_ref().map(|x| x.iter()),
+        )
+    }
+
+    /// Returns an iterator of `Box<dyn Array>`
+    pub fn values_iter(&'a self) -> MapValuesIter<'a> {
+        MapValuesIter::new(self)
+    }
+}
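A short sketch of how the iterators just defined are consumed (hypothetical `summarize` helper; `iter` yields `None` for null slots, while `values_iter` ignores validity):

```rust
use arrow2::array::{Array, MapArray};

fn summarize(array: &MapArray) {
    // `iter()` zips the values with the validity bitmap.
    for slot in array.iter() {
        match slot {
            Some(entries) => println!("slot with {} entries", entries.len()),
            None => println!("null slot"),
        }
    }
    // `values_iter()` yields every slot's entries, nulls included.
    assert_eq!(array.values_iter().count(), array.len());
}
```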
diff --git a/src/array/map/mod.rs b/src/array/map/mod.rs
new file mode 100644
index 00000000000..aa649f87bed
--- /dev/null
+++ b/src/array/map/mod.rs
@@ -0,0 +1,180 @@
+use std::sync::Arc;
+
+use crate::{
+    bitmap::Bitmap,
+    buffer::Buffer,
+    datatypes::{DataType, Field},
+    types::Index,
+};
+
+use super::{new_empty_array, specification::check_offsets, Array};
+
+mod ffi;
+mod iterator;
+pub use iterator::*;
+
+#[derive(Debug, Clone)]
+pub struct MapArray {
+    data_type: DataType,
+    // invariant: field.len() == offsets.len() - 1
+    offsets: Buffer<i32>,
+    field: Arc<dyn Array>,
+    // invariant: offsets.len() - 1 == validity.len()
+    validity: Option<Bitmap>,
+    offset: usize,
+}
+
+impl MapArray {
+    pub(crate) fn get_field(datatype: &DataType) -> &Field {
+        if let DataType::Map(field, _) = datatype.to_logical_type() {
+            field.as_ref()
+        } else {
+            panic!("MapArray expects `DataType::Map` logical type")
+        }
+    }
+
+    pub fn new_null(data_type: DataType, length: usize) -> Self {
+        let field = new_empty_array(Self::get_field(&data_type).data_type().clone()).into();
+        Self::from_data(
+            data_type,
+            Buffer::new_zeroed(length + 1),
+            field,
+            Some(Bitmap::new_zeroed(length)),
+        )
+    }
+
+    pub fn new_empty(data_type: DataType) -> Self {
+        let field = new_empty_array(Self::get_field(&data_type).data_type().clone()).into();
+        Self::from_data(data_type, Buffer::from(&[0i32]), field, None)
+    }
+
+    pub fn from_data(
+        data_type: DataType,
+        offsets: Buffer<i32>,
+        field: Arc<dyn Array>,
+        validity: Option<Bitmap>,
+    ) -> Self {
+        check_offsets(&offsets, field.len());
+
+        if let Some(ref validity) = validity {
+            assert_eq!(offsets.len() - 1, validity.len());
+        }
+
+        if let DataType::Struct(inner) = Self::get_field(&data_type).data_type() {
+            if inner.len() != 2 {
+                panic!("MapArray expects its inner `Struct` to have 2 fields (keys and values)")
+            }
+        } else {
+            panic!("MapArray expects `DataType::Struct` as its inner logical type")
+        }
+
+        Self {
+            data_type,
+            field,
+            offsets,
+            offset: 0,
+            validity,
+        }
+    }
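Before continuing with the accessors, a hedged sketch of building the same data as the Python test above through `from_data`. The constructors for the child arrays (`From` impls for `PrimitiveArray`/`BinaryArray`/`Bitmap`, and whether `StructArray::from_data` takes a `DataType` or a `Vec<Field>`) vary across versions of this crate and are assumed here:

```rust
use std::sync::Arc;
use arrow2::array::{BinaryArray, MapArray, PrimitiveArray, StructArray};
use arrow2::bitmap::Bitmap;
use arrow2::buffer::Buffer;
use arrow2::datatypes::{DataType, Field};

// Contiguous keys and values, as in the Python test.
let keys = BinaryArray::<i32>::from(&[
    Some(&b"a"[..]), Some(b"b"), Some(b"c"), Some(b"d"), Some(b"e"), Some(b"f"),
]);
let values = PrimitiveArray::<i32>::from(&[Some(1), Some(2), Some(3), None, Some(4), Some(5)]);

let fields = vec![
    Field::new("key", DataType::Binary, false),
    Field::new("value", DataType::Int32, true),
];
let entries = StructArray::from_data(
    DataType::Struct(fields.clone()),
    vec![Arc::new(keys), Arc::new(values)],
    None,
);

let data_type =
    DataType::Map(Box::new(Field::new("entries", DataType::Struct(fields), false)), false);

// Three slots: {a: 1, b: 2}, null, {c: 3, d: null, e: 4, f: 5}.
// The null slot still needs monotonically increasing offsets (2..2).
let array = MapArray::from_data(
    data_type,
    Buffer::from(&[0i32, 2, 2, 6]),
    Arc::new(entries),
    Some(Bitmap::from(&[true, false, true])),
);
assert_eq!(array.len(), 3);
```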
+    /// Returns a slice of this [`MapArray`].
+    /// # Panics
+    /// panics iff `offset + length > self.len()`
+    pub fn slice(&self, offset: usize, length: usize) -> Self {
+        assert!(
+            offset + length <= self.len(),
+            "the offset of the new array cannot exceed the array's length"
+        );
+        unsafe { self.slice_unchecked(offset, length) }
+    }
+
+    /// Returns a slice of this [`MapArray`].
+    /// # Safety
+    /// The caller must ensure that `offset + length <= self.len()`.
+    pub unsafe fn slice_unchecked(&self, offset: usize, length: usize) -> Self {
+        let offsets = self.offsets.clone().slice_unchecked(offset, length + 1);
+        let validity = self
+            .validity
+            .clone()
+            .map(|x| x.slice_unchecked(offset, length));
+        Self {
+            data_type: self.data_type.clone(),
+            offsets,
+            field: self.field.clone(),
+            validity,
+            offset: self.offset + offset,
+        }
+    }
+}
+
+// Accessors
+impl MapArray {
+    #[inline]
+    pub fn offsets(&self) -> &Buffer<i32> {
+        &self.offsets
+    }
+
+    #[inline]
+    pub fn field(&self) -> &Arc<dyn Array> {
+        &self.field
+    }
+
+    /// Returns the element at index `i`
+    #[inline]
+    pub fn value(&self, i: usize) -> Box<dyn Array> {
+        let offset = self.offsets[i];
+        let offset_1 = self.offsets[i + 1];
+        let length = (offset_1 - offset).to_usize();
+
+        // Safety:
+        // One of the invariants of the struct
+        // is that offsets are in bounds
+        unsafe { self.field.slice_unchecked(offset.to_usize(), length) }
+    }
+
+    /// Returns the element at index `i`
+    /// # Safety
+    /// Assumes that `i < self.len()`.
+    #[inline]
+    pub unsafe fn value_unchecked(&self, i: usize) -> Box<dyn Array> {
+        let offset = *self.offsets.as_ptr().add(i);
+        let offset_1 = *self.offsets.as_ptr().add(i + 1);
+        let length = (offset_1 - offset).to_usize();
+
+        self.field.slice_unchecked(offset.to_usize(), length)
+    }
+}
+
+impl Array for MapArray {
+    #[inline]
+    fn as_any(&self) -> &dyn std::any::Any {
+        self
+    }
+
+    #[inline]
+    fn len(&self) -> usize {
+        self.offsets.len() - 1
+    }
+
+    #[inline]
+    fn data_type(&self) -> &DataType {
+        &self.data_type
+    }
+
+    #[inline]
+    fn validity(&self) -> Option<&Bitmap> {
+        self.validity.as_ref()
+    }
+
+    fn slice(&self, offset: usize, length: usize) -> Box<dyn Array> {
+        Box::new(self.slice(offset, length))
+    }
+
+    unsafe fn slice_unchecked(&self, offset: usize, length: usize) -> Box<dyn Array> {
+        Box::new(self.slice_unchecked(offset, length))
+    }
+
+    fn with_validity(&self, validity: Option<Bitmap>) -> Box<dyn Array> {
+        let mut array = self.clone();
+        array.validity = validity;
+        Box::new(array)
+    }
+}
diff --git a/src/array/mod.rs b/src/array/mod.rs
index 4fce51612c1..d3d7f916ec9 100644
--- a/src/array/mod.rs
+++ b/src/array/mod.rs
@@ -248,6 +248,7 @@ impl Display for dyn Array {
                 fmt_dyn!(self, DictionaryArray::<$T>, f)
             })
         }
+        Map => todo!(),
     }
 }
 }
@@ -271,6 +272,7 @@ pub fn new_empty_array(data_type: DataType) -> Box<dyn Array> {
     FixedSizeList => Box::new(FixedSizeListArray::new_empty(data_type)),
     Struct => Box::new(StructArray::new_empty(data_type)),
     Union => Box::new(UnionArray::new_empty(data_type)),
+    Map => todo!(),
     Dictionary(key_type) => {
         with_match_physical_dictionary_key_type!(key_type, |$T| {
             Box::new(DictionaryArray::<$T>::new_empty(data_type))
@@ -300,6 +302,7 @@ pub fn new_null_array(data_type: DataType, length: usize) -> Box<dyn Array> {
     FixedSizeList => Box::new(FixedSizeListArray::new_null(data_type, length)),
     Struct => Box::new(StructArray::new_null(data_type, length)),
     Union => Box::new(UnionArray::new_null(data_type, length)),
+    Map => todo!(),
     Dictionary(key_type) => {
         with_match_physical_dictionary_key_type!(key_type, |$T| {
             Box::new(DictionaryArray::<$T>::new_null(data_type, length))
@@ -337,6 +340,7 @@ pub fn clone(array: &dyn Array) -> Box<dyn Array> {
     FixedSizeList => clone_dyn!(array, FixedSizeListArray),
     Struct => clone_dyn!(array, StructArray),
     Union => clone_dyn!(array, UnionArray),
+    Map => todo!(),
     Dictionary(key_type) => {
         with_match_physical_dictionary_key_type!(key_type, |$T| {
             clone_dyn!(array, DictionaryArray::<$T>)
@@ -352,6 +356,7 @@ mod display;
 mod fixed_size_binary;
 mod fixed_size_list;
 mod list;
+mod map;
 mod null;
 mod primitive;
 mod specification;
@@ -373,6 +378,7 @@ pub use dictionary::{DictionaryArray, DictionaryKey, MutableDictionaryArray};
 pub use fixed_size_binary::{FixedSizeBinaryArray, MutableFixedSizeBinaryArray};
 pub use fixed_size_list::{FixedSizeListArray, MutableFixedSizeListArray};
 pub use list::{ListArray, MutableListArray};
+pub use map::MapArray;
 pub use null::NullArray;
 pub use primitive::*;
 pub use specification::Offset;
diff --git a/src/compute/aggregate/memory.rs b/src/compute/aggregate/memory.rs
index 33039ecf4e2..12d2bdbb3e3 100644
--- a/src/compute/aggregate/memory.rs
+++ b/src/compute/aggregate/memory.rs
@@ -109,5 +109,6 @@ pub fn estimated_bytes_size(array: &dyn Array) -> usize {
     Dictionary(key_type) => with_match_physical_dictionary_key_type!(key_type, |$T| {
         dyn_dict!(array, $T)
     }),
+    Map => todo!(),
 }
 }
diff --git a/src/datatypes/field.rs b/src/datatypes/field.rs
index 41ea83d1003..af18073070a 100644
--- a/src/datatypes/field.rs
+++ b/src/datatypes/field.rs
@@ -256,6 +256,7 @@ impl Field {
             | DataType::Utf8
             | DataType::LargeUtf8
             | DataType::Extension(_, _, _)
+            | DataType::Map(_, _)
             | DataType::Decimal(_, _) => {
                 if self.data_type != from.data_type {
                     return Err(ArrowError::Schema(
diff --git a/src/datatypes/mod.rs b/src/datatypes/mod.rs
index 01c62e5c55f..3d32479ff23 100644
--- a/src/datatypes/mod.rs
+++ b/src/datatypes/mod.rs
@@ -93,6 +93,32 @@ pub enum DataType {
     /// A nested datatype that can represent slots of differing types.
     /// Third argument represents sparsness
     Union(Vec<Field>, Option<Vec<i32>>, bool),
+    /// A nested type that is represented as
+    ///
+    /// List<entries: Struct<key: K, value: V>>
+    ///
+    /// In this layout, the keys and values are each respectively contiguous. We do
+    /// not constrain the key and value types, so the application is responsible
+    /// for ensuring that the keys are hashable and unique. Whether the keys are sorted
+    /// may be set in the metadata for this field.
+    ///
+    /// In a field with Map type, the field has a child Struct field, which then
+    /// has two children: the first the key type and the second the value type. The
+    /// names of the child fields may be respectively "entries", "key", and "value",
+    /// but this is not enforced.
+    ///
+    /// Map
+    /// ```text
+    ///   - child[0] entries: Struct<key: K, value: V>
+    ///     - child[0] key: K
+    ///     - child[1] value: V
+    /// ```
+    /// Neither the "entries" field nor the "key" field may be nullable.
+    ///
+    /// The metadata is structured so that Arrow systems without special handling
+    /// for Map can make Map an alias for List. The "layout" attribute for the Map
+    /// field must have the same contents as a List.
+    Map(Box<Field>, bool),
     /// A dictionary encoded array (`key_type`, `value_type`), where
     /// each array element is an index of `key_type` into an
     /// associated dictionary of `value_type`.
@@ -210,6 +236,7 @@ impl DataType {
             LargeList(_) => PhysicalType::LargeList,
             Struct(_) => PhysicalType::Struct,
             Union(_, _, _) => PhysicalType::Union,
+            Map(_, _) => PhysicalType::Map,
             Dictionary(key, _) => PhysicalType::Dictionary(to_dictionary_index_type(key.as_ref())),
             Extension(_, key, _) => key.to_physical_type(),
         }
diff --git a/src/datatypes/physical_type.rs b/src/datatypes/physical_type.rs
index aee6f99d122..d950c0b426f 100644
--- a/src/datatypes/physical_type.rs
+++ b/src/datatypes/physical_type.rs
@@ -29,6 +29,8 @@ pub enum PhysicalType {
     Struct,
     /// A nested type that represents slots of differing types.
     Union,
+    /// A nested type that represents key-value entries.
+    Map,
     /// A dictionary encoded array by `DictionaryIndexType`.
     Dictionary(DictionaryIndexType),
 }
diff --git a/src/ffi/array.rs b/src/ffi/array.rs
index 9bb83908e80..5b93301fdc0 100644
--- a/src/ffi/array.rs
+++ b/src/ffi/array.rs
@@ -30,6 +30,7 @@ pub unsafe fn try_from<A: ArrowArrayRef>(array: A) -> Result<Box<dyn Array>> {
         })
     }
     Union => Box::new(UnionArray::try_from_ffi(array)?),
+    Map => Box::new(MapArray::try_from_ffi(array)?),
     data_type => {
         return Err(ArrowError::NotYetImplemented(format!(
             "Importing PhysicalType \"{:?}\" is not yet supported.",
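A hedged sketch of what a canonical `DataType::Map` looks like when assembled from the pieces just introduced (the field names follow the convention described in the doc comment; nothing here is enforced by the type system):

```rust
use arrow2::datatypes::{DataType, Field, PhysicalType};

// Map<entries: Struct<key: Binary, value: Int32>>, with unsorted keys.
let entries = DataType::Struct(vec![
    Field::new("key", DataType::Binary, false), // the "key" field may not be nullable
    Field::new("value", DataType::Int32, true),
]);
let map = DataType::Map(Box::new(Field::new("entries", entries, false)), false);
assert_eq!(map.to_physical_type(), PhysicalType::Map);
```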
fn buffer_len(array: &Ffi_ArrowArray, data_type: &DataType, i: usize) -> Result<usize> {
-    Ok(match (data_type, i) {
-        (DataType::Utf8, 1)
-        | (DataType::LargeUtf8, 1)
-        | (DataType::Binary, 1)
-        | (DataType::LargeBinary, 1)
-        | (DataType::List(_), 1)
-        | (DataType::LargeList(_), 1) => {
+    Ok(match (data_type.to_physical_type(), i) {
+        (PhysicalType::Utf8, 1)
+        | (PhysicalType::LargeUtf8, 1)
+        | (PhysicalType::Binary, 1)
+        | (PhysicalType::LargeBinary, 1)
+        | (PhysicalType::List, 1)
+        | (PhysicalType::LargeList, 1)
+        | (PhysicalType::Map, 1) => {
             // the len of the offset buffer (buffer 1) equals length + 1
             array.length as usize + 1
         }
-        (DataType::Utf8, 2) | (DataType::Binary, 2) => {
+        (PhysicalType::Utf8, 2) | (PhysicalType::Binary, 2) => {
             // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
             let len = buffer_len(array, data_type, 1)?;
             // first buffer is the null buffer => add(1)
@@ -261,7 +262,7 @@ fn buffer_len(array: &Ffi_ArrowArray, data_type: &DataType, i: usize) -> Result<
             // get last offset
             (unsafe { *offset_buffer.add(len - 1) }) as usize
         }
-        (DataType::LargeUtf8, 2) | (DataType::LargeBinary, 2) => {
+        (PhysicalType::LargeUtf8, 2) | (PhysicalType::LargeBinary, 2) => {
             // the len of the data buffer (buffer 2) equals the last value of the offset buffer (buffer 1)
             let len = buffer_len(array, data_type, 1)?;
             // first buffer is the null buffer => add(1)
diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs
index 41b062d4996..578d5a49a20 100644
--- a/src/ffi/schema.rs
+++ b/src/ffi/schema.rs
@@ -56,6 +56,8 @@ impl Ffi_ArrowSchema {
         let format = to_format(field.data_type());
         let name = field.name().clone();
 
+        let mut flags = field.is_nullable() as i64 * 2;
+
         // allocate (and hold) the children
         let children_vec = match field.data_type() {
             DataType::List(field) => {
@@ -64,6 +66,10 @@ impl Ffi_ArrowSchema {
             DataType::LargeList(field) => {
                 vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))]
             }
+            DataType::Map(field, is_sorted) => {
+                flags += (*is_sorted as i64) * 4;
+                vec![Box::new(Ffi_ArrowSchema::new(field.as_ref()))]
+            }
             DataType::Struct(fields) => fields
                 .iter()
                 .map(|field| Box::new(Ffi_ArrowSchema::new(field)))
@@ -81,9 +87,8 @@ impl Ffi_ArrowSchema {
             .collect::<Vec<_>>();
         let n_children = children_ptr.len() as i64;
 
-        let flags = field.is_nullable() as i64 * 2;
-
         let dictionary = if let DataType::Dictionary(_, values) = field.data_type() {
+            flags += field.dict_is_ordered().unwrap_or_default() as i64;
             // we do not store field info in the dict values, so can't recover it all :(
             let field = Field::new("", values.as_ref().clone(), true);
             Some(Box::new(Ffi_ArrowSchema::new(&field)))
@@ -263,6 +268,12 @@ unsafe fn to_data_type(schema: &Ffi_ArrowSchema) -> Result<DataType> {
             let child = schema.child(0);
             DataType::LargeList(Box::new(to_field(child)?))
         }
+        "+m" => {
+            let child = schema.child(0);
+
+            let is_sorted = (schema.flags & 4) != 0;
+            DataType::Map(Box::new(to_field(child)?), is_sorted)
+        }
         "+s" => {
             let children = (0..schema.n_children as usize)
                 .map(|x| to_field(schema.child(x)))
@@ -399,6 +410,7 @@ fn to_format(data_type: &DataType) -> String {
             r.push_str(ids);
             r
         }
+        DataType::Map(_, _) => "+m".to_string(),
         DataType::Dictionary(index, _) => to_format(index.as_ref()),
         DataType::Extension(_, inner, _) => to_format(inner.as_ref()),
     }
@@ -408,6 +420,7 @@ pub(super) fn get_field_child(field: &Field, index: usize) -> Result<Field> {
     match (index, field.data_type()) {
         (0, DataType::List(field)) => Ok(field.as_ref().clone()),
         (0, DataType::LargeList(field)) => Ok(field.as_ref().clone()),
+        (0, DataType::Map(field, _)) => Ok(field.as_ref().clone()),
         (index, DataType::Struct(fields)) => Ok(fields[index].clone()),
         (index, DataType::Union(fields, _, _)) => Ok(fields[index].clone()),
         (child, data_type) => Err(ArrowError::Ffi(format!(
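The flag arithmetic above packs three booleans into `Ffi_ArrowSchema.flags` as defined by the Arrow C data interface. A small decoding sketch (the constant names come from the C spec, not from this crate):

```rust
// Arrow C data interface flag bits.
const ARROW_FLAG_DICTIONARY_ORDERED: i64 = 1; // `dict_is_ordered`, added as `* 1`
const ARROW_FLAG_NULLABLE: i64 = 2;           // `is_nullable() as i64 * 2`
const ARROW_FLAG_MAP_KEYS_SORTED: i64 = 4;    // `(*is_sorted as i64) * 4`

fn decode_flags(flags: i64) -> (bool, bool, bool) {
    (
        flags & ARROW_FLAG_DICTIONARY_ORDERED != 0,
        flags & ARROW_FLAG_NULLABLE != 0,
        // what the "+m" branch reads back as `schema.flags & 4`
        flags & ARROW_FLAG_MAP_KEYS_SORTED != 0,
    )
}
```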
diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs
index c42b72e041f..73706853399 100644
--- a/src/io/ipc/convert.rs
+++ b/src/io/ipc/convert.rs
@@ -307,6 +307,14 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo
         };
         DataType::Union(fields, ids, is_sparse)
     }
+    ipc::Type::Map => {
+        let map = field.type_as_map().unwrap();
+        let children = field.children().unwrap();
+        if children.len() != 1 {
+            panic!("expected a map to have one child")
+        }
+        DataType::Map(Box::new(children.get(0).into()), map.keysSorted())
+    }
     t => unimplemented!("Type {:?} not supported", t),
 }
 }
@@ -440,6 +448,7 @@ fn type_to_field_type(data_type: &DataType) -> ipc::Type {
     LargeList(_) => ipc::Type::LargeList,
     FixedSizeList(_, _) => ipc::Type::FixedSizeList,
     Union(_, _, _) => ipc::Type::Union,
+    Map(_, _) => ipc::Type::Map,
     Struct(_) => ipc::Type::Struct_,
     Dictionary(_, v) => type_to_field_type(v),
     Extension(_, v, _) => type_to_field_type(v),
@@ -716,6 +725,16 @@ pub(crate) fn get_fb_field_type<'a>(
             children: Some(fbb.create_vector(&children)),
         }
     }
+    Map(field, keys_sorted) => {
+        let child = build_field(fbb, field);
+        let mut field_type = ipc::MapBuilder::new(fbb);
+        field_type.add_keysSorted(*keys_sorted);
+        FbFieldType {
+            type_type: ipc::Type::Map,
+            type_: field_type.finish().as_union_value(),
+            children: Some(fbb.create_vector(&[child])),
+        }
+    }
 }
 }
diff --git a/src/io/ipc/read/array/map.rs b/src/io/ipc/read/array/map.rs
new file mode 100644
index 00000000000..c5281022554
--- /dev/null
+++ b/src/io/ipc/read/array/map.rs
@@ -0,0 +1,77 @@
+use std::collections::VecDeque;
+use std::io::{Read, Seek};
+
+use gen::Schema::MetadataVersion;
+
+use crate::array::MapArray;
+use crate::buffer::Buffer;
+use crate::datatypes::DataType;
+use crate::error::Result;
+use crate::io::ipc::gen::Message::BodyCompression;
+
+use super::super::super::gen;
+use super::super::deserialize::{read, skip, Node};
+use super::super::read_basic::*;
+
+#[allow(clippy::too_many_arguments)]
+pub fn read_map<R: Read + Seek>(
+    field_nodes: &mut VecDeque<Node>,
+    data_type: DataType,
+    buffers: &mut VecDeque<&gen::Schema::Buffer>,
+    reader: &mut R,
+    block_offset: u64,
+    is_little_endian: bool,
+    compression: Option<BodyCompression>,
+    version: MetadataVersion,
+) -> Result<MapArray> {
+    let field_node = field_nodes.pop_front().unwrap().0;
+
+    let validity = read_validity(
+        buffers,
+        field_node,
+        reader,
+        block_offset,
+        is_little_endian,
+        compression,
+    )?;
+
+    let offsets = read_buffer::<i32, _>(
+        buffers,
+        1 + field_node.length() as usize,
+        reader,
+        block_offset,
+        is_little_endian,
+        compression,
+    )
+    // Older versions of the IPC format sometimes do not report an offset
+    .or_else(|_| Result::Ok(Buffer::<i32>::from(&[0i32])))?;
+
+    let value_data_type = MapArray::get_field(&data_type).data_type().clone();
+
+    let field = read(
+        field_nodes,
+        value_data_type,
+        buffers,
+        reader,
+        block_offset,
+        is_little_endian,
+        compression,
+        version,
+    )?;
+    Ok(MapArray::from_data(data_type, offsets, field, validity))
+}
+
+pub fn skip_map(
+    field_nodes: &mut VecDeque<Node>,
+    data_type: &DataType,
+    buffers: &mut VecDeque<&gen::Schema::Buffer>,
+) {
+    let _ = field_nodes.pop_front().unwrap();
+
+    let _ = buffers.pop_front().unwrap();
+    let _ = buffers.pop_front().unwrap();
+
+    let data_type = MapArray::get_field(data_type).data_type();
+
+    skip(field_nodes, data_type, buffers)
+}
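One subtlety in `read_map` above: when an older writer does not report an offsets buffer (the `or_else` branch), the reader substitutes the smallest well-formed offsets buffer, `[0]`, which decodes as an array with zero slots:

```rust
use arrow2::buffer::Buffer;

// A MapArray's length is `offsets.len() - 1`, so the `[0]` fallback
// encodes an empty array rather than failing the read.
let offsets = Buffer::<i32>::from(&[0i32]);
assert_eq!(offsets.len() - 1, 0);
```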
diff --git a/src/io/ipc/read/array/mod.rs b/src/io/ipc/read/array/mod.rs
index 0dd2610510e..249e5e05e16 100644
--- a/src/io/ipc/read/array/mod.rs
+++ b/src/io/ipc/read/array/mod.rs
@@ -20,3 +20,5 @@ mod dictionary;
 pub use dictionary::*;
 mod union;
 pub use union::*;
+mod map;
+pub use map::*;
diff --git a/src/io/ipc/read/deserialize.rs b/src/io/ipc/read/deserialize.rs
index d32904115cb..82def73ce38 100644
--- a/src/io/ipc/read/deserialize.rs
+++ b/src/io/ipc/read/deserialize.rs
@@ -186,6 +186,17 @@ pub fn read<R: Read + Seek>(
             version,
         )
         .map(|x| Arc::new(x) as Arc<dyn Array>),
+        Map => read_map(
+            field_nodes,
+            data_type,
+            buffers,
+            reader,
+            block_offset,
+            is_little_endian,
+            compression,
+            version,
+        )
+        .map(|x| Arc::new(x) as Arc<dyn Array>),
     }
 }
@@ -208,5 +219,6 @@ pub fn skip(
         Struct => skip_struct(field_nodes, data_type, buffers),
         Dictionary(_) => skip_dictionary(field_nodes, buffers),
         Union => skip_union(field_nodes, data_type, buffers),
+        Map => skip_map(field_nodes, data_type, buffers),
     }
 }
diff --git a/src/io/ipc/write/serialize.rs b/src/io/ipc/write/serialize.rs
index e21d0a57592..a4e88a9ca98 100644
--- a/src/io/ipc/write/serialize.rs
+++ b/src/io/ipc/write/serialize.rs
@@ -260,6 +260,47 @@ pub fn write_union(
     });
 }
 
+fn write_map(
+    array: &dyn Array,
+    buffers: &mut Vec<ipc::Schema::Buffer>,
+    arrow_data: &mut Vec<u8>,
+    nodes: &mut Vec<ipc::Message::FieldNode>,
+    offset: &mut i64,
+    is_little_endian: bool,
+) {
+    let array = array.as_any().downcast_ref::<MapArray>().unwrap();
+    let offsets = array.offsets();
+    let validity = array.validity();
+
+    write_bitmap(validity, offsets.len() - 1, buffers, arrow_data, offset);
+
+    let first = *offsets.first().unwrap();
+    let last = *offsets.last().unwrap();
+    if first == 0 {
+        write_buffer(offsets, buffers, arrow_data, offset, is_little_endian);
+    } else {
+        write_buffer_from_iter(
+            offsets.iter().map(|x| *x - first),
+            buffers,
+            arrow_data,
+            offset,
+            is_little_endian,
+        );
+    }
+
+    write(
+        array
+            .field()
+            .slice(first as usize, last as usize - first as usize)
+            .as_ref(),
+        buffers,
+        arrow_data,
+        nodes,
+        offset,
+        is_little_endian,
+    );
+}
+
 fn write_fixed_size_list(
     array: &dyn Array,
     buffers: &mut Vec<ipc::Schema::Buffer>,
@@ -380,6 +421,9 @@ pub fn write(
         Union => {
             write_union(array, buffers, arrow_data, nodes, offset, is_little_endian);
         }
+        Map => {
+            write_map(array, buffers, arrow_data, nodes, offset, is_little_endian);
+        }
     }
 }
diff --git a/src/io/json_integration/read.rs b/src/io/json_integration/read.rs
index 6a6cdfd7de0..e24ae528266 100644
--- a/src/io/json_integration/read.rs
+++ b/src/io/json_integration/read.rs
@@ -227,6 +227,27 @@ fn to_list(
     )))
 }
 
+fn to_map(
+    json_col: &ArrowJsonColumn,
+    data_type: DataType,
+    dictionaries: &HashMap<i64, ArrowJsonDictionaryBatch>,
+) -> Result<Arc<dyn Array>> {
+    let validity = to_validity(&json_col.validity);
+
+    let child_field = MapArray::get_field(&data_type);
+    let children = &json_col.children.as_ref().unwrap()[0];
+    let field = to_array(
+        child_field.data_type().clone(),
+        child_field.dict_id(),
+        children,
+        dictionaries,
+    )?;
+    let offsets = to_offsets::<i32>(json_col.offset.as_ref());
+    Ok(Arc::new(MapArray::from_data(
+        data_type, offsets, field, validity,
+    )))
+}
+
 fn to_dictionary(
     data_type: DataType,
     dict_id: i64,
@@ -400,6 +421,7 @@ pub fn to_array(
         let array = UnionArray::from_data(data_type, types, fields, offsets);
         Ok(Arc::new(array))
     }
+    Map => to_map(json_col, data_type, dictionaries),
 }
 }
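For reference, the integration-JSON type object that the schema code below maps onto `DataType::Map` has this shape — a sketch using `serde_json`, which this module already relies on for `Value`:

```rust
use serde_json::json;

// "keysSorted" becomes the second argument of `DataType::Map`;
// the single child field describes the "entries" struct.
let map_type = json!({ "name": "map", "keysSorted": false });
assert_eq!(map_type["name"], "map");
```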
diff --git a/src/io/json_integration/schema.rs b/src/io/json_integration/schema.rs
index 12b167401c3..f7d48993ec0 100644
--- a/src/io/json_integration/schema.rs
+++ b/src/io/json_integration/schema.rs
@@ -57,6 +57,7 @@ impl ToJson for DataType {
         }
         DataType::Struct(_) => json!({"name": "struct"}),
         DataType::Union(_, _, _) => json!({"name": "union"}),
+        DataType::Map(_, _) => json!({"name": "map"}),
         DataType::List(_) => json!({ "name": "list"}),
         DataType::LargeList(_) => json!({ "name": "largelist"}),
         DataType::FixedSizeList(_, length) => {
@@ -406,8 +407,16 @@ fn to_data_type(item: &Value, mut children: Vec<Field>) -> Result<DataType> {
         };
         DataType::Union(children, ids, is_sparse)
     }
+    "map" => {
+        let sorted_keys = if let Some(Value::Bool(sorted_keys)) = item.get("keysSorted") {
+            *sorted_keys
+        } else {
+            return Err(ArrowError::Schema("map requires `keysSorted`".to_string()));
+        };
+        DataType::Map(Box::new(children.pop().unwrap()), sorted_keys)
+    }
     other => {
-        return Err(ArrowError::Schema(format!(
+        return Err(ArrowError::NotYetImplemented(format!(
             "invalid json value type \"{}\"",
             other
         )))
diff --git a/src/scalar/mod.rs b/src/scalar/mod.rs
index ad5e1acf1c3..7f2a534d8f6 100644
--- a/src/scalar/mod.rs
+++ b/src/scalar/mod.rs
@@ -115,7 +115,7 @@ pub fn new_scalar(array: &dyn Array, index: usize) -> Box<dyn Scalar> {
         }
         FixedSizeBinary => todo!(),
         FixedSizeList => todo!(),
-        Union => todo!(),
+        Union | Map => todo!(),
         Dictionary(_) => todo!(),
     }
 }
diff --git a/tests/it/io/ipc/read/file.rs b/tests/it/io/ipc/read/file.rs
index cf1920ab252..4a1fe37341d 100644
--- a/tests/it/io/ipc/read/file.rs
+++ b/tests/it/io/ipc/read/file.rs
@@ -12,6 +12,9 @@ fn test_file(version: &str, file_name: &str) -> Result<()> {
         testdata, version, file_name
     ))?;
 
+    // read expected JSON output
+    let (schema, batches) = read_gzip_json(version, file_name)?;
+
     let metadata = read_file_metadata(&mut file)?;
     let reader = FileReader::new(&mut file, metadata, None);
 
@@ -122,6 +125,18 @@ fn read_generated_100_extension() -> Result<()> {
     test_file("1.0.0-littleendian", "generated_extension")
 }
 
+#[test]
+fn read_generated_100_map() -> Result<()> {
+    test_file("1.0.0-littleendian", "generated_map")?;
+    test_file("1.0.0-bigendian", "generated_map")
+}
+
+#[test]
+fn read_generated_100_non_canonical_map() -> Result<()> {
+    test_file("1.0.0-littleendian", "generated_map_non_canonical")?;
+    test_file("1.0.0-bigendian", "generated_map_non_canonical")
+}
+
 #[test]
 fn read_generated_017_union() -> Result<()> {
     test_file("0.17.1", "generated_union")
diff --git a/tests/it/io/ipc/write/file.rs b/tests/it/io/ipc/write/file.rs
index 005b51403ae..f952a3e61d2 100644
--- a/tests/it/io/ipc/write/file.rs
+++ b/tests/it/io/ipc/write/file.rs
@@ -176,6 +176,12 @@ fn write_100_union() -> Result<()> {
     test_file("1.0.0-bigendian", "generated_union")
 }
 
+#[test]
+fn write_100_map() -> Result<()> {
+    test_file("1.0.0-littleendian", "generated_map")?;
+    test_file("1.0.0-bigendian", "generated_map")
+}
+
 #[test]
 fn write_generated_017_union() -> Result<()> {
     test_file("0.17.1", "generated_union")
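To close, a hedged sketch of what the new `generated_map` tests exercise end to end: writing a batch containing a `MapArray` to the IPC file format and reading it back. `read_file_metadata` and `FileReader::new` appear in the tests above; the writer-side names (`FileWriter::try_new`, `write`, `finish`) are assumed from this crate's `io::ipc::write` module:

```rust
use std::io::Cursor;
use arrow2::error::Result;
use arrow2::io::ipc::read::{read_file_metadata, FileReader};
use arrow2::io::ipc::write::FileWriter;
use arrow2::record_batch::RecordBatch;

// Round-trip a batch through an in-memory IPC file and compare, which is
// what `test_file` does against the gzipped JSON golden files.
fn round_trip(batch: RecordBatch) -> Result<()> {
    let schema = batch.schema().clone();
    let mut cursor = Cursor::new(Vec::<u8>::new());
    {
        let mut writer = FileWriter::try_new(&mut cursor, &schema)?;
        writer.write(&batch)?;
        writer.finish()?;
    }
    cursor.set_position(0);

    let metadata = read_file_metadata(&mut cursor)?;
    for read_batch in FileReader::new(&mut cursor, metadata, None) {
        assert_eq!(read_batch?, batch);
    }
    Ok(())
}
```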