diff --git a/.gitmodules b/.gitmodules
index 1a464ee1170f..29bc27aebe9c 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -45,7 +45,7 @@
 	url = https://github.com/ClickHouse/boost
 [submodule "contrib/arrow"]
 	path = contrib/arrow
-	url = https://github.com/ClickHouse/arrow
+	url = https://github.com/binmahone/arrow
 [submodule "contrib/thrift"]
 	path = contrib/thrift
 	url = https://github.com/apache/thrift
diff --git a/contrib/arrow b/contrib/arrow
index ba5c67934e82..4733a012bff9 160000
--- a/contrib/arrow
+++ b/contrib/arrow
@@ -1 +1 @@
-Subproject commit ba5c67934e8274d649befcffab56731632dc5253
+Subproject commit 4733a012bff9257475c4e6c9745f2c89888de9e2
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
index d37c2dc11604..f98a5e0d6f31 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
@@ -14,6 +14,7 @@
 #include
 #include
 #include
+#include
 #include
 #include "ArrowBufferedStreams.h"
 #include "ArrowColumnToCHColumn.h"
@@ -235,7 +236,10 @@ static std::optional<Field> decodePlainParquetValueSlow(const std::string & data
 /// Range of values for each column, based on statistics in the Parquet metadata.
 /// This is lower/upper bounds, not necessarily exact min and max, e.g. the min/max can be just
 /// missing in the metadata.
-static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
+static std::vector<Range> getHyperrectangleFromStatistics(
+    const Block & header,
+    const FormatSettings & format_settings,
+    const std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> & name_to_statistics)
 {
     auto column_name_for_lookup = [&](std::string column_name) -> std::string
     {
@@ -244,23 +248,6 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
         return column_name;
     };
 
-    std::unique_ptr<parquet::RowGroupMetaData> row_group = file.RowGroup(row_group_idx);
-
-    std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
-    for (int i = 0; i < row_group->num_columns(); ++i)
-    {
-        auto c = row_group->ColumnChunk(i);
-        auto s = c->statistics();
-        if (!s)
-            continue;
-
-        auto path = c->path_in_schema()->ToDotVector();
-        if (path.size() != 1)
-            continue; // compound types not supported
-
-        name_to_statistics.emplace(column_name_for_lookup(path[0]), s);
-    }
-
     /// +-----+
     /// /     /|
     /// +-----+ |
@@ -364,6 +351,81 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
     return hyperrectangle;
 }
 
+
+static std::vector<Range> getHyperrectangleForRowGroup(
+    const parquet::FileMetaData & file, int row_group_idx, const Block & header, const FormatSettings & format_settings)
+{
+    auto column_name_for_lookup = [&](std::string column_name) -> std::string
+    {
+        if (format_settings.parquet.case_insensitive_column_matching)
+            boost::to_lower(column_name);
+        return column_name;
+    };
+
+    std::unique_ptr<parquet::RowGroupMetaData> row_group = file.RowGroup(row_group_idx);
+
+    std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
+    for (int i = 0; i < row_group->num_columns(); ++i)
+    {
+        auto c = row_group->ColumnChunk(i);
+        auto s = c->statistics();
+        if (!s)
+            continue;
+
+        auto path = c->path_in_schema()->ToDotVector();
+        if (path.size() != 1)
+            continue; // compound types not supported
+
+        name_to_statistics.emplace(column_name_for_lookup(path[0]), s);
+    }
+
+    return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
+}
+
+static std::vector<Range> getHyperrectangleForPage(
+    const bool is_null_wanted,
+    const bool is_null_page,
+    const std::string & min_value,
+    const std::string & max_value,
+    const Block & header,
+    const FormatSettings & format_settings,
+    const parquet::ColumnDescriptor * descr)
+{
+    auto column_name_for_lookup = [&](std::string column_name) -> std::string
+    {
+        if (format_settings.parquet.case_insensitive_column_matching)
+            boost::to_lower(column_name);
+        return column_name;
+    };
+
+    if (is_null_page)
+    {
+        std::vector<Range> ret(header.columns(), Range::createWholeUniverse());
+        ret.at(0) = Range(Null::Value::NegativeInfinity, true, Null::Value::NegativeInfinity, true);
+        return ret;
+    }
+
+    std::shared_ptr<parquet::Statistics> stats;
+    /// The page index does not contain enough statistics, e.g. we don't know whether a page contains NULLs or not,
+    /// so we have to create a fake Statistics object.
+    if (is_null_wanted)
+    {
+        // If NULLs are wanted, we have to assume that the page contains NULLs.
+        stats = parquet::Statistics::Make(descr, min_value, max_value, 1, 1, 1, true, true, true);
+    }
+    else
+    {
+        // If NULLs are not wanted, we can assume this page does not contain NULLs,
+        // so that getHyperrectangleFromStatistics returns a much narrower range.
+        stats = parquet::Statistics::Make(descr, min_value, max_value, 1, 0, 1, true, true, true);
+    }
+    std::unordered_map<std::string, std::shared_ptr<parquet::Statistics>> name_to_statistics;
+    name_to_statistics.emplace(column_name_for_lookup(descr->name()), stats);
+
+    return getHyperrectangleFromStatistics(header, format_settings, name_to_statistics);
+}
+
+
 ParquetBlockInputFormat::ParquetBlockInputFormat(
     ReadBuffer & buf,
     const Block & header_,
@@ -388,6 +450,59 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
         pool->wait();
 }
 
+
+// Apply the page index of the first column if the first column is sorted. We only consider the first column because:
+// 1. when the first column is sorted, the other columns are likely not sorted and lack selectivity
+// 2. it is more complex to evaluate the page index for multiple columns with KeyCondition, which assumes column
+//    rows are aligned among pages, but unfortunately they are not.
+void ParquetBlockInputFormat::applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group)
+{
+    if (metadata->RowGroup(row_group)->ColumnChunk(0)->path_in_schema()->ToDotVector().size() != 1)
+        return; // compound types not supported
+
+    const auto pg_idx_reader = parquet_reader->GetPageIndexReader()->RowGroup(row_group);
+    if (pg_idx_reader == nullptr)
+        return;
+
+    const auto first_col_idx = pg_idx_reader->GetColumnIndex(0);
+    const auto first_col_offsets = pg_idx_reader->GetOffsetIndex(0);
+    if (first_col_idx != nullptr
+        && (first_col_idx->boundary_order() == parquet::BoundaryOrder::Ascending
+            || first_col_idx->boundary_order() == parquet::BoundaryOrder::Descending))
+    {
+        auto row_ranges = std::make_shared<parquet::RowRanges>();
+        auto null_pages = first_col_idx->null_pages();
+        const auto min_values = first_col_idx->encoded_min_values();
+        const auto max_values = first_col_idx->encoded_max_values();
+
+        std::vector<Range> probe_range(getPort().getHeader().columns(), Range::createWholeUniverse());
+        probe_range.at(0) = Range(NEGATIVE_INFINITY, true, NEGATIVE_INFINITY, true);
+        // probe_range limits the range of the first column to NULL only.
+        // If the probe result is true, it means the WHERE condition contains "first_col IS NULL AND ...",
+        // i.e. rows whose first column is NULL are wanted by the WHERE condition.
+        const bool null_wanted = key_condition->checkInHyperrectangle(probe_range, getPort().getHeader().getDataTypes()).can_be_true;
+
+        for (size_t i = 0; i < null_pages.size(); ++i)
+        {
+            auto page_hyperrectangle = getHyperrectangleForPage(
+                null_wanted,
+                null_pages.at(i),
+                min_values.at(i),
+                max_values.at(i),
+                getPort().getHeader(),
+                format_settings,
+                metadata->schema()->Column(0));
+            if (key_condition->checkInHyperrectangle(page_hyperrectangle, getPort().getHeader().getDataTypes()).can_be_true)
+            {
+                auto to = (i == null_pages.size() - 1) ? metadata->RowGroup(row_group)->num_rows() - 1
+                                                       : first_col_offsets->page_locations().at(i + 1).first_row_index - 1;
+                row_ranges->add(parquet::Range(first_col_offsets->page_locations().at(i).first_row_index, to), false);
+            }
+        }
+        row_group_batches.back().row_ranges_map.insert(std::make_pair(row_group, row_ranges));
+    }
+}
+
 void ParquetBlockInputFormat::initializeIfNeeded()
 {
     if (std::exchange(is_initialized, true))
@@ -401,7 +516,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
     if (is_stopped)
         return;
 
-    metadata = parquet::ReadMetaData(arrow_file);
+    const auto parquet_reader = parquet::ParquetFileReader::Open(arrow_file);
+    metadata = parquet_reader->metadata();
 
     std::shared_ptr<arrow::Schema> schema;
     THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
@@ -433,6 +549,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
         row_group_batches.back().row_groups_idxs.push_back(row_group);
         row_group_batches.back().total_rows += metadata->RowGroup(row_group)->num_rows();
         row_group_batches.back().total_bytes_compressed += metadata->RowGroup(row_group)->total_compressed_size();
+
+        applyRowRangesFromPageIndex(parquet_reader, row_group);
     }
 }
 
@@ -486,8 +604,11 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
     // TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators.
     THROW_ARROW_NOT_OK(builder.Build(&row_group_batch.file_reader));
 
-    THROW_ARROW_NOT_OK(
-        row_group_batch.file_reader->GetRecordBatchReader(row_group_batch.row_groups_idxs, column_indices, &row_group_batch.record_batch_reader));
+    THROW_ARROW_NOT_OK(row_group_batch.file_reader->GetRecordBatchReader(
+        row_group_batch.row_groups_idxs,
+        column_indices,
+        std::make_shared<std::map<int, std::shared_ptr<parquet::RowRanges>>>(std::move(row_group_batch.row_ranges_map)),
+        &row_group_batch.record_batch_reader));
 
     row_group_batch.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
         getPort().getHeader(),
diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
index 7fdf03a0606e..b12577d900e8 100644
--- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
+++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h
@@ -7,6 +7,9 @@
 #include
 #include
+#include
+#include
+
 
 namespace parquet { class FileMetaData; }
 namespace parquet::arrow { class FileReader; }
 namespace arrow { class Buffer; class RecordBatchReader;}
@@ -72,6 +75,7 @@ class ParquetBlockInputFormat : public IInputFormat
         is_stopped = 1;
     }
 
+    void applyRowRangesFromPageIndex(const std::unique_ptr<parquet::ParquetFileReader> & parquet_reader, int row_group);
     void initializeIfNeeded();
     void initializeRowGroupBatchReader(size_t row_group_batch_idx);
 
@@ -208,6 +212,7 @@ class ParquetBlockInputFormat : public IInputFormat
         size_t total_bytes_compressed = 0;
 
         std::vector<int> row_groups_idxs;
+        std::map<int, std::shared_ptr<parquet::RowRanges>> row_ranges_map;
 
         // These are only used by the decoding thread, so don't require locking the mutex.
         std::unique_ptr<parquet::arrow::FileReader> file_reader;
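
A note on the row range computation in applyRowRangesFromPageIndex: the offset index only records the first row index of each page, so the last row of page i has to be derived from the first row of page i + 1, and the final page runs to the last row of the row group. The standalone sketch below restates just that mapping in plain C++; PageLocation, selectRowRanges and page_passes_filter are names invented for this note, not types or functions from this patch or from Arrow.

#include <cstdint>
#include <cstdio>
#include <functional>
#include <utility>
#include <vector>

// Illustrative stand-in for a page location entry; only the field used here.
struct PageLocation
{
    int64_t first_row_index;
};

// For every page that survives the filter, emit the inclusive row range
// [pages[i].first_row_index, pages[i + 1].first_row_index - 1]; the last page
// extends to the last row of the row group, as in the loop in the patch.
std::vector<std::pair<int64_t, int64_t>> selectRowRanges(
    const std::vector<PageLocation> & pages,
    int64_t row_group_num_rows,
    const std::function<bool(size_t)> & page_passes_filter)
{
    std::vector<std::pair<int64_t, int64_t>> ranges;
    for (size_t i = 0; i < pages.size(); ++i)
    {
        if (!page_passes_filter(i))
            continue;
        const int64_t from = pages[i].first_row_index;
        const int64_t to = (i + 1 == pages.size())
            ? row_group_num_rows - 1
            : pages[i + 1].first_row_index - 1;
        ranges.emplace_back(from, to);
    }
    return ranges;
}

int main()
{
    // Hypothetical row group: 4 pages of 100 rows, first column sorted ascending,
    // per-page [min, max] = [0, 99], [100, 199], [200, 299], [300, 399].
    const std::vector<PageLocation> pages{{0}, {100}, {200}, {300}};
    const int64_t mins[] = {0, 100, 200, 300};
    const int64_t maxs[] = {99, 199, 299, 399};

    // Predicate "first_col = 250": a page can match only if min <= 250 <= max.
    const auto ranges = selectRowRanges(
        pages, 400, [&](size_t i) { return mins[i] <= 250 && 250 <= maxs[i]; });

    for (const auto & [from, to] : ranges)
        std::printf("read rows [%lld, %lld]\n", (long long) from, (long long) to);
    // Prints "read rows [200, 299]": one page out of four is decoded.
}

Adjacent surviving pages produce touching ranges, which the row ranges container on the forked Arrow side can presumably coalesce before decoding.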
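
A note on the fake statistics built in getHyperrectangleForPage: the column index stores per-page min/max values but no per-page null count, so the patch synthesizes a parquet::Statistics object whose null_count is 1 when NULL rows may be wanted and 0 otherwise, and feeds it through the existing getHyperrectangleFromStatistics path. The toy model below is only meant to show why that flag matters; PageDescription and pageCanMatch are invented for this note and are not ClickHouse's KeyCondition or Range. The premise, taken from the patch itself, is that a page which may contain NULLs has its range widened at the low end, since this code path represents NULL as negative infinity.

#include <cstdint>
#include <cstdio>

// Toy page description derived from statistics: a closed [min, max] interval
// for the values plus a flag saying whether the page may contain NULLs.
struct PageDescription
{
    int64_t min;
    int64_t max;
    bool may_contain_null;
};

// Toy pruning check for a predicate "col = literal". If the page may contain
// NULLs, its effective range has no usable lower bound (NULL sits below every
// value in this model), so only the upper bound can exclude the literal.
bool pageCanMatch(const PageDescription & page, int64_t literal)
{
    if (!page.may_contain_null && literal < page.min)
        return false;
    return literal <= page.max;
}

int main()
{
    // The same page, values in [100, 200], described with and without the
    // conservative "may contain NULL" assumption, probed with WHERE col = 42.
    std::printf("%d\n", pageCanMatch({100, 200, /*may_contain_null=*/ true}, 42));  // 1: page must be read
    std::printf("%d\n", pageCanMatch({100, 200, /*may_contain_null=*/ false}, 42)); // 0: page can be skipped
}

Declaring null_count = 0 when the WHERE clause cannot match NULLs keeps the page description at [min, max], which is the "much narrower range" that the comment in the patch refers to.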