Skip to content

Commit 705fae5

Browse files
committed
feat(cubestore): implement date_bin()
1 parent c0466fd commit 705fae5

File tree

2 files changed

+166
-1
lines changed

2 files changed

+166
-1
lines changed

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ impl ContextProvider for MetaStoreSchemaProvider {
409409
"unix_timestamp" | "UNIX_TIMESTAMP" => CubeScalarUDFKind::UnixTimestamp,
410410
"date_add" | "DATE_ADD" => CubeScalarUDFKind::DateAdd,
411411
"date_sub" | "DATE_SUB" => CubeScalarUDFKind::DateSub,
412+
"date_bin" | "DATE_BIN" => CubeScalarUDFKind::DateBin,
412413
_ => return None,
413414
};
414415
return Some(Arc::new(scalar_udf_by_kind(kind).descriptor()));

rust/cubestore/cubestore/src/queryplanner/udfs.rs

Lines changed: 165 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use crate::queryplanner::coalesce::{coalesce, SUPPORTED_COALESCE_TYPES};
22
use crate::queryplanner::hll::{Hll, HllUnion};
33
use crate::CubeError;
44
use chrono::{TimeZone, Utc};
5-
use datafusion::arrow::array::{Array, BinaryArray, TimestampNanosecondArray, UInt64Builder};
5+
use datafusion::arrow::array::{
6+
Array, ArrayRef, BinaryArray, TimestampNanosecondArray, UInt64Builder,
7+
};
68
use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
79
use datafusion::cube_ext::datetime::{date_addsub_array, date_addsub_scalar};
810
use datafusion::error::DataFusionError;
@@ -16,6 +18,12 @@ use smallvec::smallvec;
1618
use smallvec::SmallVec;
1719
use std::sync::Arc;
1820

21+
const NANOS_IN_DAY: i64 = 86_400_000_000_000;
22+
const DAYS_IN_MONTH: i64 = 30; // Approximation for months
23+
const DAYS_IN_YEAR: i64 = 365; // Approximation for years
24+
const NANOS_IN_MONTH: i64 = DAYS_IN_MONTH * NANOS_IN_DAY;
25+
const NANOS_IN_YEAR: i64 = DAYS_IN_YEAR * NANOS_IN_DAY;
26+
1927
#[derive(Copy, Clone, Debug, Serialize, Deserialize)]
2028
pub enum CubeScalarUDFKind {
2129
HllCardinality, // cardinality(), accepting the HyperLogLog sketches.
@@ -24,6 +32,7 @@ pub enum CubeScalarUDFKind {
2432
UnixTimestamp,
2533
DateAdd,
2634
DateSub,
35+
DateBin,
2736
}
2837

2938
pub trait CubeScalarUDF {
@@ -40,6 +49,7 @@ pub fn scalar_udf_by_kind(k: CubeScalarUDFKind) -> Box<dyn CubeScalarUDF> {
4049
CubeScalarUDFKind::UnixTimestamp => Box::new(UnixTimestamp {}),
4150
CubeScalarUDFKind::DateAdd => Box::new(DateAddSub { is_add: true }),
4251
CubeScalarUDFKind::DateSub => Box::new(DateAddSub { is_add: false }),
52+
CubeScalarUDFKind::DateBin => Box::new(DateBin {}),
4353
}
4454
}
4555

@@ -63,6 +73,9 @@ pub fn scalar_kind_by_name(n: &str) -> Option<CubeScalarUDFKind> {
6373
if n == "DATE_SUB" {
6474
return Some(CubeScalarUDFKind::DateSub);
6575
}
76+
if n == "DATE_BIN" {
77+
return Some(CubeScalarUDFKind::DateBin);
78+
}
6679
return None;
6780
}
6881

@@ -192,6 +205,157 @@ impl CubeScalarUDF for UnixTimestamp {
192205
}
193206
}
194207

