8 changes: 8 additions & 0 deletions datafusion/common/src/config.rs
@@ -459,6 +459,14 @@ config_namespace! {
/// BLOB instead.
pub binary_as_string: bool, default = false

/// (reading) If set, the parquet reader will interpret columns of
/// physical type int96 as timestamps of the specified resolution
/// instead of nanoseconds. This is useful for reading data from systems
/// like Spark, which store microsecond-resolution timestamps in an int96,
/// allowing them to represent a wider date range than 64-bit timestamps
/// with nanosecond resolution.
pub coerce_int96: Option<String>, transform = str::to_lowercase, default = None
Contributor

I wonder if there is any use case for int96 other than timestamps.

Specifically, maybe we can simply always change the behavior and coerce int96 --> microseconds

At the very least, perhaps default the option to be enabled.

Contributor Author
@mbutrovich Apr 14, 2025

> I wonder if there is any use case for int96 other than timestamps.

Not as far as I know, but I don't think the (deprecated) int96 spec said that it had to represent a timestamp. It's just where Spark, Hive, Impala, etc. ended up.

> Specifically, maybe we can simply always change the behavior and coerce int96 --> microseconds
> At the very least, perhaps default the option to be enabled.

It's not clear to me if we should assume that an int96 originated from a system that treated the originating timestamp as microseconds. While it's very likely that it originated from one of those systems, I don't know how to treat the default in this case. Snowflake, for example, seems to use microseconds for its timestamps when dealing with Iceberg:
https://docs.snowflake.com/en/user-guide/tables-iceberg-data-types#supported-data-types-for-iceberg-tables

I'm hesitant to mess with defaults, but am open to hearing more from the community. @parthchandra

Contributor

Given how dominant Spark is and how rarely int96 is used outside the Spark ecosystem, I was thinking that if anyone has such a file, we should likely treat the values as microseconds.

I don't have a strong preference; I was just trying to come up with a way to keep the code less complicated.

Contributor

IIRC, the use of int96 originated from Impala/Parquet-cpp where it was used to store nanoseconds (The C++ implementation came from the Impala team). I think the Java implementation ended up with int96 in order to be compatible. Spark came along with its own variant and well, here we are.
(https://issues.apache.org/jira/browse/PARQUET-323)
The Parquet community assumed that this was the only usage of int96 before it was deprecated, so I feel it is safe for us to assume the same.
It can be done as a follow-up, though, I feel.
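
For context on the layout being discussed: the Impala/Spark INT96 convention packs 8 little-endian bytes of nanoseconds-within-the-day followed by a 4-byte little-endian Julian day. A minimal decoding sketch (illustrative only, not code from this PR) shows why coercing to microseconds widens the representable date range: an i64 count of nanoseconds overflows in the year 2262, while an i64 count of microseconds reaches well past the year 290000 used in the test file below.

```rust
/// Julian day number of the Unix epoch (1970-01-01).
const JULIAN_DAY_OF_EPOCH: i64 = 2_440_588;
const NANOS_PER_DAY: i128 = 86_400 * 1_000_000_000;

/// Decode one INT96 value (8 little-endian bytes of nanoseconds within the day,
/// then a 4-byte little-endian Julian day) into microseconds since the epoch.
fn int96_to_micros(bytes: [u8; 12]) -> i64 {
    let nanos_of_day = i64::from_le_bytes(bytes[0..8].try_into().unwrap());
    let julian_day = i64::from(u32::from_le_bytes(bytes[8..12].try_into().unwrap()));
    let days_since_epoch = julian_day - JULIAN_DAY_OF_EPOCH;
    // Do the math in i128 to sidestep the i64 nanosecond overflow entirely.
    let nanos = i128::from(days_since_epoch) * NANOS_PER_DAY + i128::from(nanos_of_day);
    (nanos / 1_000) as i64
}

fn main() {
    // 1970-01-02T00:00:00 encoded as INT96: zero nanoseconds, Julian day 2440589.
    let mut bytes = [0u8; 12];
    bytes[8..12].copy_from_slice(&2_440_589u32.to_le_bytes());
    assert_eq!(int96_to_micros(bytes), 86_400_000_000);
}
```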


// The following options affect writing to parquet files
// and map to parquet::file::properties::WriterProperties

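Assuming the new field follows the usual `datafusion.execution.parquet.*` key naming that `config_namespace!` generates for `ParquetOptions` fields (an assumption; the key name is not shown in this diff), enabling microsecond coercion from user code might look roughly like this:

```rust
use datafusion::prelude::{SessionConfig, SessionContext};

fn main() {
    // Key name assumed from the `config_namespace!` field above; the declared
    // `str::to_lowercase` transform means "US" and "us" are treated the same.
    let config = SessionConfig::new()
        .set_str("datafusion.execution.parquet.coerce_int96", "us");
    let _ctx = SessionContext::new_with_config(config);
}
```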
3 changes: 3 additions & 0 deletions datafusion/common/src/file_options/parquet_writer.rs
@@ -239,6 +239,7 @@ impl ParquetOptions {
bloom_filter_on_read: _, // reads not used for writer props
schema_force_view_types: _,
binary_as_string: _, // not used for writer props
coerce_int96: _, // not used for writer props
skip_arrow_metadata: _,
} = self;

@@ -516,6 +517,7 @@ mod tests {
schema_force_view_types: defaults.schema_force_view_types,
binary_as_string: defaults.binary_as_string,
skip_arrow_metadata: defaults.skip_arrow_metadata,
coerce_int96: None,
}
}

@@ -622,6 +624,7 @@ mod tests {
schema_force_view_types: global_options_defaults.schema_force_view_types,
binary_as_string: global_options_defaults.binary_as_string,
skip_arrow_metadata: global_options_defaults.skip_arrow_metadata,
coerce_int96: None,
},
column_specific_options,
key_value_metadata,
11 changes: 10 additions & 1 deletion datafusion/core/src/datasource/file_format/avro.rs
@@ -382,6 +382,15 @@ mod tests {
let testdata = test_util::arrow_test_data();
let store_root = format!("{testdata}/avro");
let format = AvroFormat {};
scan_format(state, &format, &store_root, file_name, projection, limit).await
scan_format(
state,
&format,
None,
&store_root,
file_name,
projection,
limit,
)
.await
}
}
4 changes: 3 additions & 1 deletion datafusion/core/src/datasource/file_format/csv.rs
@@ -250,6 +250,7 @@ mod tests {
let exec = scan_format(
&state,
&format,
None,
root,
"aggregate_test_100_with_nulls.csv",
projection,
@@ -300,6 +301,7 @@ mod tests {
let exec = scan_format(
&state,
&format,
None,
root,
"aggregate_test_100_with_nulls.csv",
projection,
@@ -582,7 +584,7 @@ mod tests {
) -> Result<Arc<dyn ExecutionPlan>> {
let root = format!("{}/csv", arrow_test_data());
let format = CsvFormat::default().with_has_header(has_header);
scan_format(state, &format, &root, file_name, projection, limit).await
scan_format(state, &format, None, &root, file_name, projection, limit).await
}

#[tokio::test]
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/file_format/json.rs
@@ -149,7 +149,7 @@ mod tests {
) -> Result<Arc<dyn ExecutionPlan>> {
let filename = "tests/data/2.json";
let format = JsonFormat::default();
scan_format(state, &format, ".", filename, projection, limit).await
scan_format(state, &format, None, ".", filename, projection, limit).await
}

#[tokio::test]
15 changes: 10 additions & 5 deletions datafusion/core/src/datasource/file_format/mod.rs
@@ -36,19 +36,20 @@ pub use datafusion_datasource::write;

#[cfg(test)]
pub(crate) mod test_util {
use std::sync::Arc;

use arrow_schema::SchemaRef;
use datafusion_catalog::Session;
use datafusion_common::Result;
use datafusion_datasource::file_scan_config::FileScanConfigBuilder;
use datafusion_datasource::{file_format::FileFormat, PartitionedFile};
use datafusion_execution::object_store::ObjectStoreUrl;
use std::sync::Arc;

use crate::test::object_store::local_unpartitioned_file;

pub async fn scan_format(
state: &dyn Session,
format: &dyn FileFormat,
schema: Option<SchemaRef>,
store_root: &str,
file_name: &str,
projection: Option<Vec<usize>>,
@@ -57,9 +58,13 @@ pub(crate) mod test_util {
let store = Arc::new(object_store::local::LocalFileSystem::new()) as _;
let meta = local_unpartitioned_file(format!("{store_root}/{file_name}"));

let file_schema = format
.infer_schema(state, &store, std::slice::from_ref(&meta))
.await?;
let file_schema = if let Some(file_schema) = schema {
file_schema
} else {
format
.infer_schema(state, &store, std::slice::from_ref(&meta))
.await?
};

let statistics = format
.infer_stats(state, &store, file_schema.clone(), &meta)
5 changes: 4 additions & 1 deletion datafusion/core/src/datasource/file_format/parquet.rs
@@ -1075,7 +1075,10 @@ mod tests {
.map(|factory| factory.create(state, &Default::default()).unwrap())
.unwrap_or(Arc::new(ParquetFormat::new()));

scan_format(state, &*format, &testdata, file_name, projection, limit).await
scan_format(
state, &*format, None, &testdata, file_name, projection, limit,
)
.await
}

/// Test that 0-byte files don't break while reading
89 changes: 88 additions & 1 deletion datafusion/core/src/datasource/physical_plan/parquet.rs
@@ -38,7 +38,7 @@ mod tests {
use crate::prelude::{ParquetReadOptions, SessionConfig, SessionContext};
use crate::test::object_store::local_unpartitioned_file;
use arrow::array::{
ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
ArrayRef, AsArray, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
StructArray,
};
use arrow::datatypes::{DataType, Field, Fields, Schema, SchemaBuilder};
@@ -1109,6 +1109,7 @@ mod tests {
let parquet_exec = scan_format(
&state,
&ParquetFormat::default(),
None,
&testdata,
filename,
Some(vec![0, 1, 2]),
@@ -1141,6 +1142,92 @@ mod tests {
Ok(())
}

#[tokio::test]
async fn parquet_exec_with_int96_from_spark() -> Result<()> {
// arrow-rs relies on the chrono library to convert between timestamps and strings, so
// instead compare as Int64. The underlying type should be a PrimitiveArray of Int64
// anyway, so this should be a zero-copy non-modifying cast at the SchemaAdapter.

let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int64, true)]));
let testdata = datafusion_common::test_util::parquet_test_data();
let filename = "int96_from_spark.parquet";
let session_ctx = SessionContext::new();
let state = session_ctx.state();
let task_ctx = state.task_ctx();

let time_units_and_expected = vec![
(
None, // Same as "ns" time_unit
Arc::new(Int64Array::from(vec![
Some(1704141296123456000), // Reads as nanosecond fine (note 3 extra 0s)
Some(1704070800000000000), // Reads as nanosecond fine (note 3 extra 0s)
Some(-4852191831933722624), // Cannot be represented with nanos timestamp (year 9999)
Some(1735599600000000000), // Reads as nanosecond fine (note 3 extra 0s)
None,
Some(-4864435138808946688), // Cannot be represented with nanos timestamp (year 290000)
])),
),
(
Some("ns".to_string()),
Arc::new(Int64Array::from(vec![
Some(1704141296123456000),
Some(1704070800000000000),
Some(-4852191831933722624),
Some(1735599600000000000),
None,
Some(-4864435138808946688),
])),
),
(
Some("us".to_string()),
Arc::new(Int64Array::from(vec![
Some(1704141296123456),
Some(1704070800000000),
Some(253402225200000000),
Some(1735599600000000),
None,
Some(9089380393200000000),
])),
),
];

for (time_unit, expected) in time_units_and_expected {
let parquet_exec = scan_format(
&state,
&ParquetFormat::default().with_coerce_int96(time_unit.clone()),
Some(schema.clone()),
&testdata,
filename,
Some(vec![0]),
None,
)
.await
.unwrap();
assert_eq!(parquet_exec.output_partitioning().partition_count(), 1);

let mut results = parquet_exec.execute(0, task_ctx.clone())?;
let batch = results.next().await.unwrap()?;

assert_eq!(6, batch.num_rows());
assert_eq!(1, batch.num_columns());

assert_eq!(batch.num_columns(), 1);
let column = batch.column(0);

assert_eq!(column.len(), expected.len());

column
.as_primitive::<arrow::datatypes::Int64Type>()
.iter()
.zip(expected.iter())
.for_each(|(lhs, rhs)| {
assert_eq!(lhs, rhs);
});
}

Ok(())
}
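
One note on the expected values in this test: the two rows that cannot be represented as nanosecond timestamps are the two's-complement wrap of the true nanosecond counts, which the microsecond expectations make easy to verify (illustrative check, not part of the diff):

```rust
fn main() {
    // Scaling the microsecond expectations back up by 1000 with wrapping i64
    // arithmetic reproduces the negative values in the nanosecond expectations.
    assert_eq!(253402225200000000i64.wrapping_mul(1000), -4852191831933722624); // year 9999
    assert_eq!(9089380393200000000i64.wrapping_mul(1000), -4864435138808946688); // year ~290000
}
```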

#[tokio::test]
async fn parquet_exec_with_range() -> Result<()> {
fn file_range(meta: &ObjectMeta, start: i64, end: i64) -> PartitionedFile {
69 changes: 61 additions & 8 deletions datafusion/datasource-parquet/src/file_format.rs
@@ -24,9 +24,18 @@ use std::ops::Range;
use std::sync::Arc;

use arrow::array::RecordBatch;
use arrow::datatypes::{Fields, Schema, SchemaRef, TimeUnit};
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};

use datafusion_datasource::file_format::{
FileFormat, FileFormatFactory, FilePushdownSupport,
};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;

use arrow::compute::sum;
use arrow::datatypes::{DataType, Field, FieldRef};
use arrow::datatypes::{Fields, Schema, SchemaRef};
use datafusion_common::config::{ConfigField, ConfigFileType, TableParquetOptions};
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
@@ -38,15 +47,8 @@ use datafusion_common::{HashMap, Statistics};
use datafusion_common_runtime::{JoinSet, SpawnedTask};
use datafusion_datasource::display::FileGroupDisplay;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{
FileFormat, FileFormatFactory, FilePushdownSupport,
};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::{FileSink, FileSinkConfig};
use datafusion_datasource::sink::{DataSink, DataSinkExec};
use datafusion_datasource::write::demux::DemuxedStreamReceiver;
use datafusion_datasource::write::{create_writer, get_writer_schema, SharedBuffer};
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryPool, MemoryReservation};
use datafusion_execution::{SendableRecordBatchStream, TaskContext};
use datafusion_expr::dml::InsertOp;
@@ -76,11 +78,13 @@ use parquet::arrow::arrow_writer::{
};
use parquet::arrow::async_reader::MetadataFetch;
use parquet::arrow::{parquet_to_arrow_schema, ArrowSchemaConverter, AsyncArrowWriter};
use parquet::basic::Type;
use parquet::errors::ParquetError;
use parquet::file::metadata::{ParquetMetaData, ParquetMetaDataReader, RowGroupMetaData};
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::file::writer::SerializedFileWriter;
use parquet::format::FileMetaData;
use parquet::schema::types::SchemaDescriptor;
use tokio::io::{AsyncWrite, AsyncWriteExt};
use tokio::sync::mpsc::{self, Receiver, Sender};

@@ -268,6 +272,15 @@ impl ParquetFormat {
self.options.global.binary_as_string = binary_as_string;
self
}

/// Return the time unit, if any, to which INT96 columns are coerced when reading
pub fn coerce_int96(&self) -> Option<String> {
self.options.global.coerce_int96.clone()
}

/// Set the time unit (e.g. "us") to which INT96 columns are coerced when reading
pub fn with_coerce_int96(mut self, time_unit: Option<String>) -> Self {
self.options.global.coerce_int96 = time_unit;
self
}
}

/// Clears all metadata (Schema level and field level) on an iterator
Expand Down Expand Up @@ -569,6 +582,46 @@ pub fn apply_file_schema_type_coercions(
))
}

/// Coerces INT96-backed timestamp columns in the file schema to the provided TimeUnit,
/// returning `None` if the Parquet schema contains no INT96 columns.
pub fn coerce_int96_to_resolution(
parquet_schema: &SchemaDescriptor,
file_schema: &Schema,
time_unit: &TimeUnit,
) -> Option<Schema> {
let mut transform = false;
let parquet_fields: HashMap<_, _> = parquet_schema
.columns()
.iter()
.map(|f| {
let dt = f.physical_type();
if dt.eq(&Type::INT96) {
transform = true;
}
(f.name(), dt)
})
.collect();

if !transform {
return None;
}

let transformed_fields: Vec<Arc<Field>> = file_schema
.fields
.iter()
.map(|field| match parquet_fields.get(field.name().as_str()) {
Some(Type::INT96) => {
field_with_new_type(field, DataType::Timestamp(*time_unit, None))
}
_ => Arc::clone(field),
})
.collect();

Some(Schema::new_with_metadata(
transformed_fields,
file_schema.metadata.clone(),
))
}
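
A small usage sketch for the new helper (illustrative only; the import path assumes the function remains public in this module, and the Parquet schema is built by hand rather than read from a file):

```rust
use std::sync::Arc;

use arrow::datatypes::{DataType, TimeUnit};
// Path assumed from where this PR defines the helper.
use datafusion_datasource_parquet::file_format::coerce_int96_to_resolution;
use parquet::arrow::parquet_to_arrow_schema;
use parquet::schema::parser::parse_message_type;
use parquet::schema::types::SchemaDescriptor;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // A Parquet schema with a single INT96 column, as Spark/Impala would write it.
    let message = "message spark_schema { OPTIONAL INT96 ts; }";
    let parquet_schema = SchemaDescriptor::new(Arc::new(parse_message_type(message)?));

    // arrow-rs infers INT96 as Timestamp(Nanosecond, None) by default.
    let file_schema = parquet_to_arrow_schema(&parquet_schema, None)?;

    // Rewrite the INT96-backed field to microseconds; `None` would mean the
    // Parquet schema contains no INT96 columns and nothing needs to change.
    let coerced =
        coerce_int96_to_resolution(&parquet_schema, &file_schema, &TimeUnit::Microsecond)
            .expect("schema contains an INT96 column");
    assert_eq!(
        coerced.field_with_name("ts")?.data_type(),
        &DataType::Timestamp(TimeUnit::Microsecond, None)
    );
    Ok(())
}
```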

/// Coerces the file schema if the table schema uses a view type.
#[deprecated(
since = "47.0.0",