Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Set timezone for group column timestamp type #13535

Merged
merged 1 commit into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ pub(crate) fn new_group_values(
TimeUnit::Nanosecond => downcast_helper!(Time64NanosecondType, d),
_ => {}
},
DataType::Timestamp(t, _) => match t {
DataType::Timestamp(t, _tz) => match t {
TimeUnit::Second => downcast_helper!(TimestampSecondType, d),
TimeUnit::Millisecond => downcast_helper!(TimestampMillisecondType, d),
TimeUnit::Microsecond => downcast_helper!(TimestampMicrosecondType, d),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -880,12 +880,12 @@ impl<const STREAMING: bool> GroupValuesColumn<STREAMING> {
/// `$t`: the primitive type of the builder
///
macro_rules! instantiate_primitive {
($v:expr, $nullable:expr, $t:ty) => {
($v:expr, $nullable:expr, $t:ty, $data_type:ident) => {
if $nullable {
let b = PrimitiveGroupValueBuilder::<$t, true>::new();
let b = PrimitiveGroupValueBuilder::<$t, true>::new($data_type.to_owned());
$v.push(Box::new(b) as _)
} else {
let b = PrimitiveGroupValueBuilder::<$t, false>::new();
let b = PrimitiveGroupValueBuilder::<$t, false>::new($data_type.to_owned());
$v.push(Box::new(b) as _)
}
};
Expand All @@ -898,53 +898,114 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {

for f in self.schema.fields().iter() {
let nullable = f.is_nullable();
match f.data_type() {
&DataType::Int8 => instantiate_primitive!(v, nullable, Int8Type),
&DataType::Int16 => instantiate_primitive!(v, nullable, Int16Type),
&DataType::Int32 => instantiate_primitive!(v, nullable, Int32Type),
&DataType::Int64 => instantiate_primitive!(v, nullable, Int64Type),
&DataType::UInt8 => instantiate_primitive!(v, nullable, UInt8Type),
&DataType::UInt16 => instantiate_primitive!(v, nullable, UInt16Type),
&DataType::UInt32 => instantiate_primitive!(v, nullable, UInt32Type),
&DataType::UInt64 => instantiate_primitive!(v, nullable, UInt64Type),
let data_type = f.data_type();
match data_type {
&DataType::Int8 => {
instantiate_primitive!(v, nullable, Int8Type, data_type)
}
&DataType::Int16 => {
instantiate_primitive!(v, nullable, Int16Type, data_type)
}
&DataType::Int32 => {
instantiate_primitive!(v, nullable, Int32Type, data_type)
}
&DataType::Int64 => {
instantiate_primitive!(v, nullable, Int64Type, data_type)
}
&DataType::UInt8 => {
instantiate_primitive!(v, nullable, UInt8Type, data_type)
}
&DataType::UInt16 => {
instantiate_primitive!(v, nullable, UInt16Type, data_type)
}
&DataType::UInt32 => {
instantiate_primitive!(v, nullable, UInt32Type, data_type)
}
&DataType::UInt64 => {
instantiate_primitive!(v, nullable, UInt64Type, data_type)
}
&DataType::Float32 => {
instantiate_primitive!(v, nullable, Float32Type)
instantiate_primitive!(v, nullable, Float32Type, data_type)
}
&DataType::Float64 => {
instantiate_primitive!(v, nullable, Float64Type)
instantiate_primitive!(v, nullable, Float64Type, data_type)
}
&DataType::Date32 => {
instantiate_primitive!(v, nullable, Date32Type, data_type)
}
&DataType::Date64 => {
instantiate_primitive!(v, nullable, Date64Type, data_type)
}
&DataType::Date32 => instantiate_primitive!(v, nullable, Date32Type),
&DataType::Date64 => instantiate_primitive!(v, nullable, Date64Type),
&DataType::Time32(t) => match t {
TimeUnit::Second => {
instantiate_primitive!(v, nullable, Time32SecondType)
instantiate_primitive!(
v,
nullable,
Time32SecondType,
data_type
)
}
TimeUnit::Millisecond => {
instantiate_primitive!(v, nullable, Time32MillisecondType)
instantiate_primitive!(
v,
nullable,
Time32MillisecondType,
data_type
)
}
_ => {}
},
&DataType::Time64(t) => match t {
TimeUnit::Microsecond => {
instantiate_primitive!(v, nullable, Time64MicrosecondType)
instantiate_primitive!(
v,
nullable,
Time64MicrosecondType,
data_type
)
}
TimeUnit::Nanosecond => {
instantiate_primitive!(v, nullable, Time64NanosecondType)
instantiate_primitive!(
v,
nullable,
Time64NanosecondType,
data_type
)
}
_ => {}
},
&DataType::Timestamp(t, _) => match t {
TimeUnit::Second => {
instantiate_primitive!(v, nullable, TimestampSecondType)
instantiate_primitive!(
v,
nullable,
TimestampSecondType,
data_type
)
}
TimeUnit::Millisecond => {
instantiate_primitive!(v, nullable, TimestampMillisecondType)
instantiate_primitive!(
v,
nullable,
TimestampMillisecondType,
data_type
)
}
TimeUnit::Microsecond => {
instantiate_primitive!(v, nullable, TimestampMicrosecondType)
instantiate_primitive!(
v,
nullable,
TimestampMicrosecondType,
data_type
)
}
TimeUnit::Nanosecond => {
instantiate_primitive!(v, nullable, TimestampNanosecondType)
instantiate_primitive!(
v,
nullable,
TimestampNanosecondType,
data_type
)
}
},
&DataType::Utf8 => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use crate::aggregates::group_values::null_builder::MaybeNullBufferBuilder;
use arrow::buffer::ScalarBuffer;
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_schema::DataType;
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use itertools::izip;
use std::iter;
Expand All @@ -35,6 +36,7 @@ use std::sync::Arc;
/// `NULLABLE`: if the data can contain any nulls
#[derive(Debug)]
pub struct PrimitiveGroupValueBuilder<T: ArrowPrimitiveType, const NULLABLE: bool> {
data_type: DataType,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I double-checked, and the single group-by column implementation does the same thing.

👍

group_values: Vec<T::Native>,
nulls: MaybeNullBufferBuilder,
}
Expand All @@ -44,8 +46,9 @@ where
T: ArrowPrimitiveType,
{
/// Create a new `PrimitiveGroupValueBuilder`
pub fn new() -> Self {
pub fn new(data_type: DataType) -> Self {
Self {
data_type,
group_values: vec![],
nulls: MaybeNullBufferBuilder::new(),
}
Expand Down Expand Up @@ -177,6 +180,7 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn

fn build(self: Box<Self>) -> ArrayRef {
let Self {
data_type,
group_values,
nulls,
} = *self;
Expand All @@ -186,10 +190,9 @@ impl<T: ArrowPrimitiveType, const NULLABLE: bool> GroupColumn
assert!(nulls.is_none(), "unexpected nulls in non nullable input");
}

Arc::new(PrimitiveArray::<T>::new(
ScalarBuffer::from(group_values),
nulls,
))
let arr = PrimitiveArray::<T>::new(ScalarBuffer::from(group_values), nulls);
// Reapply the column's full data type so the timestamp timezone is preserved
Arc::new(arr.with_data_type(data_type))
}

fn take_n(&mut self, n: usize) -> ArrayRef {
Expand All @@ -212,6 +215,7 @@ mod tests {
use arrow::datatypes::Int64Type;
use arrow_array::{ArrayRef, Int64Array};
use arrow_buffer::{BooleanBufferBuilder, NullBuffer};
use arrow_schema::DataType;

use super::GroupColumn;

Expand Down Expand Up @@ -283,7 +287,8 @@ mod tests {
// - exist not null, input not null; values equal

// Define PrimitiveGroupValueBuilder
let mut builder = PrimitiveGroupValueBuilder::<Int64Type, true>::new();
let mut builder =
PrimitiveGroupValueBuilder::<Int64Type, true>::new(DataType::Int64);
let builder_array = Arc::new(Int64Array::from(vec![
None,
None,
Expand Down Expand Up @@ -392,7 +397,8 @@ mod tests {
// - values not equal

// Define PrimitiveGroupValueBuilder
let mut builder = PrimitiveGroupValueBuilder::<Int64Type, false>::new();
let mut builder =
PrimitiveGroupValueBuilder::<Int64Type, false>::new(DataType::Int64);
let builder_array =
Arc::new(Int64Array::from(vec![Some(0), Some(1)])) as ArrayRef;
append(&mut builder, &builder_array, &[0, 1]);
Expand All @@ -419,7 +425,8 @@ mod tests {
// Test the special `all nulls` or `not nulls` input array case
// for vectorized append and equal to

let mut builder = PrimitiveGroupValueBuilder::<Int64Type, true>::new();
let mut builder =
PrimitiveGroupValueBuilder::<Int64Type, true>::new(DataType::Int64);

// All nulls input array
let all_nulls_input_array = Arc::new(Int64Array::from(vec![
Expand Down
16 changes: 16 additions & 0 deletions datafusion/sqllogictest/test_files/group_by.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5483,3 +5483,19 @@ SELECT max(input_table.x), min(input_table.x) from input_table GROUP BY input_ta
----
NaN NaN

# Group by a timestamp with a timezone: the output must keep the timezone
query TP
SELECT
'foo' AS text,
arrow_cast('2024-01-01T00:00:00Z'::timestamptz, 'Timestamp(Microsecond, Some("UTC"))') AS ts
GROUP BY ts, text
----
foo 2024-01-01T00:00:00Z

query TP
SELECT
'foo' AS text,
arrow_cast('2024-01-01T00:00:00Z'::timestamptz, 'Timestamp(Second, Some("+08:00"))') AS ts
GROUP BY ts, text
----
foo 2024-01-01T08:00:00+08:00