Skip to content

Commit

Permalink
[GLUTEN-3884][CH][Parquet] support page pruning on first column, CH part
Browse files Browse the repository at this point in the history
  • Loading branch information
binmahone committed Nov 29, 2023
1 parent b8ba0ae commit 4b3a04b
Show file tree
Hide file tree
Showing 4 changed files with 149 additions and 23 deletions.
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
url = https://github.com/ClickHouse/boost
[submodule "contrib/arrow"]
path = contrib/arrow
url = https://github.com/ClickHouse/arrow
url = https://github.com/binmahone/arrow
[submodule "contrib/thrift"]
path = contrib/thrift
url = https://github.com/apache/thrift
Expand Down
163 changes: 142 additions & 21 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <parquet/arrow/reader.h>
#include <parquet/arrow/schema.h>
#include <parquet/file_reader.h>
#include <parquet/page_index.h>
#include <parquet/statistics.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
Expand Down Expand Up @@ -235,7 +236,10 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
/// Range of values for each column, based on statistics in the Parquet metadata.
/// This is lower/upper bounds, not necessarily exact min and max, e.g. the min/max can be just
/// missing in the metadata.
static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
static std::vector<Range> getHyperrectangleFromStatistics(
const Block & header,
const FormatSettings & format_settings,
const std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> & name_to_statistics)
{
auto column_name_for_lookup = [&](std::string column_name) -> std::string
{
Expand All @@ -244,23 +248,6 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return column_name;
};

std::unique_ptr<parquet::RowGroupMetaData> row_group = file.RowGroup(row_group_idx);

std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
for (int i = 0; i < row_group->num_columns(); ++i)
{
auto c = row_group->ColumnChunk(i);
auto s = c->statistics();
if (!s)
continue;

auto path = c->path_in_schema()->ToDotVector();
if (path.size() != 1)
continue; // compound types not supported

name_to_statistics.emplace(column_name_for_lookup(path[0]), s);
}

/// +-----+
/// / /|
/// +-----+ |
Expand Down Expand Up @@ -364,6 +351,81 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}


static std::vector<Range> getHyperrectangleForRowGroup(
const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
{
auto column_name_for_lookup = [&](std::string column_name) -> std::string
{
if (format_settings.parquet.case_insensitive_column_matching)
boost::to_lower(column_name);
return column_name;
};

std::unique_ptr<parquet::RowGroupMetaData> row_group = file.RowGroup(row_group_idx);

std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
for (int i = 0; i < row_group->num_columns(); ++i)
{
auto c = row_group->ColumnChunk(i);
auto s = c->statistics();
if (!s)
continue;

auto path = c->path_in_schema()->ToDotVector();
if (path.size() != 1)
continue; // compound types not supported

name_to_statistics.emplace(column_name_for_lookup(path[0]), s);
}

return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
}

static std::vector<Range> getHyperrectangleForPage(
const bool is_null_wanted,
const bool is_null_page,
const std::string & min_value,
const std::string & max_value,
const Block & header,
const FormatSettings & format_settings,
const parquet::ColumnDescriptor * descr)
{
auto column_name_for_lookup = [&](std::string column_name) -> std::string
{
if (format_settings.parquet.case_insensitive_column_matching)
boost::to_lower(column_name);
return column_name;
};

if (is_null_page)
{
std::vector ret(header.columns(), Range::createWholeUniverse());
ret.at(0) = Range(Null::Value::NegativeInfinity, true, Null::Value::NegativeInfinity, true);
return ret;
}

std::shared_ptr<parquet::Statistics> stats;
/// Page index does not contain enough statistics. E.g. we don't know whether a page contains NULL or not.
/// So we have to create a fake one.
if (is_null_wanted)
{
// if null is wanted, we have to assume that the page contains null
stats = parquet::Statistics::Make(descr, min_value, max_value, 1, 1, 1, true, true, true);
}
else
{
// if null is not wanted, we can assume this page does not contain null
// so that getHyperrectangleFromStatistics will return a much narrower range
stats = parquet::Statistics::Make(descr, min_value, max_value, 1, 0, 1, true, true, true);
}
std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
name_to_statistics.emplace(column_name_for_lookup(descr->name()), stats);

return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
}


ParquetBlockInputFormat::ParquetBlockInputFormat(
ReadBuffer & buf,
const Block & header_,
Expand All @@ -388,6 +450,59 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
pool->wait();
}


// Apply page index of first column if first column is sorted. We only consider first column because:
// 1. when first column is sorted, it is likely that other columns are not sorted and lack selectivity
// 2. it's more complex to calcute page index for multiple columns with KeyCondition, which assumes column
// rows are aligned among pages, but unfortunatedly it's not.
void ParquetBlockInputFormat::applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group)
{
if (metadata->RowGroup(row_group)->ColumnChunk(0)->path_in_schema()->ToDotVector().size() != 1)
return; // compound types not supported

const auto pg_idx_reader = parquet_reader->GetPageIndexReader()->RowGroup(row_group);
if (pg_idx_reader == nullptr)
return;

const auto first_col_idx = pg_idx_reader->GetColumnIndex(0);
const auto first_col_offsets = pg_idx_reader->GetOffsetIndex(0);
if (first_col_idx != nullptr
&& (first_col_idx->boundary_order() == parquet::BoundaryOrder::Ascending
|| first_col_idx->boundary_order() == parquet::BoundaryOrder::Descending))
{
auto row_ranges = std::make_shared<parquet::RowRanges>();
auto null_pages = first_col_idx->null_pages();
const auto min_values = first_col_idx->encoded_min_values();
const auto max_values = first_col_idx->encoded_max_values();

std::vector<Range> probe_range(getPort().getHeader().columns(), Range::createWholeUniverse());
probe_range.at(0) = Range(Null::Value::NegativeInfinity, true, Null::Value::NegativeInfinity, true);
// probe_range limits the range of first column to be NULL only.
// If the probe result is true, it means where condition contains "first_col is NULL AND ...",
// it also means rows with first_col being null is wanted by where condition.
const bool null_wanted = key_condition->checkInHyperrectangle(probe_range, getPort().getHeader().getDataTypes()).can_be_true;

for (size_t i = 0; i < null_pages.size(); ++i)
{
auto page_hyperrectangle = getHyperrectangleForPage(
null_wanted,
null_pages.at(i),
min_values.at(i),
max_values.at(i),
getPort().getHeader(),
format_settings,
metadata->schema()->Column(0));
if (key_condition->checkInHyperrectangle(page_hyperrectangle, getPort().getHeader().getDataTypes()).can_be_true)
{
auto to = (i == null_pages.size() - 1) ? metadata->RowGroup(row_group)->num_rows() - 1
: first_col_offsets->page_locations().at(i + 1).first_row_index - 1;
row_ranges->add(parquet::Range(first_col_offsets->page_locations().at(i).first_row_index, to), false);
}
}
row_group_batches.back().row_ranges_map.insert(std::make_pair(row_group, row_ranges));
}
}

