Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GLUTEN-3884][CH][Parquet] support page pruning on first column, CH part #469

Draft
wants to merge 1 commit into
base: rebase_ch/20231122
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
url = https://github.com/ClickHouse/boost
[submodule "contrib/arrow"]
path = contrib/arrow
url = https://github.com/ClickHouse/arrow
url = https://github.com/binmahone/arrow
[submodule "contrib/thrift"]
path = contrib/thrift
url = https://github.com/apache/thrift
Expand Down
163 changes: 142 additions & 21 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <parquet/arrow/reader.h>
#include <parquet/arrow/schema.h>
#include <parquet/file_reader.h>
#include <parquet/page_index.h>
#include <parquet/statistics.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
Expand Down Expand Up @@ -235,7 +236,10 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
/// Range of values for each column, based on statistics in the Parquet metadata.
/// These are lower/upper bounds, not necessarily the exact min and max; e.g. the min/max may simply
/// be missing from the metadata.
static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
static std::vector<Range> getHyperrectangleFromStatistics(
const Block & header,
const FormatSettings & format_settings,
const std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> & name_to_statistics)
{
auto column_name_for_lookup = [&](std::string column_name) -> std::string
{
Expand All @@ -244,23 +248,6 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return column_name;
};

std::unique_ptr<parquet::RowGroupMetaData> row_group = file.RowGroup(row_group_idx);

std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
for (int i = 0; i < row_group->num_columns(); ++i)
{
auto c = row_group->ColumnChunk(i);
auto s = c->statistics();
if (!s)
continue;

auto path = c->path_in_schema()->ToDotVector();
if (path.size() != 1)
continue; // compound types not supported

name_to_statistics.emplace(column_name_for_lookup(path[0]), s);
}

/// +-----+
/// / /|
/// +-----+ |
Expand Down Expand Up @@ -364,6 +351,81 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}


