Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions native/core/src/parquet/schema_adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,14 +194,23 @@ impl SparkPhysicalExprAdapter {
let physical_type = cast.input_field().data_type();
let target_type = cast.target_field().data_type();

// For complex nested types (Struct, List, Map), use CometCastColumnExpr
// with spark_parquet_convert which handles field-name-based selection,
// reordering, and nested type casting correctly.
// For complex nested types (Struct, List, Map) and Timestamp timezone
// mismatches, use CometCastColumnExpr with spark_parquet_convert which
// handles field-name-based selection, reordering, nested type casting,
// and metadata-only timestamp timezone relabeling correctly.
//
// Timestamp mismatches (e.g., Timestamp(us, None) -> Timestamp(us, Some("UTC")))
// occur when INT96 Parquet timestamps are coerced to Timestamp(us, None) by
// DataFusion but the logical schema expects Timestamp(us, Some("UTC")).
// Using Spark's Cast here would incorrectly treat the None-timezone values as
// local time (TimestampNTZ) and apply a timezone conversion, but the values are
// already in UTC. spark_parquet_convert handles this as a metadata-only change.
if matches!(
(physical_type, target_type),
(DataType::Struct(_), DataType::Struct(_))
| (DataType::List(_), DataType::List(_))
| (DataType::Map(_, _), DataType::Map(_, _))
| (DataType::Timestamp(_, _), DataType::Timestamp(_, _))
) {
let comet_cast: Arc<dyn PhysicalExpr> = Arc::new(
CometCastColumnExpr::new(
Expand Down
Loading