Skip to content

Commit

Permalink
Add support to read plain encoded INT96 timestamp from Parquet file
Browse files Browse the repository at this point in the history
  • Loading branch information
mskapilks authored and rui-mo committed Oct 14, 2024
1 parent b00751e commit d6ef013
Show file tree
Hide file tree
Showing 8 changed files with 121 additions and 80 deletions.
17 changes: 17 additions & 0 deletions velox/dwio/common/IntDecoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,9 @@ class IntDecoder {
template <typename T>
T readInt();

// Reads Int96 timestamp composed of days and nanos as int128_t.
int128_t readInt96();

template <typename T>
T readVInt();

Expand Down Expand Up @@ -438,12 +441,26 @@ inline T IntDecoder<isSigned>::readInt() {
return readLittleEndianFromBigEndian<T>();
} else {
if constexpr (std::is_same_v<T, int128_t>) {
if (numBytes_ == 12) {
VELOX_DCHECK(!useVInts_, "Int96 should not be VInt encoded.");
return readInt96();
}
VELOX_NYI();
}
return readLongLE();
}
}

template <bool isSigned>
inline int128_t IntDecoder<isSigned>::readInt96() {
int128_t result = 0;
for (int i = 0; i < 12; ++i) {
auto ch = readByte();
result |= static_cast<uint128_t>(ch & BASE_256_MASK) << (i * 8);
}
return result;
}

template <bool isSigned>
template <typename T>
inline T IntDecoder<isSigned>::readVInt() {
Expand Down
19 changes: 6 additions & 13 deletions velox/dwio/parquet/reader/PageReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -377,7 +377,7 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
break;
}
case thrift::Type::INT96: {
auto numVeloxBytes = dictionary_.numValues * sizeof(Timestamp);
auto numVeloxBytes = dictionary_.numValues * sizeof(int128_t);
dictionary_.values = AlignedBuffer::allocate<char>(numVeloxBytes, &pool_);
auto numBytes = dictionary_.numValues * sizeof(Int96Timestamp);
if (pageData_) {
Expand All @@ -392,23 +392,16 @@ void PageReader::prepareDictionary(const PageHeader& pageHeader) {
}
// Expand the Parquet type length values to Velox type length.
// We start from the end to allow in-place expansion.
auto values = dictionary_.values->asMutable<Timestamp>();
auto values = dictionary_.values->asMutable<int128_t>();
auto parquetValues = dictionary_.values->asMutable<char>();

for (auto i = dictionary_.numValues - 1; i >= 0; --i) {
// Convert the timestamp into seconds and nanos since the Unix epoch,
// 00:00:00.000000 on 1 January 1970.
int64_t nanos;
int128_t result = 0;
memcpy(
&nanos,
&result,
parquetValues + i * sizeof(Int96Timestamp),
sizeof(int64_t));
int32_t days;
memcpy(
&days,
parquetValues + i * sizeof(Int96Timestamp) + sizeof(int64_t),
sizeof(int32_t));
values[i] = Timestamp::fromDaysAndNanos(days, nanos);
sizeof(Int96Timestamp));
values[i] = result;
}
break;
}
Expand Down
12 changes: 9 additions & 3 deletions velox/dwio/parquet/reader/TimestampColumnReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,14 @@ class TimestampColumnReader : public IntegerColumnReader {
if (resultVector->isNullAt(i)) {
continue;
}
const auto timestamp = rawValues[i];
uint64_t nanos = timestamp.getNanos();

// Convert int128_t to Timestamp by extracting days and nanos.
const int128_t encoded = reinterpret_cast<int128_t&>(rawValues[i]);
const int32_t days = static_cast<int32_t>(encoded >> 64);
uint64_t nanos = encoded & ((((1ULL << 63) - 1ULL) << 1) + 1);
const auto timestamp = Timestamp::fromDaysAndNanos(days, nanos);

nanos = timestamp.getNanos();
switch (timestampPrecision_) {
case TimestampPrecision::kMilliseconds:
nanos = nanos / 1'000'000 * 1'000'000;
Expand All @@ -70,7 +76,7 @@ class TimestampColumnReader : public IntegerColumnReader {
const RowSet& rows,
const uint64_t* /*incomingNulls*/) override {
auto& data = formatData_->as<ParquetData>();
// Use int128_t as a workaroud. Timestamp in Velox is of 16-byte length.
// Use int128_t as a workaround. Timestamp in Velox is of 16-byte length.
prepareRead<int128_t>(offset, rows, nullptr);
readCommon<IntegerColumnReader, true>(rows);
readOffset_ += rows.back() + 1;
Expand Down
Binary file not shown.
14 changes: 14 additions & 0 deletions velox/dwio/parquet/tests/reader/E2EFilterTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,6 +256,20 @@ TEST_F(E2EFilterTest, integerDictionary) {
20);
}

TEST_F(E2EFilterTest, timestampDirect) {
options_.enableDictionary = false;
options_.dataPageSize = 4 * 1024;
options_.writeInt96AsTimestamp = true;

testWithTypes(
"timestamp_val_0:timestamp,"
"timestamp_val_1:timestamp",
[&]() {},
true,
{"timestamp_val_0", "timestamp_val_1"},
20);
}

TEST_F(E2EFilterTest, timestampDictionary) {
options_.dataPageSize = 4 * 1024;
options_.writeInt96AsTimestamp = true;
Expand Down
134 changes: 71 additions & 63 deletions velox/dwio/parquet/tests/reader/ParquetTableScanTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,72 @@ class ParquetTableScanTest : public HiveConnectorTestBase {
writer->close();
}

void testInt96TimestampRead(const std::string& fileName) {
// Timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433187296, 0),
Timestamp(1433273696, 0),
Timestamp(981171246, 0),
Timestamp(888739266, 0),
Timestamp(1671767761, 0),
Timestamp(317521387, 0),
Timestamp(944660366, 0),
Timestamp(1682068174, 0),
Timestamp(968798189, 0),
Timestamp(1197433676, 0)});

loadData(
getExampleFilePath(fileName),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));

assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp");
assertSelectWithFilter(
{"t"},
{},
"t < TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'");
assertSelectWithFilter(
{"t"},
{},
"t <= TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'");
assertSelectWithFilter(
{"t"},
{},
"t > TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'");
assertSelectWithFilter(
{"t"},
{},
"t >= TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'");
assertSelectWithFilter(
{"t"},
{},
"t == TIMESTAMP '2022-12-23 03:56:01'",
"SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'");
}