/// Collects the per-column statistics stored in the metadata of row group `row_group_idx` and
/// converts them into a hyperrectangle via getHyperrectangleFromStatistics().
///
/// Column chunks without statistics, and chunks whose schema path has more than one component
/// (compound types), are left out of the statistics map. Map keys are lower-cased when
/// `format_settings.parquet.case_insensitive_column_matching` is set.
static std::vector<Range> getHyperrectangleForRowGroup(
    const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
{
    const bool fold_case = format_settings.parquet.case_insensitive_column_matching;

    const auto row_group_metadata = file.RowGroup(row_group_idx);
    const int column_count = row_group_metadata->num_columns();

    std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
    for (int column_idx = 0; column_idx != column_count; ++column_idx)
    {
        const auto chunk = row_group_metadata->ColumnChunk(column_idx);

        auto chunk_statistics = chunk->statistics();
        if (chunk_statistics == nullptr)
            continue;

        auto path_parts = chunk->path_in_schema()->ToDotVector();
        if (path_parts.size() != 1)
            continue; // compound types not supported

        std::string lookup_name = std::move(path_parts.front());
        if (fold_case)
            boost::to_lower(lookup_name);

        name_to_statistics.emplace(std::move(lookup_name), std::move(chunk_statistics));
    }

    return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
}

/// Builds the hyperrectangle (one Range per column of `header`) for a single data page of the
/// column described by `descr`, from the bounds recorded in the Parquet page index.
///
/// is_null_wanted        - true if the caller's filter may select rows where this column is NULL.
/// is_null_page          - true if the page index marks the page as a null page.
/// min_value / max_value - the page's encoded bounds, handed to parquet::Statistics::Make unchanged.
///
/// NOTE(review): the null-page branch writes the NULL-only range at position 0 of the result, i.e.
/// it assumes `descr` is the first column of `header` - confirm against callers.
static std::vector<Range> getHyperrectangleForPage(
    const bool is_null_wanted,
    const bool is_null_page,
    const std::string & min_value,
    const std::string & max_value,
    const Block & header,
    const FormatSettings & format_settings,
    const parquet::ColumnDescriptor * descr)
{
    if (is_null_page)
    {
        /// Use the same NEGATIVE_INFINITY constant as the NULL probe built in
        /// applyRowRangesFromPageIndex(), so that both describe the same "NULL only" range.
        std::vector<Range> null_only(header.columns(), Range::createWholeUniverse());
        null_only.at(0) = Range(NEGATIVE_INFINITY, true, NEGATIVE_INFINITY, true);
        return null_only;
    }

    /// Page index does not contain enough statistics. E.g. we don't know whether a page contains NULL or not.
    /// So we have to create a fake one:
    ///  - if NULL is wanted, we have to assume that the page contains NULL (null_count = 1);
    ///  - if NULL is not wanted, we can assume the page does not contain NULL (null_count = 0),
    ///    so that getHyperrectangleFromStatistics will return a much narrower range.
    const int64_t assumed_null_count = is_null_wanted ? 1 : 0;
    const std::shared_ptr<parquet::Statistics> stats = parquet::Statistics::Make(
        descr, min_value, max_value, 1, assumed_null_count, 1, true, true, true);

    std::string lookup_name = descr->name();
    if (format_settings.parquet.case_insensitive_column_matching)
        boost::to_lower(lookup_name);

    std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
    name_to_statistics.emplace(std::move(lookup_name), stats);

    return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
}


ParquetBlockInputFormat::ParquetBlockInputFormat(
ReadBuffer & buf,
const Block & header_,
Expand All @@ -388,6 +450,59 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
pool->wait();
}


/// Prunes the pages of `row_group` with the Parquet page index of the file's first column and stores
/// the surviving row ranges in `row_group_batches.back().row_ranges_map`.
///
/// Only the first column is considered, and only when its column index reports an ascending or
/// descending boundary order, because:
///  1. when the first column is sorted, it is likely that the other columns are not sorted and lack
///     selectivity;
///  2. it is more complex to calculate page pruning for multiple columns with KeyCondition, which
///     assumes rows are aligned among the pages of different columns, but unfortunately they are not.
///
/// Pruning is only an optimisation, so the function returns without recording anything whenever a
/// prerequisite is missing: no key condition, an empty header, a compound first column, a header
/// whose first column is not the file's first column, or an unavailable page index.
void ParquetBlockInputFormat::applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group)
{
    if (!key_condition)
        return;

    const auto & header = getPort().getHeader();
    if (header.columns() == 0)
        return;

    const auto row_group_metadata = metadata->RowGroup(row_group);
    if (row_group_metadata->ColumnChunk(0)->path_in_schema()->ToDotVector().size() != 1)
        return; // compound types not supported

    /// Position 0 of every hyperrectangle built below describes the first column of the header,
    /// while the page index is taken from the first column of the file. Prune only when both are the
    /// same column; otherwise a null page would be tested against an unrelated column.
    const auto column_name_for_lookup = [&](std::string column_name) -> std::string
    {
        if (format_settings.parquet.case_insensitive_column_matching)
            boost::to_lower(column_name);
        return column_name;
    };
    const parquet::ColumnDescriptor * first_col_descr = metadata->schema()->Column(0);
    if (column_name_for_lookup(header.getByPosition(0).name) != column_name_for_lookup(first_col_descr->name()))
        return;

    /// Each of the page-index accessors below may yield null (e.g. the file carries no page index).
    const auto page_index_reader = parquet_reader->GetPageIndexReader();
    if (!page_index_reader)
        return;

    const auto pg_idx_reader = page_index_reader->RowGroup(row_group);
    if (!pg_idx_reader)
        return;

    const auto first_col_idx = pg_idx_reader->GetColumnIndex(0);
    const auto first_col_offsets = pg_idx_reader->GetOffsetIndex(0);
    if (!first_col_idx || !first_col_offsets)
        return;

    const auto boundary_order = first_col_idx->boundary_order();
    if (boundary_order != parquet::BoundaryOrder::Ascending && boundary_order != parquet::BoundaryOrder::Descending)
        return;

    const auto & null_pages = first_col_idx->null_pages();
    const auto & min_values = first_col_idx->encoded_min_values();
    const auto & max_values = first_col_idx->encoded_max_values();
    const auto & page_locations = first_col_offsets->page_locations();
    const auto data_types = header.getDataTypes();

    /// probe_range limits the range of the first column to be NULL only.
    /// If the probe result is true, the where condition can be satisfied by rows whose first column
    /// is NULL, i.e. such rows are wanted by the where condition.
    std::vector<Range> probe_range(header.columns(), Range::createWholeUniverse());
    probe_range.at(0) = Range(NEGATIVE_INFINITY, true, NEGATIVE_INFINITY, true);
    const bool null_wanted = key_condition->checkInHyperrectangle(probe_range, data_types).can_be_true;

    const auto rows_in_group = row_group_metadata->num_rows();
    const size_t num_pages = null_pages.size();

    auto row_ranges = std::make_shared<parquet::RowRanges>();
    for (size_t i = 0; i < num_pages; ++i)
    {
        const auto page_hyperrectangle = getHyperrectangleForPage(
            null_wanted,
            null_pages.at(i),
            min_values.at(i),
            max_values.at(i),
            header,
            format_settings,
            first_col_descr);
        if (!key_condition->checkInHyperrectangle(page_hyperrectangle, data_types).can_be_true)
            continue;

        /// A page ends right before the first row of the next page; the last page ends with the row group.
        const auto from = page_locations.at(i).first_row_index;
        const auto to = (i + 1 == num_pages) ? rows_in_group - 1 : page_locations.at(i + 1).first_row_index - 1;
        row_ranges->add(parquet::Range(from, to), false);
    }
    row_group_batches.back().row_ranges_map.insert(std::make_pair(row_group, row_ranges));
}

