diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index 677458ce37eb8..6c82b8dee7892 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -558,6 +558,35 @@ void ReadSingleColumnFileStatistics(std::unique_ptr<FileReader> file_reader,
   ASSERT_OK(StatisticsAsScalars(*statistics, min, max));
 }
 
+void DownsampleInt96RoundTrip(std::shared_ptr<Array> arrow_vector_in,
+                              std::shared_ptr<Array> arrow_vector_out,
+                              ::arrow::TimeUnit::type unit) {
+  // Create single input table of NS to be written to parquet with INT96
+  auto input_schema =
+      ::arrow::schema({::arrow::field("f", ::arrow::timestamp(TimeUnit::NANO))});
+  auto input = Table::Make(input_schema, {arrow_vector_in});
+
+  // Create an expected schema for each resulting table (one for each "downsampled" ts)
+  auto ex_schema = ::arrow::schema({::arrow::field("f", ::arrow::timestamp(unit))});
+  auto ex_result = Table::Make(ex_schema, {arrow_vector_out});
+
+  std::shared_ptr<Table> result;
+
+  ArrowReaderProperties arrow_reader_prop;
+  arrow_reader_prop.set_coerce_int96_timestamp_unit(unit);
+
+  ASSERT_NO_FATAL_FAILURE(DoRoundtrip(
+      input, input->num_rows(), &result, default_writer_properties(),
+      ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build(),
+      arrow_reader_prop));
+
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertSchemaEqual(*ex_result->schema(),
+                                                     *result->schema(),
+                                                     /*check_metadata=*/false));
+
+  ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
+}
+
 // Non-template base class for TestParquetIO, to avoid code duplication
 class ParquetIOTestBase : public ::testing::Test {
  public:
@@ -1671,6 +1700,33 @@ TEST(TestArrowReadWrite, UseDeprecatedInt96) {
   ASSERT_NO_FATAL_FAILURE(::arrow::AssertTablesEqual(*ex_result, *result));
 }
 
+TEST(TestArrowReadWrite, DownsampleDeprecatedInt96) {
+  using ::arrow::ArrayFromJSON;
+  using ::arrow::field;
+  using ::arrow::schema;
+
+  // Timestamp values at 2000-01-01 00:00:00,
+  // then with increment unit of 1ns, 1us, 1ms and 1s.
+  auto a_nano =
+      ArrayFromJSON(timestamp(TimeUnit::NANO),
+                    "[946684800000000000, 946684800000000001, 946684800000001000, "
+                    "946684800001000000, 946684801000000000]");
+  auto a_micro = ArrayFromJSON(timestamp(TimeUnit::MICRO),
+                               "[946684800000000, 946684800000000, 946684800000001, "
+                               "946684800001000, 946684801000000]");
+  auto a_milli = ArrayFromJSON(
+      timestamp(TimeUnit::MILLI),
+      "[946684800000, 946684800000, 946684800000, 946684800001, 946684801000]");
+  auto a_second =
+      ArrayFromJSON(timestamp(TimeUnit::SECOND),
+                    "[946684800, 946684800, 946684800, 946684800, 946684801]");
+
+  ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_nano, TimeUnit::NANO));
+  ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_micro, TimeUnit::MICRO));
+  ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_milli, TimeUnit::MILLI));
+  ASSERT_NO_FATAL_FAILURE(DownsampleInt96RoundTrip(a_nano, a_second, TimeUnit::SECOND));
+}
+
 TEST(TestArrowReadWrite, CoerceTimestamps) {
   using ::arrow::ArrayFromVector;
   using ::arrow::field;
diff --git a/cpp/src/parquet/arrow/reader_internal.cc b/cpp/src/parquet/arrow/reader_internal.cc
index 1410a5f89e2de..0ffa3e8997007 100644
--- a/cpp/src/parquet/arrow/reader_internal.cc
+++ b/cpp/src/parquet/arrow/reader_internal.cc
@@ -353,7 +353,8 @@ Status TransferBool(RecordReader* reader, MemoryPool* pool, Datum* out) {
 }
 
 Status TransferInt96(RecordReader* reader, MemoryPool* pool,
-                     const std::shared_ptr<DataType>& type, Datum* out) {
+                     const std::shared_ptr<DataType>& type, Datum* out,
+                     const ::arrow::TimeUnit::type int96_arrow_time_unit) {
   int64_t length = reader->values_written();
   auto values = reinterpret_cast<const Int96*>(reader->values());
   ARROW_ASSIGN_OR_RAISE(auto data,
@@ -365,7 +366,20 @@ Status TransferInt96(RecordReader* reader, MemoryPool* pool,
       // isn't representable as a 64-bit Unix timestamp.
       *data_ptr++ = 0;
     } else {
-      *data_ptr++ = Int96GetNanoSeconds(values[i]);
+      switch (int96_arrow_time_unit) {
+        case ::arrow::TimeUnit::NANO:
+          *data_ptr++ = Int96GetNanoSeconds(values[i]);
+          break;
+        case ::arrow::TimeUnit::MICRO:
+          *data_ptr++ = Int96GetMicroSeconds(values[i]);
+          break;
+        case ::arrow::TimeUnit::MILLI:
+          *data_ptr++ = Int96GetMilliSeconds(values[i]);
+          break;
+        case ::arrow::TimeUnit::SECOND:
+          *data_ptr++ = Int96GetSeconds(values[i]);
+          break;
+      }
     }
   }
   *out = std::make_shared<TimestampArray>(type, length, std::move(data),
@@ -742,20 +756,19 @@ Status TransferColumnData(RecordReader* reader, std::shared_ptr<DataType> value_
     case ::arrow::Type::TIMESTAMP: {
       const ::arrow::TimestampType& timestamp_type =
           checked_cast<::arrow::TimestampType&>(*value_type);
-      switch (timestamp_type.unit()) {
-        case ::arrow::TimeUnit::MILLI:
-        case ::arrow::TimeUnit::MICRO: {
-          result = TransferZeroCopy(reader, value_type);
-        } break;
-        case ::arrow::TimeUnit::NANO: {
-          if (descr->physical_type() == ::parquet::Type::INT96) {
-            RETURN_NOT_OK(TransferInt96(reader, pool, value_type, &result));
-          } else {
+      if (descr->physical_type() == ::parquet::Type::INT96) {
+        RETURN_NOT_OK(
+            TransferInt96(reader, pool, value_type, &result, timestamp_type.unit()));
+      } else {
+        switch (timestamp_type.unit()) {
+          case ::arrow::TimeUnit::MILLI:
+          case ::arrow::TimeUnit::MICRO:
+          case ::arrow::TimeUnit::NANO:
            result = TransferZeroCopy(reader, value_type);
-          }
-        } break;
-        default:
-          return Status::NotImplemented("TimeUnit not supported");
+            break;
+          default:
+            return Status::NotImplemented("TimeUnit not supported");
+        }
       }
     } break;
     default:
diff --git a/cpp/src/parquet/arrow/schema.cc b/cpp/src/parquet/arrow/schema.cc
index 7610ce176058c..eb7fd628dfc94 100644
--- a/cpp/src/parquet/arrow/schema.cc
+++ b/cpp/src/parquet/arrow/schema.cc
@@ -454,7 +454,9 @@ bool IsDictionaryReadSupported(const ArrowType& type) {
 ::arrow::Result<std::shared_ptr<ArrowType>> GetTypeForNode(
     int column_index, const schema::PrimitiveNode& primitive_node,
     SchemaTreeContext* ctx) {
-  ASSIGN_OR_RAISE(std::shared_ptr<ArrowType> storage_type, GetArrowType(primitive_node));
+  ASSIGN_OR_RAISE(
+      std::shared_ptr<ArrowType> storage_type,
+      GetArrowType(primitive_node, ctx->properties.coerce_int96_timestamp_unit()));
   if (ctx->properties.read_dictionary(column_index) &&
       IsDictionaryReadSupported(*storage_type)) {
     return ::arrow::dictionary(::arrow::int32(), storage_type);
diff --git a/cpp/src/parquet/arrow/schema_internal.cc b/cpp/src/parquet/arrow/schema_internal.cc
index fbdfa09a04099..064bf4f55cc7e 100644
--- a/cpp/src/parquet/arrow/schema_internal.cc
+++ b/cpp/src/parquet/arrow/schema_internal.cc
@@ -179,9 +179,9 @@ Result<std::shared_ptr<ArrowType>> FromInt64(const LogicalType& logical_type) {
   }
 }
 
-Result<std::shared_ptr<ArrowType>> GetArrowType(Type::type physical_type,
-                                                const LogicalType& logical_type,
-                                                int type_length) {
+Result<std::shared_ptr<ArrowType>> GetArrowType(
+    Type::type physical_type, const LogicalType& logical_type, int type_length,
+    const ::arrow::TimeUnit::type int96_arrow_time_unit) {
   if (logical_type.is_invalid() || logical_type.is_null()) {
     return ::arrow::null();
   }
@@ -194,7 +194,7 @@ Result<std::shared_ptr<ArrowType>> GetArrowType(Type::type physical_type,
     case ParquetType::INT64:
       return FromInt64(logical_type);
     case ParquetType::INT96:
-      return ::arrow::timestamp(::arrow::TimeUnit::NANO);
+      return ::arrow::timestamp(int96_arrow_time_unit);
     case ParquetType::FLOAT:
       return ::arrow::float32();
     case ParquetType::DOUBLE:
@@ -211,14 +211,11 @@ Result<std::shared_ptr<ArrowType>> GetArrowType(Type::type physical_type,
   }
 }
 
-Result<std::shared_ptr<ArrowType>> GetArrowType(const schema::PrimitiveNode& primitive) {
+Result<std::shared_ptr<ArrowType>> GetArrowType(
+    const schema::PrimitiveNode& primitive,
+    const ::arrow::TimeUnit::type int96_arrow_time_unit) {
   return GetArrowType(primitive.physical_type(), *primitive.logical_type(),
-                      primitive.type_length());
-}
-
-Result<std::shared_ptr<ArrowType>> GetArrowType(const ColumnDescriptor& descriptor) {
-  return GetArrowType(descriptor.physical_type(), *descriptor.logical_type(),
-                      descriptor.type_length());
+                      primitive.type_length(), int96_arrow_time_unit);
 }
 
 }  // namespace arrow
diff --git a/cpp/src/parquet/arrow/schema_internal.h b/cpp/src/parquet/arrow/schema_internal.h
index ec0d9571304a9..fb837c3ee6cab 100644
--- a/cpp/src/parquet/arrow/schema_internal.h
+++ b/cpp/src/parquet/arrow/schema_internal.h
@@ -40,9 +40,12 @@ Result<std::shared_ptr<::arrow::DataType>> GetArrowType(Type::type physical_type,
                                                         int type_length);
 
 Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
-    const schema::PrimitiveNode& primitive);
+    Type::type physical_type, const LogicalType& logical_type, int type_length,
+    ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);
+
 Result<std::shared_ptr<::arrow::DataType>> GetArrowType(
-    const ColumnDescriptor& descriptor);
+    const schema::PrimitiveNode& primitive,
+    ::arrow::TimeUnit::type int96_arrow_time_unit = ::arrow::TimeUnit::NANO);
 
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 5018fff9531f5..d217b8efa5294 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -575,7 +575,8 @@ class PARQUET_EXPORT ArrowReaderProperties {
         read_dict_indices_(),
         batch_size_(kArrowDefaultBatchSize),
         pre_buffer_(false),
-        cache_options_(::arrow::io::CacheOptions::Defaults()) {}
+        cache_options_(::arrow::io::CacheOptions::Defaults()),
+        coerce_int96_timestamp_unit_(::arrow::TimeUnit::NANO) {}
 
   void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
 
@@ -620,6 +621,16 @@ class PARQUET_EXPORT ArrowReaderProperties {
 
   const ::arrow::io::IOContext& io_context() const { return io_context_; }
 
+  /// Set timestamp unit to use for deprecated INT96-encoded timestamps
+  /// (default is NANO).
+  void set_coerce_int96_timestamp_unit(::arrow::TimeUnit::type unit) {
+    coerce_int96_timestamp_unit_ = unit;
+  }
+
+  ::arrow::TimeUnit::type coerce_int96_timestamp_unit() const {
+    return coerce_int96_timestamp_unit_;
+  }
+
  private:
   bool use_threads_;
   std::unordered_set<int> read_dict_indices_;
@@ -627,6 +638,7 @@ class PARQUET_EXPORT ArrowReaderProperties {
   bool pre_buffer_;
   ::arrow::io::IOContext io_context_;
   ::arrow::io::CacheOptions cache_options_;
+  ::arrow::TimeUnit::type coerce_int96_timestamp_unit_;
 };
 
 /// EXPERIMENTAL: Constructs the default ArrowReaderProperties
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index 4529dbe613325..6bd67f1ee5fec 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -591,15 +591,46 @@ static inline void Int96SetNanoSeconds(parquet::Int96& i96, int64_t nanoseconds)
   std::memcpy(&i96.value, &nanoseconds, sizeof(nanoseconds));
 }
 
-static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
+struct DecodedInt96 {
+  uint64_t days_since_epoch;
+  uint64_t nanoseconds;
+};
+
+static inline DecodedInt96 DecodeInt96Timestamp(const parquet::Int96& i96) {
   // We do the computations in the unsigned domain to avoid unsigned behaviour
   // on overflow.
-  uint64_t days_since_epoch =
-      i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
-  uint64_t nanoseconds = 0;
+  DecodedInt96 result;
+  result.days_since_epoch = i96.value[2] - static_cast<uint64_t>(kJulianToUnixEpochDays);
+  result.nanoseconds = 0;
+
+  memcpy(&result.nanoseconds, &i96.value, sizeof(uint64_t));
+  return result;
+}
+
+static inline int64_t Int96GetNanoSeconds(const parquet::Int96& i96) {
+  const auto decoded = DecodeInt96Timestamp(i96);
+  return static_cast<int64_t>(decoded.days_since_epoch * kNanosecondsPerDay +
+                              decoded.nanoseconds);
+}
+
+static inline int64_t Int96GetMicroSeconds(const parquet::Int96& i96) {
+  const auto decoded = DecodeInt96Timestamp(i96);
+  uint64_t microseconds = decoded.nanoseconds / static_cast<uint64_t>(1000);
+  return static_cast<int64_t>(decoded.days_since_epoch * kMicrosecondsPerDay +
+                              microseconds);
+}
+
+static inline int64_t Int96GetMilliSeconds(const parquet::Int96& i96) {
+  const auto decoded = DecodeInt96Timestamp(i96);
+  uint64_t milliseconds = decoded.nanoseconds / static_cast<uint64_t>(1000000);
+  return static_cast<int64_t>(decoded.days_since_epoch * kMillisecondsPerDay +
+                              milliseconds);
+}
 
-  memcpy(&nanoseconds, &i96.value, sizeof(uint64_t));
-  return static_cast<int64_t>(days_since_epoch * kNanosecondsPerDay + nanoseconds);
+static inline int64_t Int96GetSeconds(const parquet::Int96& i96) {
+  const auto decoded = DecodeInt96Timestamp(i96);
+  uint64_t seconds = decoded.nanoseconds / static_cast<uint64_t>(1000000000);
+  return static_cast<int64_t>(decoded.days_since_epoch * kSecondsPerDay + seconds);
 }
 
 static inline std::string Int96ToString(const Int96& a) {