void ParquetBlockInputFormat::initializeIfNeeded()
{
if (std::exchange(is_initialized, true))
Expand All @@ -401,7 +516,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
if (is_stopped)
return;

metadata = parquet::ReadMetaData(arrow_file);
const auto parquet_reader = parquet::ParquetFileReader::Open(arrow_file);
metadata = parquet_reader->metadata();

std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
Expand Down Expand Up @@ -433,6 +549,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();

applyRowRangesFromPageIndex(parquet_reader, row_group);
}
}

Expand Down Expand Up @@ -486,8 +604,11 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
// TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators.
THROW_ARROW_NOT_OK(builder.Build(&row_group_batch.file_reader));

THROW_ARROW_NOT_OK(
row_group_batch.file_reader->GetRecordBatchReader(row_group_batch.row_groups_idxs, column_indices, &row_group_batch.record_batch_reader));
THROW_ARROW_NOT_OK(row_group_batch.file_reader->GetRecordBatchReader(
row_group_batch.row_groups_idxs,
column_indices,
std::make_shared<std::map<int, parquet::RowRangesPtr>>(std::move(row_group_batch.row_ranges_map)),
&row_group_batch.record_batch_reader));

row_group_batch.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
Expand Down
5 changes: 5 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include <Formats/FormatSettings.h>
#include <Storages/MergeTree/KeyCondition.h>

#include <parquet/column_reader.h>
#include <parquet/file_reader.h>

namespace parquet { class FileMetaData; }
namespace parquet::arrow { class FileReader; }
namespace arrow { class Buffer; class RecordBatchReader;}
Expand Down Expand Up @@ -72,6 +75,7 @@ class ParquetBlockInputFormat : public IInputFormat
is_stopped = 1;
}

void applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group);
void initializeIfNeeded();
void initializeRowGroupBatchReader(size_t row_group_batch_idx);

Expand Down Expand Up @@ -208,6 +212,7 @@ class ParquetBlockInputFormat : public IInputFormat
size_t total_bytes_compressed = 0;

std::vector<int> row_groups_idxs;
std::map<int, parquet::RowRangesPtr> row_ranges_map;

// These are only used by the decoding thread, so don't require locking the mutex.
std::unique_ptr<parquet::arrow::FileReader> file_reader;
Expand Down

0 comments on commit 4b3a04b

Please sign in to comment.