From 422a98639612b4a3cd817e1478c69d4d2d98b76f Mon Sep 17 00:00:00 2001 From: tycho garen Date: Wed, 24 Jul 2024 17:43:52 -0400 Subject: [PATCH 1/2] chore: cleanup some timestamp handling, mostly in bson --- crates/datasources/src/bson/builder.rs | 56 +++++++++++++++++++++++- crates/datasources/src/bson/errors.rs | 6 +++ crates/datasources/src/common/util.rs | 20 ++++++++- crates/datasources/src/sqlite/convert.rs | 17 ++++--- 4 files changed, 92 insertions(+), 7 deletions(-) diff --git a/crates/datasources/src/bson/builder.rs b/crates/datasources/src/bson/builder.rs index 334eec28b..cd65fba4b 100644 --- a/crates/datasources/src/bson/builder.rs +++ b/crates/datasources/src/bson/builder.rs @@ -29,6 +29,7 @@ use datafusion::arrow::array::{ use datafusion::arrow::datatypes::{DataType, Field, Fields, TimeUnit}; use crate::bson::errors::{BsonError, Result}; +use crate::common::util::try_parse_datetime; /// Similar to arrow's `StructBuilder`, but specific for "shredding" bson /// records. @@ -288,6 +289,41 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) -> (RawBsonRef::String(v), DataType::Float64) => { append_scalar!(Float64Builder, col, v.parse().unwrap_or_default()) } + (RawBsonRef::String(v), DataType::Date64) => { + append_scalar!( + Date64Builder, + col, + try_parse_datetime(v)?.and_utc().timestamp_millis() + ) + } + (RawBsonRef::String(v), DataType::Date32) => { + append_scalar!( + Date32Builder, + col, + try_parse_datetime(v)?.and_utc().timestamp() as i32 + ) + } + (RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Millisecond, _)) => { + append_scalar!( + TimestampMillisecondBuilder, + col, + try_parse_datetime(v)?.and_utc().timestamp_millis() + ) + } + (RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Microsecond, _)) => { + append_scalar!( + TimestampMicrosecondBuilder, + col, + try_parse_datetime(v)?.and_utc().timestamp_micros() + ) + } + (RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Second, _)) => { + 
append_scalar!( + TimestampSecondBuilder, + col, + try_parse_datetime(v)?.and_utc().timestamp() + ) + } // Binary (RawBsonRef::Binary(v), DataType::Binary) => append_scalar!(BinaryBuilder, col, v.bytes), @@ -295,7 +331,7 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) -> append_scalar!(LargeBinaryBuilder, col, v.bytes) } - // Object id + // ObjectId (RawBsonRef::ObjectId(v), DataType::Binary) => { append_scalar!(BinaryBuilder, col, v.bytes()) } @@ -328,6 +364,15 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) -> ))? ) } + (RawBsonRef::Timestamp(v), DataType::Utf8) => { + append_scalar!( + StringBuilder, + col, + chrono::DateTime::from_timestamp_millis(v.time as i64 * 1000) + .ok_or_else(|| BsonError::InvalidValue(v.to_string()))? + .to_rfc3339() + ) + } // Datetime (actual timestamps that you'd actually use in an application) (RawBsonRef::DateTime(v), DataType::Timestamp(TimeUnit::Second, _)) => { @@ -356,6 +401,15 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) -> (RawBsonRef::DateTime(v), DataType::Date32) => { append_scalar!(Date32Builder, col, (v.timestamp_millis() / 1000) as i32) } + (RawBsonRef::DateTime(v), DataType::Utf8) => { + append_scalar!( + StringBuilder, + col, + chrono::DateTime::from_timestamp_millis(v.timestamp_millis()) + .ok_or_else(|| BsonError::InvalidValue(v.to_string()))? 
+ .to_rfc3339() + ) + } // Document (RawBsonRef::Document(nested), DataType::Struct(_)) => { diff --git a/crates/datasources/src/bson/errors.rs b/crates/datasources/src/bson/errors.rs index 33338bd56..98d2ae937 100644 --- a/crates/datasources/src/bson/errors.rs +++ b/crates/datasources/src/bson/errors.rs @@ -52,6 +52,12 @@ pub enum BsonError { #[error("no objects found {0}")] NotFound(String), + + #[error(transparent)] + DateTimeParse(#[from] chrono::ParseError), + + #[error("invalid value: {0}")] + InvalidValue(String), } impl From<BsonError> for DataFusionError { diff --git a/crates/datasources/src/common/util.rs b/crates/datasources/src/common/util.rs index 48f90c361..53ca6d96f 100644 --- a/crates/datasources/src/common/util.rs +++ b/crates/datasources/src/common/util.rs @@ -1,7 +1,7 @@ use std::fmt::Write; use std::sync::Arc; -use chrono::{Duration, TimeZone, Utc}; +use chrono::{Duration, NaiveTime, TimeZone, Utc}; use datafusion::arrow::array::{Array, ArrayRef, UInt64Array}; use datafusion::arrow::compute::{cast_with_options, CastOptions}; use datafusion::arrow::datatypes::{DataType, Field, Schema, TimeUnit}; @@ -205,6 +205,24 @@ pub fn create_count_record_batch(count: u64) -> RecordBatch { .unwrap() } +pub fn try_parse_datetime(v: &str) -> Result<chrono::DateTime<Utc>, chrono::ParseError> { + chrono::DateTime::parse_from_rfc3339(v) + .or_else(|_| chrono::DateTime::parse_from_rfc2822(v)) + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S%.f") + .map(|dt| dt.and_utc().fixed_offset()) + }) + .or_else(|_| { + chrono::NaiveDateTime::parse_from_str(v, "%Y-%m-%d %H:%M:%S") + .map(|dt| dt.and_utc().fixed_offset()) + }) + .or_else(|_| { + chrono::NaiveDate::parse_from_str(v, "%Y-%m-%d") + .map(|dt| dt.and_time(NaiveTime::default()).and_utc().fixed_offset()) + }) + .map(|dt| dt.to_utc()) +} + #[cfg(test)] mod tests { diff --git a/crates/datasources/src/sqlite/convert.rs b/crates/datasources/src/sqlite/convert.rs index 5167599bc..54512881b 100644 --- 
a/crates/datasources/src/sqlite/convert.rs +++ b/crates/datasources/src/sqlite/convert.rs @@ -314,10 +314,11 @@ impl Converter { builder.append_null(); } ValueRef::Text(t) => { - // TODO: Support other str formats let t = std::str::from_utf8(t).unwrap(); - let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + // note: use this parsing order rather + // than util::try_parse_datetime, + // given the likely values of sqlite dates let date = NaiveDate::parse_from_str(t, "%Y-%m-%d") .or_else(|_| { NaiveDateTime::parse_from_str(t, "%Y-%m-%d %H:%M:%S%.f") @@ -339,6 +340,9 @@ impl Converter { to: DataType::Date32, cause: Some(e.to_string()), })?; + + let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap(); + let num_days_since_epoch = date.signed_duration_since(epoch).num_days(); let i = i32::try_from(num_days_since_epoch).map_err(|e| { @@ -436,9 +440,11 @@ impl Converter { builder.append_null(); } ValueRef::Text(t) => { - // TODO: Support other str formats let t = std::str::from_utf8(t).unwrap(); - let epoch = NaiveTime::from_hms_opt(0, 0, 0).unwrap(); + + // note: use this parsing order rather + // than util::try_parse_datetime, + // given the likely values of sqlite dates let time = NaiveTime::parse_from_str(t, "%H:%M:%S%.f") .or_else(|_| NaiveTime::parse_from_str(t, "%H:%M:%S%")) .or_else(|_| { @@ -466,7 +472,8 @@ impl Converter { cause: Some(e.to_string()), })?; - let duration_since_midnight = time.signed_duration_since(epoch); + let duration_since_midnight = + time.signed_duration_since(NaiveTime::default()); let microseconds_since_midnight = duration_since_midnight.num_microseconds().unwrap(); builder.append_value(microseconds_since_midnight); From 7a2faee542df2f9683a9a08af7fc45e1ba0fa8ce Mon Sep 17 00:00:00 2001 From: tycho garen Date: Wed, 24 Jul 2024 23:11:05 -0400 Subject: [PATCH 2/2] fix compile --- crates/datasources/src/bson/builder.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/crates/datasources/src/bson/builder.rs 
b/crates/datasources/src/bson/builder.rs index cd65fba4b..28dab18ce 100644 --- a/crates/datasources/src/bson/builder.rs +++ b/crates/datasources/src/bson/builder.rs @@ -293,35 +293,35 @@ fn append_value(val: RawBsonRef, typ: &DataType, col: &mut dyn ArrayBuilder) -> append_scalar!( Date64Builder, col, - try_parse_datetime(v)?.and_utc().timestamp_millis() + try_parse_datetime(v)?.timestamp_millis() ) } (RawBsonRef::String(v), DataType::Date32) => { append_scalar!( Date32Builder, col, - try_parse_datetime(v)?.and_utc().timestamp() as i32 + try_parse_datetime(v)?.timestamp() as i32 ) } (RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Millisecond, _)) => { append_scalar!( TimestampMillisecondBuilder, col, - try_parse_datetime(v)?.and_utc().timestamp_millis() + try_parse_datetime(v)?.timestamp_millis() ) } (RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Microsecond, _)) => { append_scalar!( TimestampMicrosecondBuilder, col, - try_parse_datetime(v)?.and_utc().timestamp_micros() + try_parse_datetime(v)?.timestamp_micros() ) } (RawBsonRef::String(v), DataType::Timestamp(TimeUnit::Second, _)) => { append_scalar!( TimestampSecondBuilder, col, - try_parse_datetime(v)?.and_utc().timestamp() + try_parse_datetime(v)?.timestamp() ) }