24.8.8 Backport of #70807 parquet page header v2 native reader #558

Merged
181 changes: 138 additions & 43 deletions src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.cpp
@@ -370,6 +370,7 @@ template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPage()
{
// refer to: ColumnReaderImplBase::ReadNewPage in column_reader.cc
// this is where decompression happens
auto cur_page = parquet_page_reader->NextPage();
switch (cur_page->type())
{
@@ -409,46 +410,13 @@ void ParquetLeafColReader<TColumn>::readPage()
}

template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
void ParquetLeafColReader<TColumn>::initDataReader(
parquet::Encoding::type encoding_type,
const uint8_t * buffer,
std::size_t max_size,
std::unique_ptr<RleValuesReader> && def_level_reader)
{
static parquet::LevelDecoder repetition_level_decoder;

cur_page_values = page.num_values();

// refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc
if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding());
}
const auto * buffer = page.data();
auto max_size = page.size();

if (col_descriptor.max_repetition_level() > 0)
{
auto rep_levels_bytes = repetition_level_decoder.SetData(
page.repetition_level_encoding(), col_descriptor.max_repetition_level(), 0, buffer, max_size);
buffer += rep_levels_bytes;
max_size -= rep_levels_bytes;
}

assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0)
{
auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
num_bytes += 4;
buffer += num_bytes;
max_size -= num_bytes;
def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
}
else
{
def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
}

switch (page.encoding())
switch (encoding_type)
{
case parquet::Encoding::PLAIN:
{
Expand Down Expand Up @@ -489,17 +457,144 @@ void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
case parquet::Encoding::DELTA_BINARY_PACKED:
case parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY:
case parquet::Encoding::DELTA_BYTE_ARRAY:
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.encoding());
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", enconding_type);

default:
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", page.encoding());
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unknown encoding type: {}", enconding_type);
}
}

template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV1(const parquet::DataPageV1 & page)
{
cur_page_values = page.num_values();

// refer to: VectorizedColumnReader::readPageV1 in Spark and LevelDecoder::SetData in column_reader.cc
if (page.definition_level_encoding() != parquet::Encoding::RLE && col_descriptor.max_definition_level() != 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Unsupported encoding: {}", page.definition_level_encoding());
}

const auto * buffer = page.data();
auto max_size = static_cast<std::size_t>(page.size());
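// A v1 data page buffer is laid out as
//   [ 4-byte LE length | RLE repetition levels ][ 4-byte LE length | RLE definition levels ][ encoded values ]
// where each level run is present only when the corresponding max level is > 0.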

if (col_descriptor.max_repetition_level() > 0)
{
if (max_size < sizeof(int32_t))
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);

if (num_bytes < 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for dl is negative, corrupt?");
}

if (num_bytes + 4u > max_size)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

// the repetition level reader is not constructed because it is not used at the moment; just skip over its bytes
num_bytes += 4;
buffer += num_bytes;
max_size -= num_bytes;
}

assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0)
{
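// Number of bits needed to represent any value in [0, max_definition_level] under RLE/bit-packing.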
auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);

if (max_size < sizeof(int32_t))
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

auto num_bytes = ::arrow::util::SafeLoadAs<int32_t>(buffer);

if (num_bytes < 0)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Number of bytes for dl is negative, corrupt?");
}

if (num_bytes + 4u > max_size)
{
throw Exception(ErrorCodes::PARQUET_EXCEPTION, "Not enough bytes in parquet page buffer, corrupt?");
}

auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer + 4, num_bytes);
num_bytes += 4;
buffer += num_bytes;
max_size -= num_bytes;
def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
}
else
{
def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
}

initDataReader(page.encoding(), buffer, max_size, std::move(def_level_reader));
}

/*
* The main differences between a data page v1 and a data page v2 are:
* 1. repetition and definition levels are not compressed;
* 2. the byte lengths of the repetition and definition levels are present in the page header;
* 3. the level encoding is always RLE.
*
* Because both level lengths are known from the header, there is no need to read a 4-byte length prefix
* and apply an offset as in page v1: the data buffer is simply advanced past the repetition level bytes,
* and the definition level decoder is built as an RLE decoder over the bytes that follow.
*/
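/* For reference, the page.data() buffer of a v2 data page is laid out as
*
*   [ RLE repetition levels ][ RLE definition levels ][ encoded values ]
*
* of repetition_levels_byte_length, definition_levels_byte_length and
* page.size() - total_levels_length bytes respectively, with no length prefixes.
*/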
template <typename TColumn>
void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & /*page*/)
void ParquetLeafColReader<TColumn>::readPageV2(const parquet::DataPageV2 & page)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "read page V2 is not implemented yet");
cur_page_values = page.num_values();

const auto * buffer = page.data();

if (page.repetition_levels_byte_length() < 0 || page.definition_levels_byte_length() < 0)
{
throw Exception(
ErrorCodes::PARQUET_EXCEPTION, "Either RL or DL is negative, this should not happen. Most likely corrupt file or parsing issue");
}

const int64_t total_levels_length =
static_cast<int64_t>(page.repetition_levels_byte_length()) +
page.definition_levels_byte_length();

if (total_levels_length > page.size())
{
throw Exception(
ErrorCodes::BAD_ARGUMENTS, "Data page too small for levels (corrupt header?)");
}

// ARROW-17453: Even if max_rep_level_ is 0, there may still be
// repetition level bytes written and/or reported in the header by
// some writers (e.g. Athena)
buffer += page.repetition_levels_byte_length();

assert(col_descriptor.max_definition_level() >= 0);
std::unique_ptr<RleValuesReader> def_level_reader;
if (col_descriptor.max_definition_level() > 0)
{
auto bit_width = arrow::bit_util::Log2(col_descriptor.max_definition_level() + 1);
auto num_bytes = page.definition_levels_byte_length();
auto bit_reader = std::make_unique<arrow::bit_util::BitReader>(buffer, num_bytes);
def_level_reader = std::make_unique<RleValuesReader>(std::move(bit_reader), bit_width);
}
else
{
def_level_reader = std::make_unique<RleValuesReader>(page.num_values());
}

buffer += page.definition_levels_byte_length();

initDataReader(page.encoding(), buffer, page.size() - total_levels_length, std::move(def_level_reader));
}

template <typename TColumn>
4 changes: 4 additions & 0 deletions src/Processors/Formats/Impl/Parquet/ParquetLeafColReader.h
@@ -54,6 +54,10 @@ class ParquetLeafColReader : public ParquetColumnReader
void readPage();
void readPageV1(const parquet::DataPageV1 & page);
void readPageV2(const parquet::DataPageV2 & page);
void initDataReader(parquet::Encoding::type encoding_type,
const uint8_t * buffer,
std::size_t max_size,
std::unique_ptr<RleValuesReader> && def_level_reader);

std::unique_ptr<ParquetDataValuesReader> createDictReader(
std::unique_ptr<RleValuesReader> def_level_reader, std::unique_ptr<RleValuesReader> rle_data_reader);
10 changes: 10 additions & 0 deletions tests/queries/0_stateless/03251_parquet_page_v2_native_reader.reference
@@ -0,0 +1,10 @@
abc 2
abc 2
abc 3
abc 4
\N 5
abc 2
abc 2
abc 3
abc 4
\N 5
23 changes: 23 additions & 0 deletions tests/queries/0_stateless/03251_parquet_page_v2_native_reader.sh
@@ -0,0 +1,23 @@
#!/usr/bin/env bash
# Tags: no-ubsan, no-fasttest

CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh

USER_FILES_PATH=$($CLICKHOUSE_CLIENT_BINARY --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}')

WORKING_DIR="${USER_FILES_PATH}/${CLICKHOUSE_TEST_UNIQUE_NAME}"

mkdir -p "${WORKING_DIR}"

DATA_FILE="${CUR_DIR}/data_parquet/datapage_v2.snappy.parquet"

DATA_FILE_USER_PATH="${WORKING_DIR}/datapage_v2.snappy.parquet"

cp "${DATA_FILE}" "${DATA_FILE_USER_PATH}"

# Not reading all columns because some data types and encodings are not supported by native reader yet
# TODO read all columns once implemented
${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=false;"
${CLICKHOUSE_CLIENT} --query="select a, c from file('${DATA_FILE_USER_PATH}', Parquet) order by c SETTINGS input_format_parquet_use_native_reader=true;"
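
# For a quick manual check outside the test harness (a sketch, assuming the file
# was already copied into the server's user_files_path as this script does above):
#
#   clickhouse-client --query "
#       select a, c
#       from file('datapage_v2.snappy.parquet', Parquet)
#       order by c
#       SETTINGS input_format_parquet_use_native_reader=true"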