void ParquetBlockInputFormat::initializeIfNeeded()
{
if (std::exchange(is_initialized, true))
Expand All @@ -401,7 +516,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
if (is_stopped)
return;

metadata = parquet::ReadMetaData(arrow_file);
const auto parquet_reader = parquet::ParquetFileReader::Open(arrow_file);
metadata = parquet_reader->metadata();

std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
Expand Down Expand Up @@ -433,6 +549,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().row_groups_idxs.push_back(row_group);
row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();

applyRowRangesFromPageIndex(parquet_reader, row_group);
}
}

Expand Down Expand Up @@ -486,8 +604,11 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
// TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators.
THROW_ARROW_NOT_OK(builder.Build(&row_group_batch.file_reader));

THROW_ARROW_NOT_OK(
row_group_batch.file_reader->GetRecordBatchReader(row_group_batch.row_groups_idxs, column_indices, &row_group_batch.record_batch_reader));
THROW_ARROW_NOT_OK(row_group_batch.file_reader->GetRecordBatchReader(
row_group_batch.row_groups_idxs,
column_indices,
std::make_shared<std::map<int, parquet::RowRangesPtr>>(std::move(row_group_batch.row_ranges_map)),
&row_group_batch.record_batch_reader));

row_group_batch.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
Expand Down
5 changes: 5 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
#include <Formats/FormatSettings.h>
#include <Storages/MergeTree/KeyCondition.h>

#include <parquet/column_reader.h>
#include <parquet/file_reader.h>

namespace parquet { class FileMetaData; }
namespace parquet::arrow { class FileReader; }
namespace arrow { class Buffer; class RecordBatchReader;}
Expand Down Expand Up @@ -72,6 +75,7 @@ class ParquetBlockInputFormat : public IInputFormat
is_stopped = 1;
}

void applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group);
void initializeIfNeeded();
void initializeRowGroupBatchReader(size_t row_group_batch_idx);

Expand Down Expand Up @@ -208,6 +212,7 @@ class ParquetBlockInputFormat : public IInputFormat
size_t total_bytes_compressed = 0;

std::vector<int> row_groups_idxs;
std::map<int, parquet::RowRangesPtr> row_ranges_map;

// These are only used by the decoding thread, so don't require locking the mutex.
std::unique_ptr<parquet::arrow::FileReader> file_reader;
Expand Down