Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions be/src/olap/block_column_predicate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,38 @@ void OrBlockColumnPredicate::evaluate_and(vectorized::MutableColumns& block, uin
}
}

// Page-index evaluation for an OR group: the rows to read are the union of the
// row ranges selected by each child predicate.
//
// `row_ranges` is handed directly to the first child; every further child fills
// a scratch RowRanges that is merged in through RowRanges::ranges_union.
// Returns true when the resulting `row_ranges` has a non-zero count.
//
// NOTE(review): the boolean results of the children are ignored here, so a child
// that returns true without adding any range contributes nothing to the union —
// confirm that this is the intended handling.
bool OrBlockColumnPredicate::evaluate_and(
        vectorized::ParquetPredicate::CachedPageIndexStat* statistic, RowRanges* row_ranges) const {
    const auto child_count = num_of_column_predicate();
    if (child_count < 1) {
        // No children: report whatever the caller-provided ranges already contain.
        return row_ranges->count() != 0;
    }

    // The first child writes straight into the output ranges.
    _block_column_predicate_vec[0]->evaluate_and(statistic, row_ranges);

    for (int child = 1; child < child_count; ++child) {
        RowRanges child_ranges;
        _block_column_predicate_vec[child]->evaluate_and(statistic, &child_ranges);
        RowRanges::ranges_union(*row_ranges, child_ranges, row_ranges);
    }
    return row_ranges->count() != 0;
}

// Page-index evaluation for an AND group: the rows to read are the intersection
// of the row ranges selected by each child predicate.
//
// Each child fills its own scratch RowRanges. The first child's result seeds
// `row_ranges`; every later result is intersected into it.
// Returns false as soon as any child returns false, true otherwise.
//
// NOTE(review): true is returned even if the accumulated intersection ends up
// empty, and on the false path `row_ranges` keeps whatever earlier children
// produced — confirm callers do not rely on `row_ranges` in either case.
bool AndBlockColumnPredicate::evaluate_and(
        vectorized::ParquetPredicate::CachedPageIndexStat* statistic, RowRanges* row_ranges) const {
    const auto child_count = num_of_column_predicate();
    for (int child = 0; child < child_count; ++child) {
        RowRanges child_ranges;
        const bool child_has_rows =
                _block_column_predicate_vec[child]->evaluate_and(statistic, &child_ranges);
        if (!child_has_rows) {
            return false;
        }

        if (child != 0) {
            RowRanges::ranges_intersection(*row_ranges, child_ranges, row_ranges);
            continue;
        }
        // First child: its ranges become the starting point of the intersection.
        *row_ranges = child_ranges;
    }
    return true;
}

uint16_t AndBlockColumnPredicate::evaluate(vectorized::MutableColumns& block, uint16_t* sel,
uint16_t selected_size) const {
for (auto& block_column_predicate : _block_column_predicate_vec) {
Expand Down
23 changes: 23 additions & 0 deletions be/src/olap/block_column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,18 @@ class BlockColumnPredicate {
throw Exception(Status::FatalError("should not reach here"));
}

/**
 * Page-index variant of evaluate_and for Parquet files.
 *
 * The page index of each column filters a different set of rows, so the rows that still have to
 * be read after filtering are reported through a `RowRanges` instead of a single boolean.
 * `CachedPageIndexStat` is used so that the page index of the same column does not have to be
 * parsed repeatedly.
 *
 * @param statistic  cached page-index statistics.
 * @param row_ranges output: the ranges of rows that must still be read.
 * @return false if no rows need to be read, true otherwise.
 *
 * This base implementation always throws; subclasses that support page-index filtering
 * override it.
 */
virtual bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
RowRanges* row_ranges) const {
throw Exception(Status::FatalError("should not reach here"));
}

// Bloom-filter variant of evaluate_and. This base implementation is not meant to be
// called and always throws a fatal-error Exception.
virtual bool evaluate_and(const segment_v2::BloomFilter* bf) const {
throw Exception(Status::FatalError("should not reach here"));
}
Expand Down Expand Up @@ -125,6 +137,11 @@ class SingleColumnBlockPredicate : public BlockColumnPredicate {
// Column-statistics variant: forwards to the wrapped column predicate (`_predicate`)
// and returns its result unchanged.
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
return _predicate->evaluate_and(statistic);
}

// Page-index variant: forwards both the cached page-index statistics and the output
// `row_ranges` to the wrapped column predicate (`_predicate`) and returns its result.
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
RowRanges* row_ranges) const override {
return _predicate->evaluate_and(statistic, row_ranges);
}
bool evaluate_and(const segment_v2::BloomFilter* bf) const override;
bool evaluate_and(const StringRef* dict_words, const size_t dict_num) const override;
void evaluate_or(vectorized::MutableColumns& block, uint16_t* sel, uint16_t selected_size,
Expand Down Expand Up @@ -201,6 +218,9 @@ class OrBlockColumnPredicate : public MutilColumnBlockPredicate {
}
}

