From fdfc09bebd0bdb7ea811eaa80f879289a759b468 Mon Sep 17 00:00:00 2001 From: rui-mo Date: Sat, 12 Oct 2024 15:49:43 +0800 Subject: [PATCH] Add ParquetInt96TimestampRange --- .../parquet/reader/TimestampColumnReader.h | 144 +++++++++++++++++- velox/type/Filter.h | 14 +- 2 files changed, 148 insertions(+), 10 deletions(-) diff --git a/velox/dwio/parquet/reader/TimestampColumnReader.h b/velox/dwio/parquet/reader/TimestampColumnReader.h index 040b03e39964c..f686917ec5979 100644 --- a/velox/dwio/parquet/reader/TimestampColumnReader.h +++ b/velox/dwio/parquet/reader/TimestampColumnReader.h @@ -20,6 +20,31 @@ #include "velox/dwio/parquet/reader/ParquetColumnReader.h" namespace facebook::velox::parquet { +namespace { + +// Range filter for Parquet Int96 Timestamp. +class ParquetInt96TimestampRange : public common::TimestampRange { + public: + // @param lower Lower end of the range, inclusive. + // @param upper Upper end of the range, inclusive. + // @param nullAllowed Null values are passing the filter if true. + ParquetInt96TimestampRange( + const Timestamp& lower, + const Timestamp& upper, + bool nullAllowed) + : TimestampRange(lower, upper, nullAllowed) {} + + // Int96 is read as int128_t value and converted to Timestamp by extracting + // days and nanos. + bool testInt128(int128_t value) const override { + const int32_t days = static_cast(value >> 64); + const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1); + const auto ts = Timestamp::fromDaysAndNanos(days, nanos); + return ts >= this->lower() && ts <= this->upper(); + } +}; + +} // namespace class TimestampColumnReader : public IntegerColumnReader { public: @@ -71,6 +96,123 @@ class TimestampColumnReader : public IntegerColumnReader { } } + template < + typename Reader, + typename TFilter, + bool isDense, + typename ExtractValues> + void readHelperTimestamp( + velox::common::Filter* filter, + const RowSet& rows, + ExtractValues extractValues) { + if constexpr (std::is_same_v) { + // Convert TimestampRange to ParquetInt96TimestampRange. + auto* range = reinterpret_cast(filter); + ParquetInt96TimestampRange newRange = ParquetInt96TimestampRange( + range->lower(), range->upper(), range->nullAllowed()); + reinterpret_cast(this)->Reader::readWithVisitor( + rows, + dwio::common::ColumnVisitor< + int128_t, + ParquetInt96TimestampRange, + ExtractValues, + isDense>(newRange, this, rows, extractValues)); + } else { + reinterpret_cast(this)->Reader::readWithVisitor( + rows, + dwio::common:: + ColumnVisitor( + *reinterpret_cast(filter), + this, + rows, + extractValues)); + } + return; + } + + template < + typename Reader, + bool isDense, + bool kEncodingHasNulls, + typename ExtractValues> + void processTimestampFilter( + velox::common::Filter* filter, + ExtractValues extractValues, + const RowSet& rows) { + if (filter == nullptr) { + readHelperTimestamp( + &dwio::common::alwaysTrue(), rows, extractValues); + return; + } + + switch (filter->kind()) { + case velox::common::FilterKind::kAlwaysTrue: + readHelperTimestamp( + filter, rows, extractValues); + break; + case velox::common::FilterKind::kIsNull: + if constexpr (kEncodingHasNulls) { + filterNulls( + rows, + true, + !std:: + is_same_v); + } else { + readHelperTimestamp( + filter, rows, extractValues); + } + break; + case velox::common::FilterKind::kIsNotNull: + if constexpr ( + kEncodingHasNulls && + std::is_same_v) { + filterNulls(rows, false, false); + } else { + readHelperTimestamp( + filter, rows, extractValues); + } + break; + case velox::common::FilterKind::kTimestampRange: + readHelperTimestamp( + filter, rows, extractValues); + break; + default: + VELOX_UNREACHABLE(); + } + } + + template + void readTimestamp(const RowSet& rows) { + const bool isDense = rows.back() == rows.size() - 1; + velox::common::Filter* filter = + scanSpec_->filter() ? scanSpec_->filter() : &dwio::common::alwaysTrue(); + if (scanSpec_->keepValues()) { + if (scanSpec_->valueHook()) { + if (isDense) { + processValueHook(rows, scanSpec_->valueHook()); + } else { + processValueHook(rows, scanSpec_->valueHook()); + } + } else { + if (isDense) { + processTimestampFilter( + filter, dwio::common::ExtractToReader(this), rows); + } else { + processTimestampFilter( + filter, dwio::common::ExtractToReader(this), rows); + } + } + } else { + if (isDense) { + processTimestampFilter( + filter, dwio::common::DropValues(), rows); + } else { + processTimestampFilter( + filter, dwio::common::DropValues(), rows); + } + } + } + void read( int64_t offset, const RowSet& rows, @@ -78,7 +220,7 @@ class TimestampColumnReader : public IntegerColumnReader { auto& data = formatData_->as(); // Use int128_t as a workaround. Timestamp in Velox is of 16-byte length. prepareRead(offset, rows, nullptr); - readCommon(rows); + readTimestamp(rows); readOffset_ += rows.back() + 1; } diff --git a/velox/type/Filter.h b/velox/type/Filter.h index 89b95a31ec230..d1fc73e00b9c1 100644 --- a/velox/type/Filter.h +++ b/velox/type/Filter.h @@ -1788,7 +1788,7 @@ class NegatedBytesRange final : public Filter { /// Open ranges can be implemented by using the value to the left /// or right of the end of the range, e.g. a < timestamp '2023-07-19 /// 17:00:00.777' is equivalent to a <= timestamp '2023-07-19 17:00:00.776'. -class TimestampRange final : public Filter { +class TimestampRange : public Filter { public: /// @param lower Lower end of the range, inclusive. /// @param upper Upper end of the range, inclusive. @@ -1824,14 +1824,6 @@ class TimestampRange final : public Filter { nullAllowed_ ? "with nulls" : "no nulls"); } - bool testInt128(int128_t value) const final { - // Convert int128_t to Timestamp by extracting days and nanos. - const int32_t days = static_cast(value >> 64); - const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1); - const auto ts = Timestamp::fromDaysAndNanos(days, nanos); - return ts >= lower_ && ts <= upper_; - } - bool testTimestamp(Timestamp value) const override { return value >= lower_ && value <= upper_; } @@ -1859,6 +1851,10 @@ class TimestampRange final : public Filter { return upper_; } + const bool nullAllowed() const { + return nullAllowed_; + } + bool testingEquals(const Filter& other) const final; private: