Skip to content

Commit

Permalink
add boolean, date, timestamp & binary partition types (#1180)
Browse files Browse the repository at this point in the history
# Description
Adds boolean, date, timestamp & binary partition value types

# Related Issue(s)
closes #1170

---------

Signed-off-by: Marijn Valk <marijncv@hotmail.com>
Co-authored-by: Will Jones <willjones127@gmail.com>
  • Loading branch information
marijncv and wjones127 authored Mar 6, 2023
1 parent 01896f5 commit bb9f040
Showing 1 changed file with 121 additions and 7 deletions.
128 changes: 121 additions & 7 deletions rust/src/writer/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ use std::sync::Arc;
use crate::writer::DeltaWriterError;
use crate::DeltaTableError;

use arrow::array::{as_primitive_array, Array};
use arrow::array::{
as_boolean_array, as_generic_binary_array, as_primitive_array, as_string_array, Array,
};
use arrow::datatypes::{
DataType, Int16Type, Int32Type, Int64Type, Int8Type, Schema as ArrowSchema,
SchemaRef as ArrowSchemaRef, UInt16Type, UInt32Type, UInt64Type, UInt8Type,
DataType, Date32Type, Date64Type, Int16Type, Int32Type, Int64Type, Int8Type,
Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, TimeUnit, TimestampMicrosecondType,
TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt16Type, UInt32Type,
UInt64Type, UInt8Type,
};
use arrow::json::reader::{Decoder, DecoderOptions};
use arrow::record_batch::*;
Expand All @@ -20,6 +24,8 @@ use serde_json::Value;
use uuid::Uuid;

const NULL_PARTITION_VALUE_DATA_PATH: &str = "__HIVE_DEFAULT_PARTITION__";
const PARTITION_DATE_FORMAT: &str = "%Y-%m-%d";
const PARTITION_DATETIME_FORMAT: &str = "%Y-%m-%d %H:%M:%S";

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub(crate) struct PartitionPath {
Expand Down Expand Up @@ -137,11 +143,52 @@ pub(crate) fn stringified_partition_value(
DataType::UInt16 => as_primitive_array::<UInt16Type>(arr).value(0).to_string(),
DataType::UInt32 => as_primitive_array::<UInt32Type>(arr).value(0).to_string(),
DataType::UInt64 => as_primitive_array::<UInt64Type>(arr).value(0).to_string(),
DataType::Utf8 => {
let data = arrow::array::as_string_array(arr);

data.value(0).to_string()
DataType::Utf8 => as_string_array(arr).value(0).to_string(),
DataType::Boolean => as_boolean_array(arr).value(0).to_string(),
DataType::Date32 => as_primitive_array::<Date32Type>(arr)
.value_as_date(0)
.unwrap()
.format(PARTITION_DATE_FORMAT)
.to_string(),
DataType::Date64 => as_primitive_array::<Date64Type>(arr)
.value_as_date(0)
.unwrap()
.format(PARTITION_DATE_FORMAT)
.to_string(),
DataType::Timestamp(TimeUnit::Second, _) => as_primitive_array::<TimestampSecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string(),
DataType::Timestamp(TimeUnit::Millisecond, _) => {
as_primitive_array::<TimestampMillisecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string()
}
DataType::Timestamp(TimeUnit::Microsecond, _) => {
as_primitive_array::<TimestampMicrosecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string()
}
DataType::Timestamp(TimeUnit::Nanosecond, _) => {
as_primitive_array::<TimestampNanosecondType>(arr)
.value_as_datetime(0)
.unwrap()
.format(PARTITION_DATETIME_FORMAT)
.to_string()
}
DataType::Binary => as_generic_binary_array::<i32>(arr)
.value(0)
.escape_ascii()
.to_string(),
DataType::LargeBinary => as_generic_binary_array::<i64>(arr)
.value(0)
.escape_ascii()
.to_string(),
// TODO: handle more types
_ => {
unimplemented!("Unimplemented data type: {:?}", data_type);
Expand Down Expand Up @@ -235,3 +282,70 @@ impl Write for ShareableBuffer {
(*inner).flush()
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::{
BinaryArray, BooleanArray, Date32Array, Date64Array, Int16Array, Int32Array, Int64Array,
Int8Array, LargeBinaryArray, StringArray, TimestampMicrosecondArray,
TimestampMillisecondArray, TimestampNanosecondArray, TimestampSecondArray, UInt16Array,
UInt32Array, UInt64Array, UInt8Array,
};

#[test]
fn test_stringified_partition_value() {
let reference_pairs: Vec<(Arc<dyn Array>, Option<&str>)> = vec![
(Arc::new(Int8Array::from(vec![None])), None),
(Arc::new(Int8Array::from(vec![1])), Some("1")),
(Arc::new(Int16Array::from(vec![1])), Some("1")),
(Arc::new(Int32Array::from(vec![1])), Some("1")),
(Arc::new(Int64Array::from(vec![1])), Some("1")),
(Arc::new(UInt8Array::from(vec![1])), Some("1")),
(Arc::new(UInt16Array::from(vec![1])), Some("1")),
(Arc::new(UInt32Array::from(vec![1])), Some("1")),
(Arc::new(UInt64Array::from(vec![1])), Some("1")),
(Arc::new(UInt8Array::from(vec![1])), Some("1")),
(Arc::new(StringArray::from(vec!["1"])), Some("1")),
(Arc::new(BooleanArray::from(vec![true])), Some("true")),
(Arc::new(BooleanArray::from(vec![false])), Some("false")),
(Arc::new(Date32Array::from(vec![1])), Some("1970-01-02")),
(
Arc::new(Date64Array::from(vec![86400000])),
Some("1970-01-02"),
),
(
Arc::new(TimestampSecondArray::from(vec![1])),
Some("1970-01-01 00:00:01"),
),
(
Arc::new(TimestampMillisecondArray::from(vec![1000])),
Some("1970-01-01 00:00:01"),
),
(
Arc::new(TimestampMicrosecondArray::from(vec![1000000])),
Some("1970-01-01 00:00:01"),
),
(
Arc::new(TimestampNanosecondArray::from(vec![1000000000])),
Some("1970-01-01 00:00:01"),
),
(Arc::new(BinaryArray::from_vec(vec![b"1"])), Some("1")),
(
Arc::new(BinaryArray::from_vec(vec![b"\x00\\"])),
Some("\\x00\\\\"),
),
(Arc::new(LargeBinaryArray::from_vec(vec![b"1"])), Some("1")),
(
Arc::new(LargeBinaryArray::from_vec(vec![b"\x00\\"])),
Some("\\x00\\\\"),
),
];
for (vals, result) in reference_pairs {
assert_eq!(
stringified_partition_value(&vals).unwrap().as_deref(),
result
)
}
}
}

0 comments on commit bb9f040

Please sign in to comment.