// Page-index evaluation for an OR group: `row_ranges` receives the union of the row
// ranges produced by the child predicates; returns whether that union is non-empty.
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
RowRanges* row_ranges) const override;

// note(wb) we didn't implement the evaluate_vec method here, because the storage layer only supports AND predicates for now.
};

Expand Down Expand Up @@ -232,6 +252,9 @@ class AndBlockColumnPredicate : public MutilColumnBlockPredicate {
return true;
}

// Page-index evaluation for an AND group: `row_ranges` receives the intersection of the
// row ranges produced by the child predicates; returns false as soon as one child does.
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
RowRanges* row_ranges) const override;

bool can_do_bloom_filter(bool ngram) const override {
for (auto& pred : _block_column_predicate_vec) {
if (!pred->can_do_bloom_filter(ngram)) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/olap/column_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,13 @@ class ColumnPredicate {
return true;
}

// Page-index variant of evaluate_and. Predicates that can use Parquet page-index
// statistics override this; the default implementation rejects the call.
//
// @param statistic  cached page-index statistics (unused by this default).
// @param row_ranges output row ranges (unused by this default).
// @throws Exception with ErrorCode::INTERNAL_ERROR, unconditionally.
virtual bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
                          RowRanges* row_ranges) const {
    // Every path throws, so no return statement follows.
    throw Exception(ErrorCode::INTERNAL_ERROR,
                    "ParquetPredicate is not supported by this predicate!");
}

// used to evaluate pre read column in lazy materialization
// now only support integer/float
// a vectorized eval way
Expand Down
61 changes: 49 additions & 12 deletions be/src/olap/comparison_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,18 +150,8 @@ class ComparisonPredicateBase : public ColumnPredicate {
* 3. LT|LE: if `_value` is greater than min, return true to further compute each value in this page.
* 4. GT|GE: if `_value` is less than max, return true to further compute each value in this page.
*/
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
if (!(*statistic->get_stat_func)(statistic, column_id())) {
return true;
}
vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::get_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) {
return true;
};

bool camp_field(const vectorized::Field& min_field, const vectorized::Field& max_field) const {
T min_value;
T max_value;
if constexpr (is_int_or_bool(Type) || is_float_or_double(Type)) {
Expand Down Expand Up @@ -189,6 +179,53 @@ class ComparisonPredicateBase : public ColumnPredicate {
}
}

// Column-statistics check: decides from the column's min/max whether the data
// may contain matching rows.
//
// Returns true (cannot filter) when the statistics for this column cannot be
// fetched or the encoded min/max cannot be parsed; otherwise returns the result
// of camp_field on the decoded bounds.
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
    if (!(*statistic->get_stat_func)(statistic, column_id())) {
        return true;
    }

    vectorized::Field min_field;
    vectorized::Field max_field;
    const auto parse_status = vectorized::ParquetPredicate::parse_min_max_value(
            statistic->col_schema, statistic->encoded_min_value, statistic->encoded_max_value,
            *statistic->ctz, &min_field, &max_field);
    if (!parse_status.ok()) [[unlikely]] {
        // Undecodable bounds give no basis for filtering.
        return true;
    }

    return camp_field(min_field, max_field);
}

// Page-index check: walks every page of this column and adds to `row_ranges`
// the row range of each page that may contain matching rows.
//
// A page is skipped when it is marked all-null. A page is kept when its min/max
// cannot be parsed, or when camp_field accepts the decoded bounds.
// Returns true if `row_ranges` has a positive count afterwards.
//
// NOTE(review): when the page-index statistics cannot be fetched, true is
// returned while `row_ranges` is left untouched — confirm callers handle a true
// result with no ranges added.
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
                  RowRanges* row_ranges) const override {
    vectorized::ParquetPredicate::PageIndexStat* stat = nullptr;
    if (!(statistic->get_stat_func)(&stat, column_id())) {
        return true;
    }

    for (int page_id = 0; page_id < stat->num_of_pages; page_id++) {
        if (stat->is_all_null[page_id]) {
            // A page holding only nulls does not need to be read.
            continue;
        }

        vectorized::Field min_field;
        vectorized::Field max_field;
        const bool bounds_parsed =
                vectorized::ParquetPredicate::parse_min_max_value(
                        stat->col_schema, stat->encoded_min_value[page_id],
                        stat->encoded_max_value[page_id], *statistic->ctz, &min_field,
                        &max_field)
                        .ok();

        // Without usable bounds the page cannot be ruled out, so it is kept.
        if (!bounds_parsed || camp_field(min_field, max_field)) {
            row_ranges->add(stat->ranges[page_id]);
        }
    }
    return row_ranges->count() > 0;
}

