From 135ffd8d4dcb7e1a032ff36175442289a5ba7ebc Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Tue, 24 Jun 2025 18:42:09 +0200 Subject: [PATCH 1/4] init --- .../functions-table/src/generate_series.rs | 424 ++++++++++++++---- .../test_files/table_functions.slt | 86 +++- 2 files changed, 431 insertions(+), 79 deletions(-) diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index c875874c569d..9b8e50828d0b 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -15,8 +15,12 @@ // specific language governing permissions and limitations // under the License. -use arrow::array::Int64Array; -use arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use arrow::array::timezone::Tz; +use arrow::array::types::TimestampNanosecondType; +use arrow::array::{Int64Array, TimestampNanosecondArray}; +use arrow::datatypes::{ + DataType, Field, IntervalMonthDayNano, Schema, SchemaRef, TimeUnit, +}; use arrow::record_batch::RecordBatch; use async_trait::async_trait; use datafusion_catalog::Session; @@ -28,96 +32,228 @@ use datafusion_physical_plan::memory::{LazyBatchGenerator, LazyMemoryExec}; use datafusion_physical_plan::ExecutionPlan; use parking_lot::RwLock; use std::fmt; +use std::str::FromStr; use std::sync::Arc; /// Indicates the arguments used for generating a series. #[derive(Debug, Clone)] enum GenSeriesArgs { /// ContainsNull signifies that at least one argument(start, end, step) was null, thus no series will be generated. - ContainsNull { + ContainsNull { name: &'static str }, + /// Int64Args holds the start, end, and step values for generating integer series when all arguments are not null. + Int64Args { + start: i64, + end: i64, + step: i64, + /// Indicates whether the end value should be included in the series. include_end: bool, name: &'static str, }, - /// AllNotNullArgs holds the start, end, and step values for generating the series when all arguments are not null. - AllNotNullArgs { + /// TimestampArgs holds the start, end, and step values for generating timestamp series when all arguments are not null. + TimestampArgs { start: i64, end: i64, - step: i64, + step: IntervalMonthDayNano, + tz: Option>, /// Indicates whether the end value should be included in the series. include_end: bool, name: &'static str, }, } -/// Table that generates a series of integers from `start`(inclusive) to `end`(inclusive), incrementing by step +/// Table that generates a series of integers/timestamps from `start`(inclusive) to `end`, incrementing by step #[derive(Debug, Clone)] struct GenerateSeriesTable { schema: SchemaRef, args: GenSeriesArgs, } -/// Table state that generates a series of integers from `start`(inclusive) to `end`(inclusive), incrementing by step +/// Table state that generates a series of values from `start`(inclusive) to `end`, incrementing by step #[derive(Debug, Clone)] -struct GenerateSeriesState { - schema: SchemaRef, - start: i64, // Kept for display - end: i64, - step: i64, - batch_size: usize, - - /// Tracks current position when generating table - current: i64, - /// Indicates whether the end value should be included in the series. - include_end: bool, - name: &'static str, -} - -impl GenerateSeriesState { - fn reach_end(&self, val: i64) -> bool { - if self.step > 0 { - if self.include_end { - return val > self.end; - } else { - return val >= self.end; - } - } - - if self.include_end { - val < self.end - } else { - val <= self.end - } - } +enum GenerateSeriesState { + Int64 { + schema: SchemaRef, + start: i64, // Kept for display + end: i64, + step: i64, + batch_size: usize, + /// Tracks current position when generating table + current: i64, + /// Indicates whether the end value should be included in the series. + include_end: bool, + name: &'static str, + }, + Timestamp { + schema: SchemaRef, + start: i64, + end: i64, + step: IntervalMonthDayNano, + tz: Option>, + parsed_tz: Tz, + batch_size: usize, + /// Tracks current position when generating table + current: i64, + /// Indicates whether the end value should be included in the series. + include_end: bool, + name: &'static str, + }, + Empty { + batch_size: usize, + name: &'static str, + }, } /// Detail to display for 'Explain' plan impl fmt::Display for GenerateSeriesState { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!( - f, - "{}: start={}, end={}, batch_size={}", - self.name, self.start, self.end, self.batch_size - ) + match self { + GenerateSeriesState::Int64 { + name, + start, + end, + batch_size, + .. + } + | GenerateSeriesState::Timestamp { + name, + start, + end, + batch_size, + .. + } => { + write!( + f, + "{name}: start={start}, end={end}, batch_size={batch_size}" + ) + } + GenerateSeriesState::Empty { + name, batch_size, .. + } => { + write!(f, "{name}: empty, batch_size={batch_size}") + } + } } } impl LazyBatchGenerator for GenerateSeriesState { fn generate_next_batch(&mut self) -> Result> { - let mut buf = Vec::with_capacity(self.batch_size); - while buf.len() < self.batch_size && !self.reach_end(self.current) { - buf.push(self.current); - self.current += self.step; - } - let array = Int64Array::from(buf); + match self { + GenerateSeriesState::Int64 { + schema, + end, + step, + batch_size, + current, + include_end, + .. + } => { + let mut buf = Vec::with_capacity(*batch_size); + let end_val = *end; + let step_val = *step; + let include_end_val = *include_end; - if array.is_empty() { - return Ok(None); - } + while buf.len() < *batch_size + && !reach_end_int64(*current, end_val, step_val, include_end_val) + { + buf.push(*current); + *current += step_val; + } + let array = Int64Array::from(buf); + + if array.is_empty() { + return Ok(None); + } + + let batch = + RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(array)])?; + Ok(Some(batch)) + } + GenerateSeriesState::Timestamp { + schema, + end, + step, + tz, + parsed_tz, + batch_size, + current, + include_end, + start: _, + name: _, + } => { + let mut buf = Vec::with_capacity(*batch_size); + let step_val = *step; + let include_end_val = *include_end; + let step_negative = + step_val.months < 0 || step_val.days < 0 || step_val.nanoseconds < 0; + + while buf.len() < *batch_size { + let should_stop = if include_end_val { + if step_negative { + current < end + } else { + current > end + } + } else if step_negative { + current <= end + } else { + current >= end + }; + + if should_stop { + break; + } + + // Store current value before advancing + let current_value = *current; + + // Add interval using proper calendar arithmetic for next iteration + let Some(next_ts) = TimestampNanosecondType::add_month_day_nano( + *current, step_val, *parsed_tz, + ) else { + return plan_err!( + "Failed to add interval {:?} to timestamp {}", + step_val, + current_value + ); + }; + + *current = next_ts; + + // Push the current value after successfully advancing + buf.push(current_value); + } + + let array = TimestampNanosecondArray::from(buf); + // Create array with proper timezone + let array = match tz { + Some(tz_str) => array.with_timezone(Arc::clone(tz_str)), + None => array, + }; + + if array.is_empty() { + return Ok(None); + } - let batch = - RecordBatch::try_new(Arc::clone(&self.schema), vec![Arc::new(array)])?; + let batch = + RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(array)])?; + Ok(Some(batch)) + } + GenerateSeriesState::Empty { .. } => Ok(None), + } + } +} - Ok(Some(batch)) +fn reach_end_int64(val: i64, end: i64, step: i64, include_end: bool) -> bool { + if step > 0 { + if include_end { + val > end + } else { + val >= end + } + } else if include_end { + val < end + } else { + val <= end } } @@ -147,39 +283,63 @@ impl TableProvider for GenerateSeriesTable { Some(projection) => Arc::new(self.schema.project(projection)?), None => self.schema(), }; - let state = match self.args { + let series_state = match &self.args { // if args have null, then return 0 row - GenSeriesArgs::ContainsNull { include_end, name } => GenerateSeriesState { - schema: self.schema(), - start: 0, - end: 0, - step: 1, - current: 1, - batch_size, - include_end, - name, - }, - GenSeriesArgs::AllNotNullArgs { + GenSeriesArgs::ContainsNull { name } => { + GenerateSeriesState::Empty { batch_size, name } + } + GenSeriesArgs::Int64Args { start, end, step, include_end, name, - } => GenerateSeriesState { + } => GenerateSeriesState::Int64 { schema: self.schema(), + start: *start, + end: *end, + step: *step, + current: *start, + batch_size, + include_end: *include_end, + name, + }, + GenSeriesArgs::TimestampArgs { start, end, step, - current: start, - batch_size, + tz, include_end, name, - }, + } => { + let parsed_tz = tz + .as_ref() + .map(|s| Tz::from_str(s.as_ref())) + .transpose() + .map_err(|e| { + datafusion_common::DataFusionError::Internal(format!( + "Failed to parse timezone: {e}" + )) + })? + .unwrap_or_else(|| Tz::from_str("+00:00").unwrap()); + GenerateSeriesState::Timestamp { + schema: self.schema(), + start: *start, + end: *end, + step: *step, + tz: tz.clone(), + parsed_tz, + current: *start, + batch_size, + include_end: *include_end, + name, + } + } }; Ok(Arc::new(LazyMemoryExec::try_new( schema, - vec![Arc::new(RwLock::new(state))], + vec![Arc::new(RwLock::new(series_state))], )?)) } } @@ -196,6 +356,29 @@ impl TableFunctionImpl for GenerateSeriesFuncImpl { return plan_err!("{} function requires 1 to 3 arguments", self.name); } + // Determine the data type from the first argument + match &exprs[0] { + Expr::Literal( + // Default to int64 for null + ScalarValue::Null | ScalarValue::Int64(_), + _, + ) => self.call_int64(exprs), + Expr::Literal(s, _) if matches!(s.data_type(), DataType::Timestamp(_, _)) => { + self.call_timestamp(exprs) + } + Expr::Literal(scalar, _) => { + plan_err!( + "Argument #1 must be an INTEGER, TIMESTAMP or NULL, got {:?}", + scalar.data_type() + ) + } + _ => plan_err!("Arguments must be literals"), + } + } +} + +impl GenerateSeriesFuncImpl { + fn call_int64(&self, exprs: &[Expr]) -> Result> { let mut normalize_args = Vec::new(); for (expr_index, expr) in exprs.iter().enumerate() { match expr { @@ -221,10 +404,7 @@ impl TableFunctionImpl for GenerateSeriesFuncImpl { // contain null return Ok(Arc::new(GenerateSeriesTable { schema, - args: GenSeriesArgs::ContainsNull { - include_end: self.include_end, - name: self.name, - }, + args: GenSeriesArgs::ContainsNull { name: self.name }, })); } @@ -251,10 +431,100 @@ impl TableFunctionImpl for GenerateSeriesFuncImpl { Ok(Arc::new(GenerateSeriesTable { schema, - args: GenSeriesArgs::AllNotNullArgs { + args: GenSeriesArgs::Int64Args { + start, + end, + step, + include_end: self.include_end, + name: self.name, + }, + })) + } + + fn call_timestamp(&self, exprs: &[Expr]) -> Result> { + if exprs.len() != 3 { + return plan_err!( + "{} function with timestamps requires exactly 3 arguments", + self.name + ); + } + + // Parse start timestamp + let (start_ts, tz) = match &exprs[0] { + Expr::Literal(ScalarValue::TimestampNanosecond(ts, tz), _) => { + (*ts, tz.clone()) + } + other => { + return plan_err!( + "First argument must be a timestamp or NULL, got {:?}", + other + ) + } + }; + + // Parse end timestamp + let end_ts = match &exprs[1] { + Expr::Literal(ScalarValue::Null, _) => None, + Expr::Literal(ScalarValue::TimestampNanosecond(ts, _), _) => *ts, + other => { + return plan_err!( + "Second argument must be a timestamp or NULL, got {:?}", + other + ) + } + }; + + // Parse step interval + let step_interval = match &exprs[2] { + Expr::Literal(ScalarValue::Null, _) => None, + Expr::Literal(ScalarValue::IntervalMonthDayNano(interval), _) => *interval, + other => { + return plan_err!( + "Third argument must be an interval or NULL, got {:?}", + other + ) + } + }; + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Timestamp(TimeUnit::Nanosecond, tz.clone()), + false, + )])); + + // Check if any argument is null + let (Some(start), Some(end), Some(step)) = (start_ts, end_ts, step_interval) + else { + return Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::ContainsNull { name: self.name }, + })); + }; + + // Basic validation + if step.months == 0 && step.days == 0 && step.nanoseconds == 0 { + return plan_err!("Step interval cannot be zero"); + } + + // Check for infinite series conditions with timestamps + let step_is_positive = step.months > 0 || step.days > 0 || step.nanoseconds > 0; + let step_is_negative = step.months < 0 || step.days < 0 || step.nanoseconds < 0; + + if start > end && step_is_positive { + return plan_err!("Start is bigger than end, but increment is positive: Cannot generate infinite series"); + } + + if start < end && step_is_negative { + return plan_err!("Start is smaller than end, but increment is negative: Cannot generate infinite series"); + } + + Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::TimestampArgs { start, end, step, + tz, include_end: self.include_end, name: self.name, }, diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt index ad33d9dd0acf..b8a3d5cc9aa8 100644 --- a/datafusion/sqllogictest/test_files/table_functions.slt +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -177,7 +177,7 @@ statement error DataFusion error: Error during planning: generate_series functio SELECT * FROM generate_series(1, 2, 3, 4) -statement error DataFusion error: Error during planning: Argument #1 must be an INTEGER or NULL, got Literal\(Utf8\("foo"\), None\) +statement error DataFusion error: Error during planning: Argument \#1 must be an INTEGER, TIMESTAMP or NULL, got Utf8 SELECT * FROM generate_series('foo', 'bar') # UDF and UDTF `generate_series` can be used simultaneously @@ -300,7 +300,7 @@ statement error DataFusion error: Error during planning: range function requires SELECT * FROM range(1, 2, 3, 4) -statement error DataFusion error: Error during planning: Argument #1 must be an INTEGER or NULL, got Literal\(Utf8\("foo"\), None\) +statement error DataFusion error: Error during planning: Argument \#1 must be an INTEGER, TIMESTAMP or NULL, got Utf8 SELECT * FROM range('foo', 'bar') statement error DataFusion error: Error during planning: Argument #2 must be an INTEGER or NULL, got Literal\(Utf8\("bar"\), None\) @@ -312,3 +312,85 @@ SELECT range(1, t1.end) FROM range(3, 5) as t1(end) ---- [1, 2, 3] [1, 2] + +# +# Test timestamp ranges +# + +# Basic timestamp range with 1 day interval +query P rowsort +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-04T00:00:00', INTERVAL '1' DAY) +---- +2023-01-01T00:00:00 +2023-01-02T00:00:00 +2023-01-03T00:00:00 + +# Timestamp range with hour interval +query P rowsort +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-01T03:00:00', INTERVAL '1' HOUR) +---- +2023-01-01T00:00:00 +2023-01-01T01:00:00 +2023-01-01T02:00:00 + +# Timestamp range with month interval +query P rowsort +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-04-01T00:00:00', INTERVAL '1' MONTH) +---- +2023-01-01T00:00:00 +2023-02-01T00:00:00 +2023-03-01T00:00:00 + +# Timestamp generate_series (includes end) +query P rowsort +SELECT * FROM generate_series(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-03T00:00:00', INTERVAL '1' DAY) +---- +2023-01-01T00:00:00 +2023-01-02T00:00:00 +2023-01-03T00:00:00 + +# Timestamp range with timezone +query P +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00+00:00', TIMESTAMP '2023-01-03T00:00:00+00:00', INTERVAL '1' DAY) +---- +2023-01-01T00:00:00 +2023-01-02T00:00:00 + +# Negative timestamp range (going backwards) +query P +SELECT * FROM range(TIMESTAMP '2023-01-03T00:00:00', TIMESTAMP '2023-01-01T00:00:00', INTERVAL '-1' DAY) +---- +2023-01-03T00:00:00 +2023-01-02T00:00:00 + +query error DataFusion error: Error during planning: Start is bigger than end, but increment is positive: Cannot generate infinite series +SELECT * FROM range(TIMESTAMP '2023-01-03T00:00:00', TIMESTAMP '2023-01-01T00:00:00', INTERVAL '1' DAY) + +query error DataFusion error: Error during planning: Start is smaller than end, but increment is negative: Cannot generate infinite series +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-02T00:00:00', INTERVAL '-1' DAY) + +query error DataFusion error: Error during planning: range function with timestamps requires exactly 3 arguments +SELECT * FROM range(TIMESTAMP '2023-01-03T00:00:00', TIMESTAMP '2023-01-01T00:00:00') + +# Single timestamp (start == end) +query P +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-01T00:00:00', INTERVAL '1' DAY) +---- + +# Timestamp range with NULL values +query P +SELECT * FROM range(NULL::TIMESTAMP, TIMESTAMP '2023-01-03T00:00:00', INTERVAL '1' DAY) +---- + +query P +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', NULL::TIMESTAMP, INTERVAL '1' DAY) +---- + +# No interval gives no rows +query P +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-03T00:00:00', NULL::INTERVAL) +---- + +# Zero-length interval gives error +query error DataFusion error: Error during planning: Step interval cannot be zero +SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-03T00:00:00', INTERVAL '0' DAY) From 30950f641b638e7266874a0cbaf92e7af475cd3e Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Wed, 25 Jun 2025 09:11:35 +0200 Subject: [PATCH 2/4] trait based --- .../functions-table/src/generate_series.rs | 374 +++++++++--------- .../test_files/table_functions.slt | 6 + 2 files changed, 196 insertions(+), 184 deletions(-) diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index 9b8e50828d0b..e96c79565117 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -17,7 +17,7 @@ use arrow::array::timezone::Tz; use arrow::array::types::TimestampNanosecondType; -use arrow::array::{Int64Array, TimestampNanosecondArray}; +use arrow::array::{ArrayRef, Int64Array, TimestampNanosecondArray}; use arrow::datatypes::{ DataType, Field, IntervalMonthDayNano, Schema, SchemaRef, TimeUnit, }; @@ -35,6 +35,135 @@ use std::fmt; use std::str::FromStr; use std::sync::Arc; +/// Empty generator that produces no rows - used when series arguments contain null values +#[derive(Debug, Clone)] +struct Empty { + name: &'static str, +} + +impl LazyBatchGenerator for Empty { + fn generate_next_batch(&mut self) -> Result> { + Ok(None) + } +} + +impl fmt::Display for Empty { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "{}: empty", self.name) + } +} + +/// Trait for values that can be generated in a series +trait SeriesValue: fmt::Debug + Clone + Send + Sync + 'static { + type StepType: fmt::Debug + Clone + Send + Sync; + type ValueType: fmt::Debug + Clone + Send + Sync; + + /// Check if we've reached the end of the series + fn should_stop(&self, end: Self, step: &Self::StepType, include_end: bool) -> bool; + + /// Advance to the next value in the series + fn advance(&mut self, step: &Self::StepType) -> Result<()>; + + /// Create an Arrow array from a vector of values + fn create_array(&self, values: Vec) -> Result; + + /// Convert self to ValueType for array creation + fn to_value_type(&self) -> Self::ValueType; + + /// Display the value for debugging + fn display_value(&self) -> String; +} + +impl SeriesValue for i64 { + type StepType = i64; + type ValueType = i64; + + fn should_stop(&self, end: Self, step: &Self::StepType, include_end: bool) -> bool { + reach_end_int64(*self, end, *step, include_end) + } + + fn advance(&mut self, step: &Self::StepType) -> Result<()> { + *self += step; + Ok(()) + } + + fn create_array(&self, values: Vec) -> Result { + Ok(Arc::new(Int64Array::from(values))) + } + + fn to_value_type(&self) -> Self::ValueType { + *self + } + + fn display_value(&self) -> String { + self.to_string() + } +} + +#[derive(Debug, Clone)] +struct TimestampValue { + value: i64, + parsed_tz: Tz, + tz_str: Option>, +} + +impl SeriesValue for TimestampValue { + type StepType = IntervalMonthDayNano; + type ValueType = i64; + + fn should_stop(&self, end: Self, step: &Self::StepType, include_end: bool) -> bool { + let step_negative = step.months < 0 || step.days < 0 || step.nanoseconds < 0; + + if include_end { + if step_negative { + self.value < end.value + } else { + self.value > end.value + } + } else if step_negative { + self.value <= end.value + } else { + self.value >= end.value + } + } + + fn advance(&mut self, step: &Self::StepType) -> Result<()> { + let Some(next_ts) = TimestampNanosecondType::add_month_day_nano( + self.value, + *step, + self.parsed_tz, + ) else { + return plan_err!( + "Failed to add interval {:?} to timestamp {}", + step, + self.value + ); + }; + self.value = next_ts; + Ok(()) + } + + fn create_array(&self, values: Vec) -> Result { + let array = TimestampNanosecondArray::from(values); + + // Use timezone from self (now we have access to tz through &self) + let array = match self.tz_str.as_ref() { + Some(tz_str) => array.with_timezone(Arc::clone(tz_str)), + None => array, + }; + + Ok(Arc::new(array)) + } + + fn to_value_type(&self) -> Self::ValueType { + self.value + } + + fn display_value(&self) -> String { + self.value.to_string() + } +} + /// Indicates the arguments used for generating a series. #[derive(Debug, Clone)] enum GenSeriesArgs { @@ -68,178 +197,51 @@ struct GenerateSeriesTable { args: GenSeriesArgs, } -/// Table state that generates a series of values from `start`(inclusive) to `end`, incrementing by step #[derive(Debug, Clone)] -enum GenerateSeriesState { - Int64 { - schema: SchemaRef, - start: i64, // Kept for display - end: i64, - step: i64, - batch_size: usize, - /// Tracks current position when generating table - current: i64, - /// Indicates whether the end value should be included in the series. - include_end: bool, - name: &'static str, - }, - Timestamp { - schema: SchemaRef, - start: i64, - end: i64, - step: IntervalMonthDayNano, - tz: Option>, - parsed_tz: Tz, - batch_size: usize, - /// Tracks current position when generating table - current: i64, - /// Indicates whether the end value should be included in the series. - include_end: bool, - name: &'static str, - }, - Empty { - batch_size: usize, - name: &'static str, - }, -} - -/// Detail to display for 'Explain' plan -impl fmt::Display for GenerateSeriesState { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - match self { - GenerateSeriesState::Int64 { - name, - start, - end, - batch_size, - .. - } - | GenerateSeriesState::Timestamp { - name, - start, - end, - batch_size, - .. - } => { - write!( - f, - "{name}: start={start}, end={end}, batch_size={batch_size}" - ) - } - GenerateSeriesState::Empty { - name, batch_size, .. - } => { - write!(f, "{name}: empty, batch_size={batch_size}") - } - } - } +struct GenericSeriesState { + schema: SchemaRef, + start: T, + end: T, + step: T::StepType, + batch_size: usize, + current: T, + include_end: bool, + name: &'static str, } -impl LazyBatchGenerator for GenerateSeriesState { +impl LazyBatchGenerator for GenericSeriesState { fn generate_next_batch(&mut self) -> Result> { - match self { - GenerateSeriesState::Int64 { - schema, - end, - step, - batch_size, - current, - include_end, - .. - } => { - let mut buf = Vec::with_capacity(*batch_size); - let end_val = *end; - let step_val = *step; - let include_end_val = *include_end; - - while buf.len() < *batch_size - && !reach_end_int64(*current, end_val, step_val, include_end_val) - { - buf.push(*current); - *current += step_val; - } - let array = Int64Array::from(buf); - - if array.is_empty() { - return Ok(None); - } - - let batch = - RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(array)])?; - Ok(Some(batch)) - } - GenerateSeriesState::Timestamp { - schema, - end, - step, - tz, - parsed_tz, - batch_size, - current, - include_end, - start: _, - name: _, - } => { - let mut buf = Vec::with_capacity(*batch_size); - let step_val = *step; - let include_end_val = *include_end; - let step_negative = - step_val.months < 0 || step_val.days < 0 || step_val.nanoseconds < 0; - - while buf.len() < *batch_size { - let should_stop = if include_end_val { - if step_negative { - current < end - } else { - current > end - } - } else if step_negative { - current <= end - } else { - current >= end - }; - - if should_stop { - break; - } - - // Store current value before advancing - let current_value = *current; - - // Add interval using proper calendar arithmetic for next iteration - let Some(next_ts) = TimestampNanosecondType::add_month_day_nano( - *current, step_val, *parsed_tz, - ) else { - return plan_err!( - "Failed to add interval {:?} to timestamp {}", - step_val, - current_value - ); - }; - - *current = next_ts; - - // Push the current value after successfully advancing - buf.push(current_value); - } + let mut buf = Vec::with_capacity(self.batch_size); + + while buf.len() < self.batch_size + && !self + .current + .should_stop(self.end.clone(), &self.step, self.include_end) + { + buf.push(self.current.to_value_type()); + self.current.advance(&self.step)?; + } - let array = TimestampNanosecondArray::from(buf); - // Create array with proper timezone - let array = match tz { - Some(tz_str) => array.with_timezone(Arc::clone(tz_str)), - None => array, - }; + if buf.is_empty() { + return Ok(None); + } - if array.is_empty() { - return Ok(None); - } + let array = self.current.create_array(buf)?; + let batch = RecordBatch::try_new(Arc::clone(&self.schema), vec![array])?; + Ok(Some(batch)) + } +} - let batch = - RecordBatch::try_new(Arc::clone(schema), vec![Arc::new(array)])?; - Ok(Some(batch)) - } - GenerateSeriesState::Empty { .. } => Ok(None), - } +impl fmt::Display for GenericSeriesState { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!( + f, + "{}: start={}, end={}, batch_size={}", + self.name, + self.start.display_value(), + self.end.display_value(), + self.batch_size + ) } } @@ -283,18 +285,15 @@ impl TableProvider for GenerateSeriesTable { Some(projection) => Arc::new(self.schema.project(projection)?), None => self.schema(), }; - let series_state = match &self.args { - // if args have null, then return 0 row - GenSeriesArgs::ContainsNull { name } => { - GenerateSeriesState::Empty { batch_size, name } - } + let generator: Arc> = match &self.args { + GenSeriesArgs::ContainsNull { name } => Arc::new(RwLock::new(Empty { name })), GenSeriesArgs::Int64Args { start, end, step, include_end, name, - } => GenerateSeriesState::Int64 { + } => Arc::new(RwLock::new(GenericSeriesState { schema: self.schema(), start: *start, end: *end, @@ -303,7 +302,7 @@ impl TableProvider for GenerateSeriesTable { batch_size, include_end: *include_end, name, - }, + })), GenSeriesArgs::TimestampArgs { start, end, @@ -322,25 +321,32 @@ impl TableProvider for GenerateSeriesTable { )) })? .unwrap_or_else(|| Tz::from_str("+00:00").unwrap()); - GenerateSeriesState::Timestamp { + Arc::new(RwLock::new(GenericSeriesState { schema: self.schema(), - start: *start, - end: *end, + start: TimestampValue { + value: *start, + parsed_tz, + tz_str: tz.clone(), + }, + end: TimestampValue { + value: *end, + parsed_tz, + tz_str: tz.clone(), + }, step: *step, - tz: tz.clone(), - parsed_tz, - current: *start, + current: TimestampValue { + value: *start, + parsed_tz, + tz_str: tz.clone(), + }, batch_size, include_end: *include_end, name, - } + })) } }; - Ok(Arc::new(LazyMemoryExec::try_new( - schema, - vec![Arc::new(RwLock::new(series_state))], - )?)) + Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?)) } } diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt index b8a3d5cc9aa8..73b61e665b5c 100644 --- a/datafusion/sqllogictest/test_files/table_functions.slt +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -394,3 +394,9 @@ SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-03T00:00 # Zero-length interval gives error query error DataFusion error: Error during planning: Step interval cannot be zero SELECT * FROM range(TIMESTAMP '2023-01-01T00:00:00', TIMESTAMP '2023-01-03T00:00:00', INTERVAL '0' DAY) + +# Timezone-aware +query P +SELECT * FROM range(TIMESTAMPTZ '2023-02-01T00:00:00-07:00', TIMESTAMPTZ '2023-02-01T09:00:00+01:00', INTERVAL '1' HOUR); +---- +2023-02-01T07:00:00Z From b23a3aaa3aba2ad530ad3ad8d39a6f4241bed09d Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Wed, 25 Jun 2025 10:50:14 +0200 Subject: [PATCH 3/4] also support date --- .../functions-table/src/generate_series.rs | 195 +++++++++++++++--- .../test_files/table_functions.slt | 74 ++++++- 2 files changed, 241 insertions(+), 28 deletions(-) diff --git a/datafusion/functions-table/src/generate_series.rs b/datafusion/functions-table/src/generate_series.rs index e96c79565117..ecd8870124ab 100644 --- a/datafusion/functions-table/src/generate_series.rs +++ b/datafusion/functions-table/src/generate_series.rs @@ -103,7 +103,7 @@ impl SeriesValue for i64 { #[derive(Debug, Clone)] struct TimestampValue { value: i64, - parsed_tz: Tz, + parsed_tz: Option, tz_str: Option>, } @@ -128,11 +128,12 @@ impl SeriesValue for TimestampValue { } fn advance(&mut self, step: &Self::StepType) -> Result<()> { - let Some(next_ts) = TimestampNanosecondType::add_month_day_nano( - self.value, - *step, - self.parsed_tz, - ) else { + let tz = self + .parsed_tz + .unwrap_or_else(|| Tz::from_str("+00:00").unwrap()); + let Some(next_ts) = + TimestampNanosecondType::add_month_day_nano(self.value, *step, tz) + else { return plan_err!( "Failed to add interval {:?} to timestamp {}", step, @@ -188,6 +189,16 @@ enum GenSeriesArgs { include_end: bool, name: &'static str, }, + /// DateArgs holds the start, end, and step values for generating date series when all arguments are not null. + /// Internally, dates are converted to timestamps and use the timestamp logic. + DateArgs { + start: i64, + end: i64, + step: IntervalMonthDayNano, + /// Indicates whether the end value should be included in the series. + include_end: bool, + name: &'static str, + }, } /// Table that generates a series of integers/timestamps from `start`(inclusive) to `end`, incrementing by step @@ -259,6 +270,29 @@ fn reach_end_int64(val: i64, end: i64, step: i64, include_end: bool) -> bool { } } +fn validate_interval_step( + step: IntervalMonthDayNano, + start: i64, + end: i64, +) -> Result<()> { + if step.months == 0 && step.days == 0 && step.nanoseconds == 0 { + return plan_err!("Step interval cannot be zero"); + } + + let step_is_positive = step.months > 0 || step.days > 0 || step.nanoseconds > 0; + let step_is_negative = step.months < 0 || step.days < 0 || step.nanoseconds < 0; + + if start > end && step_is_positive { + return plan_err!("Start is bigger than end, but increment is positive: Cannot generate infinite series"); + } + + if start < end && step_is_negative { + return plan_err!("Start is smaller than end, but increment is negative: Cannot generate infinite series"); + } + + Ok(()) +} + #[async_trait] impl TableProvider for GenerateSeriesTable { fn as_any(&self) -> &dyn std::any::Any { @@ -325,18 +359,18 @@ impl TableProvider for GenerateSeriesTable { schema: self.schema(), start: TimestampValue { value: *start, - parsed_tz, + parsed_tz: Some(parsed_tz), tz_str: tz.clone(), }, end: TimestampValue { value: *end, - parsed_tz, + parsed_tz: Some(parsed_tz), tz_str: tz.clone(), }, step: *step, current: TimestampValue { value: *start, - parsed_tz, + parsed_tz: Some(parsed_tz), tz_str: tz.clone(), }, batch_size, @@ -344,6 +378,34 @@ impl TableProvider for GenerateSeriesTable { name, })) } + GenSeriesArgs::DateArgs { + start, + end, + step, + include_end, + name, + } => Arc::new(RwLock::new(GenericSeriesState { + schema: self.schema(), + start: TimestampValue { + value: *start, + parsed_tz: None, + tz_str: None, + }, + end: TimestampValue { + value: *end, + parsed_tz: None, + tz_str: None, + }, + step: *step, + current: TimestampValue { + value: *start, + parsed_tz: None, + tz_str: None, + }, + batch_size, + include_end: *include_end, + name, + })), }; Ok(Arc::new(LazyMemoryExec::try_new(schema, vec![generator])?)) @@ -372,9 +434,12 @@ impl TableFunctionImpl for GenerateSeriesFuncImpl { Expr::Literal(s, _) if matches!(s.data_type(), DataType::Timestamp(_, _)) => { self.call_timestamp(exprs) } + Expr::Literal(s, _) if matches!(s.data_type(), DataType::Date32) => { + self.call_date(exprs) + } Expr::Literal(scalar, _) => { plan_err!( - "Argument #1 must be an INTEGER, TIMESTAMP or NULL, got {:?}", + "Argument #1 must be an INTEGER, TIMESTAMP, DATE or NULL, got {:?}", scalar.data_type() ) } @@ -507,22 +572,8 @@ impl GenerateSeriesFuncImpl { })); }; - // Basic validation - if step.months == 0 && step.days == 0 && step.nanoseconds == 0 { - return plan_err!("Step interval cannot be zero"); - } - - // Check for infinite series conditions with timestamps - let step_is_positive = step.months > 0 || step.days > 0 || step.nanoseconds > 0; - let step_is_negative = step.months < 0 || step.days < 0 || step.nanoseconds < 0; - - if start > end && step_is_positive { - return plan_err!("Start is bigger than end, but increment is positive: Cannot generate infinite series"); - } - - if start < end && step_is_negative { - return plan_err!("Start is smaller than end, but increment is negative: Cannot generate infinite series"); - } + // Validate step interval + validate_interval_step(step, start, end)?; Ok(Arc::new(GenerateSeriesTable { schema, @@ -536,6 +587,98 @@ impl GenerateSeriesFuncImpl { }, })) } + + fn call_date(&self, exprs: &[Expr]) -> Result> { + if exprs.len() != 3 { + return plan_err!( + "{} function with dates requires exactly 3 arguments", + self.name + ); + } + + let schema = Arc::new(Schema::new(vec![Field::new( + "value", + DataType::Timestamp(TimeUnit::Nanosecond, None), + false, + )])); + + // Parse start date + let start_date = match &exprs[0] { + Expr::Literal(ScalarValue::Date32(Some(date)), _) => *date, + Expr::Literal(ScalarValue::Date32(None), _) + | Expr::Literal(ScalarValue::Null, _) => { + return Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::ContainsNull { name: self.name }, + })); + } + other => { + return plan_err!( + "First argument must be a date or NULL, got {:?}", + other + ) + } + }; + + // Parse end date + let end_date = match &exprs[1] { + Expr::Literal(ScalarValue::Date32(Some(date)), _) => *date, + Expr::Literal(ScalarValue::Date32(None), _) + | Expr::Literal(ScalarValue::Null, _) => { + return Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::ContainsNull { name: self.name }, + })); + } + other => { + return plan_err!( + "Second argument must be a date or NULL, got {:?}", + other + ) + } + }; + + // Parse step interval + let step_interval = match &exprs[2] { + Expr::Literal(ScalarValue::IntervalMonthDayNano(Some(interval)), _) => { + *interval + } + Expr::Literal(ScalarValue::IntervalMonthDayNano(None), _) + | Expr::Literal(ScalarValue::Null, _) => { + return Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::ContainsNull { name: self.name }, + })); + } + other => { + return plan_err!( + "Third argument must be an interval or NULL, got {:?}", + other + ) + } + }; + + // Convert Date32 (days since epoch) to timestamp nanoseconds (nanoseconds since epoch) + // Date32 is days since 1970-01-01, so multiply by nanoseconds per day + const NANOS_PER_DAY: i64 = 24 * 60 * 60 * 1_000_000_000; + + let start_ts = start_date as i64 * NANOS_PER_DAY; + let end_ts = end_date as i64 * NANOS_PER_DAY; + + // Validate step interval + validate_interval_step(step_interval, start_ts, end_ts)?; + + Ok(Arc::new(GenerateSeriesTable { + schema, + args: GenSeriesArgs::DateArgs { + start: start_ts, + end: end_ts, + step: step_interval, + include_end: self.include_end, + name: self.name, + }, + })) + } } #[derive(Debug)] diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt index 73b61e665b5c..f39fba223b0e 100644 --- a/datafusion/sqllogictest/test_files/table_functions.slt +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -177,7 +177,7 @@ statement error DataFusion error: Error during planning: generate_series functio SELECT * FROM generate_series(1, 2, 3, 4) -statement error DataFusion error: Error during planning: Argument \#1 must be an INTEGER, TIMESTAMP or NULL, got Utf8 +statement error DataFusion error: Error during planning: Argument \#1 must be an INTEGER, TIMESTAMP, DATE or NULL, got Utf8 SELECT * FROM generate_series('foo', 'bar') # UDF and UDTF `generate_series` can be used simultaneously @@ -300,7 +300,7 @@ statement error DataFusion error: Error during planning: range function requires SELECT * FROM range(1, 2, 3, 4) -statement error DataFusion error: Error during planning: Argument \#1 must be an INTEGER, TIMESTAMP or NULL, got Utf8 +statement error DataFusion error: Error during planning: Argument \#1 must be an INTEGER, TIMESTAMP, DATE or NULL, got Utf8 SELECT * FROM range('foo', 'bar') statement error DataFusion error: Error during planning: Argument #2 must be an INTEGER or NULL, got Literal\(Utf8\("bar"\), None\) @@ -400,3 +400,73 @@ query P SELECT * FROM range(TIMESTAMPTZ '2023-02-01T00:00:00-07:00', TIMESTAMPTZ '2023-02-01T09:00:00+01:00', INTERVAL '1' HOUR); ---- 2023-02-01T07:00:00Z + +# Basic date range with hour interval +query P +SELECT * FROM range(DATE '1992-01-01', DATE '1992-01-03', INTERVAL '6' HOUR); +---- +1992-01-01T00:00:00 +1992-01-01T06:00:00 +1992-01-01T12:00:00 +1992-01-01T18:00:00 +1992-01-02T00:00:00 +1992-01-02T06:00:00 +1992-01-02T12:00:00 +1992-01-02T18:00:00 + +# Date range with day interval +query P +SELECT * FROM range(DATE '1992-09-01', DATE '1992-09-05', INTERVAL '1' DAY); +---- +1992-09-01T00:00:00 +1992-09-02T00:00:00 +1992-09-03T00:00:00 +1992-09-04T00:00:00 + +# Date range with month interval +query P +SELECT * FROM range(DATE '1992-09-01', DATE '1993-01-01', INTERVAL '1' MONTH); +---- +1992-09-01T00:00:00 +1992-10-01T00:00:00 +1992-11-01T00:00:00 +1992-12-01T00:00:00 + +# Date range generate_series includes end +query P +SELECT * FROM generate_series(DATE '1992-09-01', DATE '1992-09-03', INTERVAL '1' DAY); +---- +1992-09-01T00:00:00 +1992-09-02T00:00:00 +1992-09-03T00:00:00 + +# Backwards date range +query P +SELECT * FROM range(DATE '1992-09-05', DATE '1992-09-01', INTERVAL '-1' DAY); +---- +1992-09-05T00:00:00 +1992-09-04T00:00:00 +1992-09-03T00:00:00 +1992-09-02T00:00:00 + +# NULL handling for dates +query P +SELECT * FROM range(DATE '1992-09-01', NULL::DATE, INTERVAL '1' MONTH) +---- + +query P +SELECT * FROM range(NULL::DATE, DATE '1992-09-01', INTERVAL '1' MONTH) +---- + +query P +SELECT * FROM range(DATE '1992-09-01', DATE '1992-10-01', NULL::INTERVAL) +---- + +query error DataFusion error: Error during planning: Start is bigger than end, but increment is positive: Cannot generate infinite series +SELECT * FROM range(DATE '2023-01-03', DATE '2023-01-01', INTERVAL '1' DAY) + +query error DataFusion error: Error during planning: Start is smaller than end, but increment is negative: Cannot generate infinite series +SELECT * FROM range(DATE '2023-01-01', DATE '2023-01-02', INTERVAL '-1' DAY) + +query error DataFusion error: Error during planning: range function with dates requires exactly 3 arguments +SELECT * FROM range(DATE '2023-01-01', DATE '2023-01-03') From 3bfb4538f14cc2ed81198eb1256bd21f96694453 Mon Sep 17 00:00:00 2001 From: Simon Vandel Sillesen Date: Wed, 25 Jun 2025 12:11:47 +0200 Subject: [PATCH 4/4] explain --- .../sqllogictest/test_files/table_functions.slt | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/datafusion/sqllogictest/test_files/table_functions.slt b/datafusion/sqllogictest/test_files/table_functions.slt index f39fba223b0e..e53306958c4a 100644 --- a/datafusion/sqllogictest/test_files/table_functions.slt +++ b/datafusion/sqllogictest/test_files/table_functions.slt @@ -440,6 +440,12 @@ SELECT * FROM generate_series(DATE '1992-09-01', DATE '1992-09-03', INTERVAL '1' 1992-09-02T00:00:00 1992-09-03T00:00:00 +query TT +EXPLAIN SELECT * FROM generate_series(DATE '1992-09-01', DATE '1992-09-03', INTERVAL '1' DAY); +---- +logical_plan TableScan: generate_series() projection=[value] +physical_plan LazyMemoryExec: partitions=1, batch_generators=[generate_series: start=715305600000000000, end=715478400000000000, batch_size=8192] + # Backwards date range query P SELECT * FROM range(DATE '1992-09-05', DATE '1992-09-01', INTERVAL '-1' DAY); @@ -454,6 +460,12 @@ query P SELECT * FROM range(DATE '1992-09-01', NULL::DATE, INTERVAL '1' MONTH) ---- +query TT +EXPLAIN SELECT * FROM range(DATE '1992-09-01', NULL::DATE, INTERVAL '1' MONTH) +---- +logical_plan TableScan: range() projection=[value] +physical_plan LazyMemoryExec: partitions=1, batch_generators=[range: empty] + query P SELECT * FROM range(NULL::DATE, DATE '1992-09-01', INTERVAL '1' MONTH) ----