Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 70 additions & 61 deletions be/src/vec/exec/format/orc/vorc_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,9 @@ void OrcReader::_collect_profile_before_close() {
COUNTER_UPDATE(_orc_profile.set_fill_column_time, _statistics.set_fill_column_time);
COUNTER_UPDATE(_orc_profile.decode_value_time, _statistics.decode_value_time);
COUNTER_UPDATE(_orc_profile.decode_null_map_time, _statistics.decode_null_map_time);
COUNTER_UPDATE(_orc_profile.filter_block_time, _statistics.filter_block_time);
COUNTER_UPDATE(_orc_profile.predicate_filter_time, _statistics.predicate_filter_time);
COUNTER_UPDATE(_orc_profile.dict_filter_rewrite_time, _statistics.dict_filter_rewrite_time);
COUNTER_UPDATE(_orc_profile.lazy_read_filtered_rows, _statistics.lazy_read_filtered_rows);

if (_file_input_stream != nullptr) {
_file_input_stream->collect_profile_before_close();
Expand Down Expand Up @@ -260,8 +262,12 @@ void OrcReader::_init_profile() {
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeValueTime", orc_profile, 1);
_orc_profile.decode_null_map_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DecodeNullMapTime", orc_profile, 1);
_orc_profile.filter_block_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "FilterBlockTime", orc_profile, 1);
_orc_profile.predicate_filter_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "PredicateFilterTime", orc_profile, 1);
_orc_profile.dict_filter_rewrite_time =
ADD_CHILD_TIMER_WITH_LEVEL(_profile, "DictFilterRewriteTime", orc_profile, 1);
_orc_profile.lazy_read_filtered_rows =
ADD_COUNTER_WITH_LEVEL(_profile, "FilteredRowsByLazyRead", TUnit::UNIT, 1);
_orc_profile.selected_row_group_count =
ADD_COUNTER_WITH_LEVEL(_profile, "SelectedRowGroupCount", TUnit::UNIT, 1);
_orc_profile.evaluated_row_group_count =
Expand Down Expand Up @@ -2006,15 +2012,18 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
*read_rows = 0;
return Status::OK();
}
_execute_filter_position_delete_rowids(*_filter);
{
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
_execute_filter_position_delete_rowids(*_filter);
{
SCOPED_RAW_TIMER(&_statistics.decode_null_map_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, *_filter));
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
} else {
uint64_t rr;
SCOPED_RAW_TIMER(&_statistics.column_read_time);
Expand Down Expand Up @@ -2091,63 +2100,60 @@ Status OrcReader::get_next_block_impl(Block* block, size_t* read_rows, bool* eof
return Status::OK();
}

_build_delete_row_filter(block, _batch->numElements);

std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
if (!_lazy_read_ctx.conjuncts.empty()) {
VExprContextSPtrs filter_conjuncts;
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
_filter_conjuncts.end());
for (auto& conjunct : _dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
for (auto& conjunct : _non_dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
std::vector<IColumn::Filter*> filters;
if (_delete_rows_filter_ptr) {
filters.push_back(_delete_rows_filter_ptr.get());
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
{
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
_build_delete_row_filter(block, _batch->numElements);

std::vector<uint32_t> columns_to_filter;
int column_to_keep = block->columns();
columns_to_filter.resize(column_to_keep);
for (uint32_t i = 0; i < column_to_keep; ++i) {
columns_to_filter[i] = i;
}
if (!_lazy_read_ctx.conjuncts.empty()) {
VExprContextSPtrs filter_conjuncts;
filter_conjuncts.insert(filter_conjuncts.end(), _filter_conjuncts.begin(),
_filter_conjuncts.end());
for (auto& conjunct : _dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
Block::erase_useless_column(block, column_to_keep);
return _convert_dict_cols_to_string_cols(block, &batch_vec);
}
_execute_filter_position_delete_rowids(result_filter);
{
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
for (auto& conjunct : _non_dict_filter_conjuncts) {
filter_conjuncts.emplace_back(conjunct);
}
std::vector<IColumn::Filter*> filters;
if (_delete_rows_filter_ptr) {
filters.push_back(_delete_rows_filter_ptr.get());
}
IColumn::Filter result_filter(block->rows(), 1);
bool can_filter_all = false;
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(VExprContext::execute_conjuncts(
filter_conjuncts, &filters, block, &result_filter, &can_filter_all));
if (can_filter_all) {
for (auto& col : columns_to_filter) {
std::move(*block->get_by_position(col).column).assume_mutable()->clear();
}
Block::erase_useless_column(block, column_to_keep);
return _convert_dict_cols_to_string_cols(block, &batch_vec);
}
_execute_filter_position_delete_rowids(result_filter);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, result_filter));
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
} else {
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter,
(*_delete_rows_filter_ptr)));
Block::erase_useless_column(block, column_to_keep);
} else {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter);
SCOPED_RAW_TIMER(&_statistics.filter_block_time);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*filter)));
if (_delete_rows_filter_ptr) {
_execute_filter_position_delete_rowids(*_delete_rows_filter_ptr);
RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(
block, columns_to_filter, (*_delete_rows_filter_ptr)));
} else {
std::unique_ptr<IColumn::Filter> filter(new IColumn::Filter(block->rows(), 1));
_execute_filter_position_delete_rowids(*filter);
RETURN_IF_CATCH_EXCEPTION(
Block::filter_block_internal(block, columns_to_filter, (*filter)));
}
Block::erase_useless_column(block, column_to_keep);
}
Block::erase_useless_column(block, column_to_keep);
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
}
RETURN_IF_ERROR(_convert_dict_cols_to_string_cols(block, &batch_vec));
*read_rows = block->rows();
}
return Status::OK();
Expand Down Expand Up @@ -2189,6 +2195,7 @@ void OrcReader::_build_delete_row_filter(const Block* block, size_t rows) {
}

Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t size, void* arg) {
SCOPED_RAW_TIMER(&_statistics.predicate_filter_time);
auto* block = (Block*)arg;
size_t origin_column_num = block->columns();

Expand Down Expand Up @@ -2289,6 +2296,7 @@ Status OrcReader::filter(orc::ColumnVectorBatch& data, uint16_t* sel, uint16_t s
sel[new_size] = i;
new_size += result_filter_data[i] ? 1 : 0;
}
_statistics.lazy_read_filtered_rows += static_cast<int64_t>(size - new_size);
data.numElements = new_size;
return Status::OK();
}
Expand Down Expand Up @@ -2363,6 +2371,7 @@ bool OrcReader::_can_filter_by_dict(int slot_id) {
Status OrcReader::on_string_dicts_loaded(
std::unordered_map<std::string, orc::StringDictionary*>& file_column_name_to_dict_map,
bool* is_stripe_filtered) {
SCOPED_RAW_TIMER(&_statistics.dict_filter_rewrite_time);
*is_stripe_filtered = false;
for (auto it = _dict_filter_cols.begin(); it != _dict_filter_cols.end();) {
std::string& dict_filter_col_name = it->first;
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/exec/format/orc/vorc_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,9 @@ class OrcReader : public GenericReader {
int64_t set_fill_column_time = 0;
int64_t decode_value_time = 0;
int64_t decode_null_map_time = 0;
int64_t filter_block_time = 0;
int64_t predicate_filter_time = 0;
int64_t dict_filter_rewrite_time = 0;
int64_t lazy_read_filtered_rows = 0;
};

OrcReader(RuntimeProfile* profile, RuntimeState* state, const TFileScanRangeParams& params,
Expand Down Expand Up @@ -222,7 +224,9 @@ class OrcReader : public GenericReader {
RuntimeProfile::Counter* set_fill_column_time = nullptr;
RuntimeProfile::Counter* decode_value_time = nullptr;
RuntimeProfile::Counter* decode_null_map_time = nullptr;
RuntimeProfile::Counter* filter_block_time = nullptr;
RuntimeProfile::Counter* predicate_filter_time = nullptr;
RuntimeProfile::Counter* dict_filter_rewrite_time = nullptr;
RuntimeProfile::Counter* lazy_read_filtered_rows = nullptr;
RuntimeProfile::Counter* selected_row_group_count = nullptr;
RuntimeProfile::Counter* evaluated_row_group_count = nullptr;
};
Expand Down
Loading
Loading