Skip to content

Commit

Permalink
feat(Parquet): Add Int64 Timestamp support in reader
Browse files Browse the repository at this point in the history
  • Loading branch information
zuyu committed Dec 19, 2024
1 parent 2af045d commit 06f1ed1
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 15 deletions.
3 changes: 3 additions & 0 deletions velox/dwio/common/IntDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,9 @@ inline T IntDecoder<isSigned>::readInt() {
return readLittleEndianFromBigEndian<T>();
} else {
if constexpr (std::is_same_v<T, int128_t>) {
if (numBytes_ == 8) {
return readLongLE();
}
if (numBytes_ == 12) {
VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded.");
return readInt96();
Expand Down
13 changes: 13 additions & 0 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,10 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
auto numVeloxBytes = dictionary_.numValues * veloxTypeLength;
dictionary_.values =
AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
} else if (type_->type()->isTimestamp()) {
const auto numVeloxBytes = dictionary_.numValues * sizeof(int128_t);
dictionary_.values =
AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
} else {
dictionary_.values = AlignedBuffer::allocate<char>(numBytes, &pool_);
}
Expand All @@ -374,6 +378,15 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
// We start from the end to allow in-place expansion.
values[i] = parquetValues[i];
}
} else if (type_->type()->isTimestamp()) {
VELOX_DCHECK_EQ(parquetType, thrift::Type::INT64);
auto values = dictionary_.values->asMutable<int128_t>();
auto parquetValues = dictionary_.values->asMutable<int64_t>();
for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
values[i] = parquetValues[i];
}
}
break;
}
Expand Down
20 changes: 17 additions & 3 deletions velox/dwio/parquet/reader/ParquetColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "velox/dwio/parquet/reader/StringColumnReader.h"
#include "velox/dwio/parquet/reader/StructColumnReader.h"
#include "velox/dwio/parquet/reader/TimestampColumnReader.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {

