From bb9f040f044a86cd3f6073bb9ee24aa06d709468 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Mon, 6 Mar 2023 16:53:50 +0100 Subject: [PATCH] add boolean, date, timestamp & binary partition types (#1180) # Description Adds boolean, date, timestamp & binary partition value types # Related Issue(s) closes #1170 --------- Signed-off-by: Marijn Valk Co-authored-by: Will Jones --- rust/src/writer/utils.rs | 128 ++++++++++++++++++++++++++++++++++++--- 1 file changed, 121 insertions(+), 7 deletions(-) diff --git a/rust/src/writer/utils.rs b/rust/src/writer/utils.rs index e7af707344..fb2002819a 100644 --- a/rust/src/writer/utils.rs +++ b/rust/src/writer/utils.rs @@ -7,10 +7,14 @@ use std::sync::Arc; use crate::writer::DeltaWriterError; use crate::DeltaTableError; -use arrow::array::{as_primitive_array, Array}; +use arrow::array::{ + as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array, +}; use arrow::datatypes::{ - DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema, - SchemaRef as ArrowSchemaRef, UInt16Type, UInt32Type, UInt64Type, UInt8Type, + DataType, Date32Type, Date64Type, Int16Type, Int32Type, Int64Type, Int8Type, + Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, TimestampMicrosecondType, + TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type, + UInt64Type, UInt8Type, }; use arrow::json::reader::{Decoder, DecoderOptions}; use arrow::record_batch::*; @@ -20,6 +24,8 @@ use serde_json::Value; use uuid::Uuid; const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__"; +const PARTITION_DATE_FORMAT: &str = "%Y-%m-%d"; +const PARTITION_DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S"; #[derive(Debug, Clone, Hash, PartialEq, Eq)] pub(crate) struct PartitionPath { @@ -137,11 +143,52 @@ pub(crate) fn stringified_partition_value( DataType::UInt16 => as_primitive_array::(arr).value(0).to_string(), DataType::UInt32 => as_primitive_array::(arr).value(0).to_string(), DataType::UInt64 => as_primitive_array::(arr).value(0).to_string(), - DataType::Utf8 => { - let data = arrow::array::as_string_array(arr); - - data.value(0).to_string() + DataType::Utf8 => as_string_array(arr).value(0).to_string(), + DataType::Boolean => as_boolean_array(arr).value(0).to_string(), + DataType::Date32 => as_primitive_array::(arr) + .value_as_date(0) + .unwrap() + .format(PARTITION_DATE_FORMAT) + .to_string(), + DataType::Date64 => as_primitive_array::(arr) + .value_as_date(0) + .unwrap() + .format(PARTITION_DATE_FORMAT) + .to_string(), + DataType::Timestamp(TimeUnit::Second, _) => as_primitive_array::(arr) + .value_as_datetime(0) + .unwrap() + .format(PARTITION_DATETIME_FORMAT) + .to_string(), + DataType::Timestamp(TimeUnit::Millisecond, _) => { + as_primitive_array::(arr) + .value_as_datetime(0) + .unwrap() + .format(PARTITION_DATETIME_FORMAT) + .to_string() + } + DataType::Timestamp(TimeUnit::Microsecond, _) => { + as_primitive_array::(arr) + .value_as_datetime(0) + .unwrap() + .format(PARTITION_DATETIME_FORMAT) + .to_string() } + DataType::Timestamp(TimeUnit::Nanosecond, _) => { + as_primitive_array::(arr) + .value_as_datetime(0) + .unwrap() + .format(PARTITION_DATETIME_FORMAT) + .to_string() + } + DataType::Binary => as_generic_binary_array::(arr) + .value(0) + .escape_ascii() + .to_string(), + DataType::LargeBinary => as_generic_binary_array::(arr) + .value(0) + .escape_ascii() + .to_string(), // TODO: handle more types _ => { unimplemented!("Unimplemented data type: {:?}", data_type); @@ -235,3 +282,70 @@ impl Write for ShareableBuffer { (*inner).flush() } } + +#[cfg(test)] +mod tests { + use super::*; + use arrow::array::{ + BinaryArray, BooleanArray, Date32Array, Date64Array, Int16Array, Int32Array, Int64Array, + Int8Array, LargeBinaryArray, StringArray, TimestampMicrosecondArray, + TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array, + UInt32Array, UInt64Array, UInt8Array, + }; + + #[test] + fn test_stringified_partition_value() { + let reference_pairs: Vec<(Arc, Option<&str>)> = vec![ + (Arc::new(Int8Array::from(vec![None])), None), + (Arc::new(Int8Array::from(vec![1])), Some("1")), + (Arc::new(Int16Array::from(vec![1])), Some("1")), + (Arc::new(Int32Array::from(vec![1])), Some("1")), + (Arc::new(Int64Array::from(vec![1])), Some("1")), + (Arc::new(UInt8Array::from(vec![1])), Some("1")), + (Arc::new(UInt16Array::from(vec![1])), Some("1")), + (Arc::new(UInt32Array::from(vec![1])), Some("1")), + (Arc::new(UInt64Array::from(vec![1])), Some("1")), + (Arc::new(UInt8Array::from(vec![1])), Some("1")), + (Arc::new(StringArray::from(vec!["1"])), Some("1")), + (Arc::new(BooleanArray::from(vec![true])), Some("true")), + (Arc::new(BooleanArray::from(vec![false])), Some("false")), + (Arc::new(Date32Array::from(vec![1])), Some("1970-01-02")), + ( + Arc::new(Date64Array::from(vec![86400000])), + Some("1970-01-02"), + ), + ( + Arc::new(TimestampSecondArray::from(vec![1])), + Some("1970-01-01 00:00:01"), + ), + ( + Arc::new(TimestampMillisecondArray::from(vec![1000])), + Some("1970-01-01 00:00:01"), + ), + ( + Arc::new(TimestampMicrosecondArray::from(vec![1000000])), + Some("1970-01-01 00:00:01"), + ), + ( + Arc::new(TimestampNanosecondArray::from(vec![1000000000])), + Some("1970-01-01 00:00:01"), + ), + (Arc::new(BinaryArray::from_vec(vec![b"1"])), Some("1")), + ( + Arc::new(BinaryArray::from_vec(vec![b"\x00\\"])), + Some("\\x00\\\\"), + ), + (Arc::new(LargeBinaryArray::from_vec(vec![b"1"])), Some("1")), + ( + Arc::new(LargeBinaryArray::from_vec(vec![b"\x00\\"])), + Some("\\x00\\\\"), + ), + ]; + for (vals, result) in reference_pairs { + assert_eq!( + stringified_partition_value(&vals).unwrap().as_deref(), + result + ) + } + } +}