208+
fn interval_to_nanoseconds(interval: &ScalarValue) -> Result<i64, DataFusionError> {
209+
match interval {
210+
ScalarValue::IntervalYearMonth(Some(i)) => {
211+
// since years and months do not have a fixed number of nanoseconds,
212+
// months can have different lengths, and leap years affect the length of a year,
213+
// converting an IntervalYearMonth to nanoseconds requires certain assumptions:
214+
// 1 month = 30 days
215+
// 1 year = 12 months = 365 days
216+
217+
let years = i / 12;
218+
let months = i % 12;
219+
let total_nanoseconds =
220+
(years as i64) * NANOS_IN_YEAR + (months as i64) * NANOS_IN_MONTH;
221+
222+
Ok(total_nanoseconds)
223+
}
224+
ScalarValue::IntervalDayTime(Some(i)) => {
225+
let days: i64 = i.signum() * (i.abs() >> 32);
226+
let millis: i64 = i.signum() * ((i.abs() << 32) >> 32);
227+
let total_nanoseconds = days * NANOS_IN_DAY + millis * 1_000_000;
228+
229+
Ok(total_nanoseconds)
230+
}
231+
_ => Err(DataFusionError::Execution(format!(
232+
"Unsupported interval type"
233+
))),
234+
}
235+
}
236+
237+
struct DateBin {}
238+
impl DateBin {
239+
fn signature() -> Signature {
240+
Signature::OneOf(vec![
241+
Signature::Exact(vec![
242+
DataType::Interval(IntervalUnit::YearMonth),
243+
DataType::Timestamp(TimeUnit::Nanosecond, None),
244+
DataType::Timestamp(TimeUnit::Nanosecond, None),
245+
]),
246+
Signature::Exact(vec![
247+
DataType::Interval(IntervalUnit::DayTime),
248+
DataType::Timestamp(TimeUnit::Nanosecond, None),
249+
DataType::Timestamp(TimeUnit::Nanosecond, None),
250+
]),
251+
])
252+
}
253+
}
254+
impl CubeScalarUDF for DateBin {
255+
fn kind(&self) -> CubeScalarUDFKind {
256+
CubeScalarUDFKind::DateBin
257+
}
258+
259+
fn name(&self) -> &str {
260+
"DATE_BIN"
261+
}
262+
263+
fn descriptor(&self) -> ScalarUDF {
264+
return ScalarUDF {
265+
name: self.name().to_string(),
266+
signature: Self::signature(),
267+
return_type: Arc::new(|_| {
268+
Ok(Arc::new(DataType::Timestamp(TimeUnit::Nanosecond, None)))
269+
}),
270+
fun: Arc::new(move |inputs| {
271+
assert_eq!(inputs.len(), 3);
272+
let interval = match &inputs[0] {
273+
ColumnarValue::Scalar(i) => i.clone(),
274+
_ => {
275+
// We leave this case out for simplicity.
276+
// CubeStore does not allow intervals inside tables, so this is super rare.
277+
return Err(DataFusionError::Execution(format!(
278+
"Only scalar intervals are supported in DATE_BIN"
279+
)));
280+
}
281+
};
282+
let interval_ns = interval_to_nanoseconds(&interval)?;
283+
284+
let origin_ns = match &inputs[2] {
285+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(o))) => *o,
286+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None)) => {
287+
return Err(DataFusionError::Execution(format!(
288+
"Third argument (origin) of DATE_BIN must be a non-null timestamp"
289+
)));
290+
}
291+
_ => {
292+
// Leaving out other rare cases.
293+
// The initial need for the date_bin comes from custom granularities support
294+
// and there will always be a scalar origin point
295+
return Err(DataFusionError::Execution(format!(
296+
"Only scalar origins are supported in DATE_BIN"
297+
)));
298+
}
299+
};
300+
301+
match &inputs[1] {
302+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None)) => Ok(
303+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(None)),
304+
),
305+
ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(Some(t))) => {
306+
let timestamp_ns = *t;
307+
let diff = timestamp_ns - origin_ns;
308+
let num_intervals = diff / interval_ns;
309+
let mut nearest_timestamp = origin_ns + num_intervals * interval_ns;
310+
311+
// We need the closest date before, not after
312+
if diff < 0 {
313+
nearest_timestamp -= interval_ns
314+
}
315+
316+
Ok(ColumnarValue::Scalar(ScalarValue::TimestampNanosecond(
317+
Some(nearest_timestamp),
318+
)))
319+
}
320+
ColumnarValue::Array(t) if t.as_any().is::<TimestampNanosecondArray>() => {
321+
let t = t
322+
.as_any()
323+
.downcast_ref::<TimestampNanosecondArray>()
324+
.unwrap();
325+
326+
let mut builder = TimestampNanosecondArray::builder(t.len());
327+
328+
for i in 0..t.len() {
329+
if t.is_null(i) {
330+
builder.append_null()?;
331+
} else {
332+
let ts = t.value(i);
333+
let diff = ts - origin_ns;
334+
let num_intervals = diff / interval_ns;
335+
let mut nearest_timestamp = origin_ns + num_intervals * interval_ns;
336+
337+
// We need the closest date before, not after
338+
if diff < 0 {
339+
nearest_timestamp -= interval_ns
340+
}
341+
342+
builder.append_value(nearest_timestamp)?;
343+
}
344+
}
345+
346+
Ok(ColumnarValue::Array(Arc::new(builder.finish()) as ArrayRef))
347+
}
348+
_ => {
349+
return Err(DataFusionError::Execution(format!(
350+
"Second argument of DATE_BIN must be a non-null timestamp"
351+
)));
352+
}
353+
}
354+
}),
355+
};
356+
}
357+
}
358+
195359
struct DateAddSub {
196360
is_add: bool,
197361
}

0 commit comments

Comments
 (0)