Expand Down Expand Up @@ -75,9 +76,22 @@ std::unique_ptr<dwio::common::SelectiveColumnReader> ParquetColumnReader::build(
return std::make_unique<BooleanColumnReader>(
requestedType, fileType, params, scanSpec);

case TypeKind::TIMESTAMP:
return std::make_unique<TimestampColumnReader>(
requestedType, fileType, params, scanSpec);
case TypeKind::TIMESTAMP: {
const auto parquetType =
std::static_pointer_cast<const ParquetTypeWithId>(fileType)
->parquetType_;
VELOX_CHECK(parquetType);
switch (parquetType.value()) {
case thrift::Type::INT64:
return std::make_unique<TimestampColumnReader<int64_t>>(
requestedType, fileType, params, scanSpec);
case thrift::Type::INT96:
return std::make_unique<TimestampColumnReader<int128_t>>(
requestedType, fileType, params, scanSpec);
default:
VELOX_UNREACHABLE();
}
}

default:
VELOX_FAIL(
Expand Down
5 changes: 5 additions & 0 deletions velox/dwio/parquet/reader/ParquetReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,6 +813,11 @@ TypePtr ReaderBase::convertType(
case thrift::Type::type::INT32:
return INTEGER();
case thrift::Type::type::INT64:
// INT64 annotated with the TIMESTAMP logical type is read as TIMESTAMP
// (the time unit is not checked here).
if (schemaElement.__isset.logicalType &&
schemaElement.logicalType.__isset.TIMESTAMP) {
return TIMESTAMP();
}
return BIGINT();
case thrift::Type::type::INT96:
return TIMESTAMP(); // INT96 only maps to a timestamp
Expand Down
92 changes: 80 additions & 12 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,46 +18,102 @@

#include "velox/dwio/parquet/reader/IntegerColumnReader.h"
#include "velox/dwio/parquet/reader/ParquetColumnReader.h"
#include "velox/dwio/parquet/thrift/ParquetThriftTypes.h"

namespace facebook::velox::parquet {
namespace {

// Builds a Timestamp from an Int64 value, interpreting the value in whichever
// unit (MILLIS, MICROS or NANOS) is flagged as set on 'unit'. The flags are
// probed in that order; hitting none of them is treated as unreachable.
Timestamp toInt64Timestamp(int64_t value, const thrift::TimeUnit& unit) {
  const auto& unitFlags = unit.__isset;
  if (unitFlags.MILLIS) {
    return Timestamp::fromMillis(value);
  } else if (unitFlags.MICROS) {
    return Timestamp::fromMicros(value);
  } else if (unitFlags.NANOS) {
    return Timestamp::fromNanos(value);
  }
  VELOX_UNREACHABLE();
}

// Builds a Timestamp from an Int96 value carried in an int128_t: the bits
// above the low 64 are taken as the day count and the low 64 bits as the
// nanosecond count.
Timestamp toInt96Timestamp(const int128_t& value) {
  const auto dayPart = static_cast<int32_t>(value >> 64);
  // Truncating to uint64_t keeps exactly the low 64 bits of 'value'.
  const auto nanoPart = static_cast<uint64_t>(value);
  return Timestamp::fromDaysAndNanos(dayPart, nanoPart);
}

// Range filter for Parquet Int96 Timestamp.
class ParquetInt96TimestampRange final : public common::TimestampRange {
// Range filter for Parquet Timestamp.
template <typename T>
class ParquetTimestampRange final : public common::TimestampRange {
public:
// Use int128_t for Int96
static_assert(std::is_same_v<T, int64_t> || std::is_same_v<T, int128_t>);

// @param lower Lower end of the range, inclusive.
// @param upper Upper end of the range, inclusive.
// @param nullAllowed Null values are passing the filter if true.
ParquetInt96TimestampRange(
// @param timestampUnit Unit of the Int64 Timestamp.
ParquetTimestampRange(
const Timestamp& lower,
const Timestamp& upper,
bool nullAllowed)
: TimestampRange(lower, upper, nullAllowed) {}
bool nullAllowed,
const thrift::TimeUnit& timestampUnit)
: TimestampRange(lower, upper, nullAllowed),
timestampUnit_(timestampUnit) {}

bool testInt128(const int128_t& value) const final {
const auto ts = toInt96Timestamp(value);
Timestamp ts;
if constexpr (std::is_same_v<T, int64_t>) {
ts = toInt64Timestamp(value, timestampUnit_);
} else if constexpr (std::is_same_v<T, int128_t>) {
ts = toInt96Timestamp(value);
}
return ts >= this->lower() && ts <= this->upper();
}

private:
// Only used when T is int64_t.
const thrift::TimeUnit timestampUnit_;
};

} // namespace

template <typename T>
class TimestampColumnReader : public IntegerColumnReader {
public:
// Use int128_t for Int96
static_assert(std::is_same_v<T, int64_t> || std::is_same_v<T, int128_t>);

TimestampColumnReader(
const TypePtr& requestedType,
std::shared_ptr<const dwio::common::TypeWithId> fileType,
ParquetParams& params,
common::ScanSpec& scanSpec)
: IntegerColumnReader(requestedType, fileType, params, scanSpec),
timestampPrecision_(params.timestampPrecision()) {}
timestampPrecision_(params.timestampPrecision()) {
if constexpr (std::is_same_v<T, int64_t>) {
const auto logicalType =
std::static_pointer_cast<const ParquetTypeWithId>(fileType_)
->logicalType_;
VELOX_CHECK(logicalType);
VELOX_CHECK(logicalType->__isset.TIMESTAMP);
timestampUnit_ = logicalType->TIMESTAMP.unit;

if (timestampUnit_.__isset.MILLIS) {
needsConversion_ =
timestampPrecision_ != TimestampPrecision::kMilliseconds;
} else if (timestampUnit_.__isset.MICROS) {
needsConversion_ =
timestampPrecision_ != TimestampPrecision::kMicroseconds;
} else if (timestampUnit_.__isset.NANOS) {
needsConversion_ =
timestampPrecision_ != TimestampPrecision::kNanoseconds;
} else {
VELOX_UNREACHABLE();
}
}
}

bool hasBulkPath() const override {
return false;
Expand All @@ -79,7 +135,15 @@ class TimestampColumnReader : public IntegerColumnReader {
}

const int128_t encoded = reinterpret_cast<int128_t&>(rawValues[i]);
rawValues[i] = toInt96Timestamp(encoded).toPrecision(timestampPrecision_);
if constexpr (std::is_same_v<T, int64_t>) {
rawValues[i] = toInt64Timestamp(encoded, timestampUnit_);
if (needsConversion_) {
rawValues[i] = rawValues[i].toPrecision(timestampPrecision_);
}
} else if constexpr (std::is_same_v<T, int128_t>) {
rawValues[i] =
toInt96Timestamp(encoded).toPrecision(timestampPrecision_);
}
}
}

Expand All @@ -93,14 +157,13 @@ class TimestampColumnReader : public IntegerColumnReader {
const RowSet& rows,
ExtractValues extractValues) {
if (auto* range = dynamic_cast<common::TimestampRange*>(filter)) {
// Convert TimestampRange to ParquetInt96TimestampRange.
ParquetInt96TimestampRange newRange = ParquetInt96TimestampRange(
range->lower(), range->upper(), range->nullAllowed());
ParquetTimestampRange<T> newRange{
range->lower(), range->upper(), range->nullAllowed(), timestampUnit_};
this->readWithVisitor(
rows,
dwio::common::ColumnVisitor<
int128_t,
ParquetInt96TimestampRange,
common::TimestampRange,
ExtractValues,
isDense>(newRange, this, rows, extractValues));
} else {
Expand Down Expand Up @@ -129,6 +192,11 @@ class TimestampColumnReader : public IntegerColumnReader {
// The requested precision can be specified from HiveConfig to read timestamp
// from Parquet.
const TimestampPrecision timestampPrecision_;

// Only set when T is int64_t.
thrift::TimeUnit timestampUnit_;
// Whether Int64 Timestamp needs to be converted to the requested precision.
bool needsConversion_ = false;
};

} // namespace facebook::velox::parquet
25 changes: 25 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,31 @@ TEST_F(E2EFilterTest, integerDictionary) {
20);
}

// Calls testWithTypes on a schema of two timestamp columns with dictionary
// encoding turned off and options_.dataPageSize set to 4 * 1024.
// NOTE(review): nothing in this test selects the INT64 physical type
// explicitly; presumably that is the writer default -- confirm.
TEST_F(E2EFilterTest, timestampInt64Direct) {
// Disable dictionary encoding for this test.
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;

// The lambda passed as the second argument is intentionally empty. Both
// columns of the schema are listed in the column-name argument.
testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {},
true,
{"timestamp_val_0", "timestamp_val_1"},
20);
}

// Calls testWithTypes on a schema of two timestamp columns with
// options_.dataPageSize set to 4 * 1024.
// NOTE(review): options_.enableDictionary is not assigned here, so this test
// relies on its existing value -- presumably dictionary encoding is on by
// default; confirm against the fixture.
TEST_F(E2EFilterTest, timestampInt64Dictionary) {
options_.dataPageSize = 4 * 1024;

// The lambda passed as the second argument is intentionally empty. Both
// columns of the schema are listed in the column-name argument.
testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {},
true,
{"timestamp_val_0", "timestamp_val_1"},
20);
}

TEST_F(E2EFilterTest, timestampInt96Direct) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;
Expand Down
16 changes: 16 additions & 0 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -827,6 +827,22 @@ TEST_F(ParquetTableScanTest, sessionTimezone) {
assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai");
}

// Calls testTimestampRead with writer options that turn off
// writeInt96AsTimestamp, enable dictionary encoding, and set the write
// timestamp unit to microseconds.
TEST_F(ParquetTableScanTest, timestampInt64Dictionary) {
WriterOptions options;
// Do not write timestamps as INT96.
options.writeInt96AsTimestamp = false;
options.enableDictionary = true;
options.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds;
testTimestampRead(options);
}

// Calls testTimestampRead with writer options that turn off
// writeInt96AsTimestamp, disable dictionary encoding, and set the write
// timestamp unit to microseconds.
TEST_F(ParquetTableScanTest, timestampInt64Plain) {
WriterOptions options;
// Do not write timestamps as INT96.
options.writeInt96AsTimestamp = false;
options.enableDictionary = false;
options.parquetWriteTimestampUnit = TimestampPrecision::kMicroseconds;
testTimestampRead(options);
}

TEST_F(ParquetTableScanTest, timestampInt96Dictionary) {
WriterOptions options;
options.writeInt96AsTimestamp = true;
Expand Down

0 comments on commit 06f1ed1

Please sign in to comment.