bool is_always_true(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
if (statistic.first->is_null() || statistic.second->is_null()) {
return false;
Expand Down
60 changes: 48 additions & 12 deletions be/src/olap/in_list_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,7 @@ class InListPredicateBase : public ColumnPredicate {
}
}

bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
if (!(*statistic->get_stat_func)(statistic, column_id())) {
return true;
}
vectorized::Field min_field;
vectorized::Field max_field;
if (!vectorized::ParquetPredicate::get_min_max_value(
statistic->col_schema, statistic->encoded_min_value,
statistic->encoded_max_value, *statistic->ctz, &min_field, &max_field)
.ok()) {
return true;
};
bool camp_field(const vectorized::Field& min_field, const vectorized::Field& max_field) const {
T min_value;
T max_value;
if constexpr (is_int_or_bool(Type) || is_float_or_double(Type)) {
Expand All @@ -282,6 +271,53 @@ class InListPredicateBase : public ColumnPredicate {
}
}

// Column-statistics check for the IN-list predicate.
//
// Returns true (cannot filter) when the statistics for this column cannot be
// fetched or the encoded min/max cannot be parsed; otherwise returns the result
// of camp_field on the decoded bounds.
bool evaluate_and(vectorized::ParquetPredicate::ColumnStat* statistic) const override {
    if (!(*statistic->get_stat_func)(statistic, column_id())) {
        return true;
    }

    vectorized::Field min_field;
    vectorized::Field max_field;
    const auto parse_status = vectorized::ParquetPredicate::parse_min_max_value(
            statistic->col_schema, statistic->encoded_min_value, statistic->encoded_max_value,
            *statistic->ctz, &min_field, &max_field);
    if (!parse_status.ok()) [[unlikely]] {
        // Undecodable bounds give no basis for filtering.
        return true;
    }

    return camp_field(min_field, max_field);
}

// Page-index check for the IN-list predicate: walks every page of this column
// and adds to `row_ranges` the row range of each page that may contain matches.
//
// A page is skipped when it is marked all-null. A page is kept when its min/max
// cannot be parsed, or when camp_field accepts the decoded bounds.
// Returns true if `row_ranges` has a positive count afterwards.
//
// NOTE(review): when the page-index statistics cannot be fetched, true is
// returned while `row_ranges` is left untouched — confirm callers handle a true
// result with no ranges added.
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
                  RowRanges* row_ranges) const override {
    vectorized::ParquetPredicate::PageIndexStat* stat = nullptr;
    if (!(statistic->get_stat_func)(&stat, column_id())) {
        return true;
    }

    for (int page_id = 0; page_id < stat->num_of_pages; page_id++) {
        if (stat->is_all_null[page_id]) {
            // A page holding only nulls does not need to be read.
            continue;
        }

        vectorized::Field min_field;
        vectorized::Field max_field;
        const bool bounds_parsed =
                vectorized::ParquetPredicate::parse_min_max_value(
                        stat->col_schema, stat->encoded_min_value[page_id],
                        stat->encoded_max_value[page_id], *statistic->ctz, &min_field,
                        &max_field)
                        .ok();

        // Without usable bounds the page cannot be ruled out, so it is kept.
        if (!bounds_parsed || camp_field(min_field, max_field)) {
            row_ranges->add(stat->ranges[page_id]);
        }
    }
    return row_ranges->count() > 0;
}

bool evaluate_and(const StringRef* dict_words, const size_t count) const override {
for (size_t i = 0; i != count; ++i) {
const auto found = _values->find(dict_words[i].data, dict_words[i].size) ^ _opposite;
Expand Down
15 changes: 15 additions & 0 deletions be/src/olap/null_predicate.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include "olap/rowset/segment_v2/bloom_filter.h"
#include "olap/schema.h"
#include "olap/wrapper_field.h"
#include "vec/exec/format/parquet/parquet_pred_cmp.h"

namespace roaring {
class Roaring;
Expand Down Expand Up @@ -77,6 +78,20 @@ class NullPredicate : public ColumnPredicate {
}
}

// Page-index check for the null predicate.
//
// When `_is_null` is set every page's row range is added; otherwise only pages
// that are not marked all-null are added.
// Returns true if `row_ranges` has a positive count afterwards.
//
// NOTE(review): when the page-index statistics cannot be fetched, true is
// returned while `row_ranges` is left untouched — confirm callers handle a true
// result with no ranges added.
bool evaluate_and(vectorized::ParquetPredicate::CachedPageIndexStat* statistic,
                  RowRanges* row_ranges) const override {
    vectorized::ParquetPredicate::PageIndexStat* stat = nullptr;
    if (!(statistic->get_stat_func)(&stat, column_id())) {
        return true;
    }

    for (int page_id = 0; page_id < stat->num_of_pages; page_id++) {
        // Only an all-null page can be dropped, and only when `_is_null` is not set.
        if (!_is_null && stat->is_all_null[page_id]) {
            continue;
        }
        row_ranges->add(stat->ranges[page_id]);
    }
    return row_ranges->count() > 0;
}

bool evaluate_del(const std::pair<WrapperField*, WrapperField*>& statistic) const override {
// evaluate_del only use for delete condition to filter page, need use delete condition origin value,
// when opposite==true, origin value 'is null'->'is not null' and 'is not null'->'is null',
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -658,8 +658,8 @@ Status PushBrokerReader::_get_next_reader() {
_io_ctx.get(), _runtime_state.get());

init_status = parquet_reader->init_reader(
_all_col_names, &_col_name_to_block_idx, _colname_to_value_range, _push_down_exprs,
_real_tuple_desc, _default_val_row_desc.get(), _col_name_to_slot_id,
_all_col_names, &_col_name_to_block_idx, _push_down_exprs, _real_tuple_desc,
_default_val_row_desc.get(), _col_name_to_slot_id,
&_not_single_slot_filter_conjuncts, &_slot_id_to_filter_conjuncts,
vectorized::TableSchemaChangeHelper::ConstNode::get_instance(), false);
_cur_reader = std::move(parquet_reader);
Expand Down
1 change: 0 additions & 1 deletion be/src/olap/push_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ class PushBrokerReader {
// col names from _slot_descs
std::vector<std::string> _all_col_names;
std::unordered_map<std::string, uint32_t> _col_name_to_block_idx;
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
vectorized::VExprContextSPtrs _push_down_exprs;
const std::unordered_map<std::string, int>* _col_name_to_slot_id;
// single slot filter conjuncts
Expand Down
8 changes: 0 additions & 8 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,6 @@ Status OrcReader::_create_file_reader() {
Status OrcReader::init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid, const TupleDescriptor* tuple_descriptor,
const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
Expand All @@ -367,7 +366,6 @@ Status OrcReader::init_reader(
const std::set<uint64_t>& column_ids, const std::set<uint64_t>& filter_column_ids) {
_table_column_names = column_names;
_col_name_to_block_idx = col_name_to_block_idx;
_colname_to_value_range = colname_to_value_range;
_lazy_read_ctx.conjuncts = conjuncts;
_is_acid = is_acid;
_tuple_descriptor = tuple_descriptor;
Expand Down Expand Up @@ -1418,12 +1416,6 @@ Status OrcReader::_fill_row_id_columns(Block* block) {
return Status::OK();
}

// Placeholder for bloom-filter initialization. The body contains no statements, so
// calling it has no effect and `colname_to_value_range` is not used.
void OrcReader::_init_bloom_filter(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range) {
// generate bloom filter
// _reader->getBloomFilters()
}

void OrcReader::_init_system_properties() {
if (_scan_range.__isset.file_type) {
// for compatibility
Expand Down
4 changes: 0 additions & 4 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ class OrcReader : public GenericReader {
Status init_reader(
const std::vector<std::string>* column_names,
std::unordered_map<std::string, uint32_t>* col_name_to_block_idx,
const std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range,
const VExprContextSPtrs& conjuncts, bool is_acid,
const TupleDescriptor* tuple_descriptor, const RowDescriptor* row_descriptor,
const VExprContextSPtrs* not_single_slot_filter_conjuncts,
Expand Down Expand Up @@ -341,8 +340,6 @@ class OrcReader : public GenericReader {
Status _fill_missing_columns(
Block* block, uint64_t rows,
const std::unordered_map<std::string, VExprContextSPtr>& missing_columns);
void _init_bloom_filter(
std::unordered_map<std::string, ColumnValueRangeType>* colname_to_value_range);
void _init_system_properties();
void _init_file_description();

Expand Down Expand Up @@ -684,7 +681,6 @@ class OrcReader : public GenericReader {
std::vector<DecimalScaleParams> _decimal_scale_params;
size_t _decimal_scale_params_index;

const std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range = nullptr;
bool _is_acid = false;
std::unique_ptr<IColumn::Filter> _filter;
LazyReadContext _lazy_read_ctx;
Expand Down
18 changes: 3 additions & 15 deletions be/src/vec/exec/format/parquet/parquet_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,15 @@
#include <unordered_set>
#include <vector>

#include "olap/rowset/segment_v2/row_ranges.h"
#include "vec/columns/column_nullable.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"
using level_t = int16_t;

// A pair of row positions, printed by debug_string() in half-open notation
// "[first_row,last_row)".
struct RowRange {
    RowRange() = default;
    RowRange(int64_t first, int64_t last) : first_row(first), last_row(last) {}

    // Zero-initialized so a default-constructed range never holds indeterminate values.
    int64_t first_row = 0;
    int64_t last_row = 0;

    // Orders ranges by their starting row only; last_row is not compared.
    bool operator<(const RowRange& range) const { return first_row < range.first_row; }

    // Renders the range as "[first_row,last_row)".
    std::string debug_string() const {
        return "[" + std::to_string(first_row) + "," + std::to_string(last_row) + ")";
    }
};
using segment_v2::RowRange;
using segment_v2::RowRanges;

#pragma pack(1)
struct ParquetInt96 {
Expand Down
Loading
Loading