private:
RowTypePtr getRowType(std::vector<std::string>&& outputColumnNames) const {
std::vector<TypePtr> types;
Expand Down Expand Up @@ -753,70 +819,12 @@ TEST_F(ParquetTableScanTest, sessionTimezone) {
assertSelectWithTimezone({"a"}, "SELECT a FROM tmp", "Asia/Shanghai");
}

TEST_F(ParquetTableScanTest, timestampFilter) {
// Timestamp-int96.parquet holds one column (t: TIMESTAMP) and
// 10 rows in one row group. Data is in SNAPPY compressed format.
// The values are:
// |t |
// +-------------------+
// |2015-06-01 19:34:56|
// |2015-06-02 19:34:56|
// |2001-02-03 03:34:06|
// |1998-03-01 08:01:06|
// |2022-12-23 03:56:01|
// |1980-01-24 00:23:07|
// |1999-12-08 13:39:26|
// |2023-04-21 09:09:34|
// |2000-09-12 22:36:29|
// |2007-12-12 04:27:56|
// +-------------------+
auto vector = makeFlatVector<Timestamp>(
{Timestamp(1433187296, 0),
Timestamp(1433273696, 0),
Timestamp(981171246, 0),
Timestamp(888739266, 0),
Timestamp(1671767761, 0),
Timestamp(317521387, 0),
Timestamp(944660366, 0),
Timestamp(1682068174, 0),
Timestamp(968798189, 0),
Timestamp(1197433676, 0)});

loadData(
getExampleFilePath("timestamp_int96.parquet"),
ROW({"t"}, {TIMESTAMP()}),
makeRowVector(
{"t"},
{
vector,
}));
TEST_F(ParquetTableScanTest, timestampInt96Dictionary) {
testInt96TimestampRead("timestamp_int96_dictionary.parquet");
}

assertSelectWithFilter({"t"}, {}, "", "SELECT t from tmp");
assertSelectWithFilter(
{"t"},
{},
"t < TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t < TIMESTAMP '2000-09-12 22:36:29'");
assertSelectWithFilter(
{"t"},
{},
"t <= TIMESTAMP '2000-09-12 22:36:29'",
"SELECT t from tmp where t <= TIMESTAMP '2000-09-12 22:36:29'");
assertSelectWithFilter(
{"t"},
{},
"t > TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t > TIMESTAMP '1980-01-24 00:23:07'");
assertSelectWithFilter(
{"t"},
{},
"t >= TIMESTAMP '1980-01-24 00:23:07'",
"SELECT t from tmp where t >= TIMESTAMP '1980-01-24 00:23:07'");
assertSelectWithFilter(
{"t"},
{},
"t == TIMESTAMP '2022-12-23 03:56:01'",
"SELECT t from tmp where t == TIMESTAMP '2022-12-23 03:56:01'");
TEST_F(ParquetTableScanTest, timestampInt96Plain) {
testInt96TimestampRead("timestamp_int96_plain.parquet");
}

TEST_F(ParquetTableScanTest, timestampPrecisionMicrosecond) {
Expand Down
5 changes: 4 additions & 1 deletion velox/type/Filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -1825,7 +1825,10 @@ class TimestampRange final : public Filter {
}

bool testInt128(int128_t value) const final {
const auto& ts = reinterpret_cast<const Timestamp&>(value);
// Convert int128_t to Timestamp by extracting days and nanos.
const int32_t days = static_cast<int32_t>(value >> 64);
const uint64_t nanos = value & ((((1ULL << 63) - 1ULL) << 1) + 1);
const auto ts = Timestamp::fromDaysAndNanos(days, nanos);
return ts >= lower_ && ts <= upper_;
}

Expand Down

0 comments on commit d6ef013

Please sign in to comment.