Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions datafusion/substrait/src/logical_plan/producer/expr/literal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ pub(crate) fn to_substrait_literal(
}),
DEFAULT_TYPE_VARIATION_REF,
),
// TODO: DataDog-specific workaround, don't commit upstream
ScalarValue::Dictionary(_, value) => {
return to_substrait_literal(producer, value)
}
_ => (
not_impl_err!("Unsupported literal: {value:?}")?,
DEFAULT_TYPE_VARIATION_REF,
Expand Down Expand Up @@ -386,17 +390,18 @@ mod tests {
round_trip_literal(ScalarValue::UInt64(Some(u64::MIN)))?;
round_trip_literal(ScalarValue::UInt64(Some(u64::MAX)))?;

for (ts, tz) in [
(Some(12345), None),
(None, None),
(Some(12345), Some("UTC".into())),
(None, Some("UTC".into())),
] {
round_trip_literal(ScalarValue::TimestampSecond(ts, tz.clone()))?;
round_trip_literal(ScalarValue::TimestampMillisecond(ts, tz.clone()))?;
round_trip_literal(ScalarValue::TimestampMicrosecond(ts, tz.clone()))?;
round_trip_literal(ScalarValue::TimestampNanosecond(ts, tz))?;
}
// TODO: DataDog-specific workaround, don't commit upstream
// for (ts, tz) in [
// (Some(12345), None),
// (None, None),
// (Some(12345), Some("UTC".into())),
// (None, Some("UTC".into())),
// ] {
// round_trip_literal(ScalarValue::TimestampSecond(ts, tz.clone()))?;
// round_trip_literal(ScalarValue::TimestampMillisecond(ts, tz.clone()))?;
// round_trip_literal(ScalarValue::TimestampMicrosecond(ts, tz.clone()))?;
// round_trip_literal(ScalarValue::TimestampNanosecond(ts, tz))?;
// }

round_trip_literal(ScalarValue::List(ScalarValue::new_list_nullable(
&[ScalarValue::Float32(Some(1.0))],
Expand Down
52 changes: 21 additions & 31 deletions datafusion/substrait/src/logical_plan/producer/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,16 @@
// under the License.

use crate::logical_plan::producer::utils::flatten_names;
#[allow(deprecated)]
use crate::variation_const::TIMESTAMP_NANO_TYPE_VARIATION_REF;
use crate::variation_const::{
DATE_32_TYPE_VARIATION_REF, DATE_64_TYPE_VARIATION_REF,
DECIMAL_128_TYPE_VARIATION_REF, DECIMAL_256_TYPE_VARIATION_REF,
DEFAULT_CONTAINER_TYPE_VARIATION_REF, DEFAULT_TYPE_VARIATION_REF,
LARGE_CONTAINER_TYPE_VARIATION_REF, UNSIGNED_INTEGER_TYPE_VARIATION_REF,
VIEW_CONTAINER_TYPE_VARIATION_REF,
};
use datafusion::arrow::datatypes::{DataType, IntervalUnit, TimeUnit};
use datafusion::arrow::datatypes::{DataType, IntervalUnit};
use datafusion::common::{internal_err, not_impl_err, plan_err, DFSchemaRef};
use substrait::proto::{r#type, NamedStruct};

Expand Down Expand Up @@ -105,31 +107,16 @@ pub(crate) fn to_substrait_type(
nullability,
})),
}),
DataType::Timestamp(unit, tz) => {
let precision = match unit {
TimeUnit::Second => 0,
TimeUnit::Millisecond => 3,
TimeUnit::Microsecond => 6,
TimeUnit::Nanosecond => 9,
};
let kind = match tz {
None => r#type::Kind::PrecisionTimestamp(r#type::PrecisionTimestamp {
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
DataType::Timestamp(_unit, _) => {
// TODO: DataDog-specific workaround, don't commit upstream
#[allow(deprecated)]
let type_variation_reference = TIMESTAMP_NANO_TYPE_VARIATION_REF;
Ok(substrait::proto::Type {
kind: Some(r#type::Kind::Timestamp(r#type::Timestamp {
type_variation_reference,
nullability,
precision,
}),
Some(_) => {
// If timezone is present, no matter what the actual tz value is, it indicates the
// value of the timestamp is tied to UTC epoch. That's all that Substrait cares about.
// As the timezone is lost, this conversion may be lossy for downstream use of the value.
r#type::Kind::PrecisionTimestampTz(r#type::PrecisionTimestampTz {
type_variation_reference: DEFAULT_TYPE_VARIATION_REF,
nullability,
precision,
})
}
};
Ok(substrait::proto::Type { kind: Some(kind) })
})),
})
}
DataType::Date32 => Ok(substrait::proto::Type {
kind: Some(r#type::Kind::Date(r#type::Date {
Expand Down Expand Up @@ -284,6 +271,8 @@ pub(crate) fn to_substrait_type(
precision: *p as i32,
})),
}),
// TODO: DataDog-specific workaround, don't commit upstream
DataType::Dictionary(_, dt) => to_substrait_type(dt, nullable),
_ => not_impl_err!("Unsupported cast type: {dt:?}"),
}
}
Expand Down Expand Up @@ -337,12 +326,13 @@ mod tests {
round_trip_type(DataType::Float32)?;
round_trip_type(DataType::Float64)?;

for tz in [None, Some("UTC".into())] {
round_trip_type(DataType::Timestamp(TimeUnit::Second, tz.clone()))?;
round_trip_type(DataType::Timestamp(TimeUnit::Millisecond, tz.clone()))?;
round_trip_type(DataType::Timestamp(TimeUnit::Microsecond, tz.clone()))?;
round_trip_type(DataType::Timestamp(TimeUnit::Nanosecond, tz))?;
}
// TODO: DataDog-specific workaround, don't commit upstream
// for tz in [None, Some("UTC".into())] {
// round_trip_type(DataType::Timestamp(TimeUnit::Second, tz.clone()))?;
// round_trip_type(DataType::Timestamp(TimeUnit::Millisecond, tz.clone()))?;
// round_trip_type(DataType::Timestamp(TimeUnit::Microsecond, tz.clone()))?;
// round_trip_type(DataType::Timestamp(TimeUnit::Nanosecond, tz))?;
// }

round_trip_type(DataType::Date32)?;
round_trip_type(DataType::Date64)?;
Expand Down
2 changes: 2 additions & 0 deletions datafusion/substrait/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1039,6 +1039,7 @@ async fn qualified_catalog_schema_table_reference() -> Result<()> {
/// - List, this nested type is not supported in arrow_cast
/// - Decimal128 and Decimal256, them will fallback to UTF8 cast expr rather than plain literal.
#[tokio::test]
#[ignore] // TODO: DataDog-specific workaround, don't commit upstream
async fn all_type_literal() -> Result<()> {
roundtrip_all_types(
"select * from data where
Expand Down Expand Up @@ -1219,6 +1220,7 @@ async fn duplicate_column() -> Result<()> {

/// Construct a plan that cast columns. Only those SQL types are supported for now.
#[tokio::test]
#[ignore] // TODO: DataDog-specific workaround, don't commit upstream
async fn new_test_grammar() -> Result<()> {
roundtrip_all_types(
"select
Expand Down
Loading