Merge branch 'master' into fix_paimon_props
wsjz authored Jul 16, 2024
2 parents 43b4dfa + a23cf3a commit 8cf1bab
Showing 123 changed files with 4,260 additions and 1,864 deletions.
35 changes: 28 additions & 7 deletions be/src/olap/compaction.cpp
@@ -642,9 +642,13 @@ Status Compaction::do_inverted_index_compaction() {
// format: rowsetId_segmentId
std::vector<std::unique_ptr<InvertedIndexFileWriter>> inverted_index_file_writers(
dest_segment_num);
for (int i = 0; i < dest_segment_num; ++i) {

// Some columns have already been indexed
// key: seg_id, value: inverted index file size
std::unordered_map<int, int64_t> compacted_idx_file_size;
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
std::string index_path_prefix {
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(i))};
InvertedIndexDescriptor::get_index_file_path_prefix(ctx.segment_path(seg_id))};
auto inverted_index_file_reader = std::make_unique<InvertedIndexFileReader>(
ctx.fs(), index_path_prefix,
_cur_tablet_schema->get_inverted_index_storage_format());
@@ -654,16 +658,31 @@ Status Compaction::do_inverted_index_compaction() {
if (st.ok()) {
auto index_not_need_to_compact =
DORIS_TRY(inverted_index_file_reader->get_all_directories());
// V1: each index is a separate file
// V2: all indexes are in a single file
if (_cur_tablet_schema->get_inverted_index_storage_format() !=
doris::InvertedIndexStorageFormatPB::V1) {
int64_t fsize = 0;
st = ctx.fs()->file_size(
InvertedIndexDescriptor::get_index_file_path_v2(index_path_prefix), &fsize);
if (!st.ok()) {
LOG(ERROR) << "file size error in index compaction, error:" << st.msg();
return st;
}
compacted_idx_file_size[seg_id] = fsize;
}
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), i,
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
RETURN_IF_ERROR(inverted_index_file_writer->initialize(index_not_need_to_compact));
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
} else if (st.is<ErrorCode::INVERTED_INDEX_FILE_NOT_FOUND>()) {
auto inverted_index_file_writer = std::make_unique<InvertedIndexFileWriter>(
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), i,
ctx.fs(), index_path_prefix, ctx.rowset_id.to_string(), seg_id,
_cur_tablet_schema->get_inverted_index_storage_format());
inverted_index_file_writers[i] = std::move(inverted_index_file_writer);
inverted_index_file_writers[seg_id] = std::move(inverted_index_file_writer);
// no index file
compacted_idx_file_size[seg_id] = 0;
} else {
LOG(ERROR) << "inverted_index_file_reader init failed in index compaction, error:"
<< st;
@@ -744,11 +763,13 @@ Status Compaction::do_inverted_index_compaction() {
}

uint64_t inverted_index_file_size = 0;
for (auto& inverted_index_file_writer : inverted_index_file_writers) {
for (int seg_id = 0; seg_id < dest_segment_num; ++seg_id) {
auto inverted_index_file_writer = inverted_index_file_writers[seg_id].get();
if (Status st = inverted_index_file_writer->close(); !st.ok()) {
status = Status::Error<INVERTED_INDEX_COMPACTION_ERROR>(st.msg());
} else {
inverted_index_file_size += inverted_index_file_writer->get_index_file_size();
inverted_index_file_size -= compacted_idx_file_size[seg_id];
}
}
// check index compaction status. If status is not ok, we should return error and end this compaction round.
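
Note on the size accounting above (an editor's sketch, not code from the commit): before compaction, the existing V2 index file size of each destination segment is recorded in compacted_idx_file_size, and after the writers are closed that amount is subtracted again, so the reported inverted_index_file_size counts only the bytes newly written by this compaction round. A minimal, self-contained illustration, with all names hypothetical:

#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for the per-segment result reported by an index file writer.
struct ClosedWriter {
    int seg_id = 0;
    int64_t index_file_size = 0;  // size reported after close()
};

// Net bytes added by this compaction round: writer-reported sizes minus the
// sizes of index files that already existed for those segments beforehand.
int64_t net_index_bytes_written(const std::vector<ClosedWriter>& writers,
                                const std::unordered_map<int, int64_t>& compacted_idx_file_size) {
    int64_t total = 0;
    for (const auto& w : writers) {
        total += w.index_file_size;
        auto it = compacted_idx_file_size.find(w.seg_id);
        if (it != compacted_idx_file_size.end()) {
            total -= it->second;  // this segment already had an index file of that size
        }
    }
    return total;
}
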
21 changes: 13 additions & 8 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
@@ -753,6 +753,7 @@ bool SegmentIterator::_is_literal_node(const TExprNodeType::type& node_type) {
case TExprNodeType::DECIMAL_LITERAL:
case TExprNodeType::STRING_LITERAL:
case TExprNodeType::DATE_LITERAL:
case TExprNodeType::NULL_LITERAL:
return true;
default:
return false;
@@ -1925,7 +1926,8 @@ Status SegmentIterator::_read_columns(const std::vector<ColumnId>& column_ids,
}

Status SegmentIterator::_init_current_block(
vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns) {
vectorized::Block* block, std::vector<vectorized::MutableColumnPtr>& current_columns,
uint32_t nrows_read_limit) {
block->clear_column_data(_schema->num_column_ids());

for (size_t i = 0; i < _schema->num_column_ids(); i++) {
@@ -1945,7 +1947,7 @@ Status SegmentIterator::_init_current_block(
column_desc->path() == nullptr ? "" : column_desc->path()->get_path());
// TODO reuse
current_columns[cid] = file_column_type->create_column();
current_columns[cid]->reserve(_opts.block_row_max);
current_columns[cid]->reserve(nrows_read_limit);
} else {
// the column in block must clear() here to insert new data
if (_is_pred_column[cid] ||
@@ -1964,7 +1966,7 @@ Status SegmentIterator::_init_current_block(
} else if (column_desc->type() == FieldType::OLAP_FIELD_TYPE_DATETIME) {
current_columns[cid]->set_datetime_type();
}
current_columns[cid]->reserve(_opts.block_row_max);
current_columns[cid]->reserve(nrows_read_limit);
}
}
}
@@ -2378,14 +2380,16 @@ Status SegmentIterator::_next_batch_internal(vectorized::Block* block) {
}
}
}
RETURN_IF_ERROR(_init_current_block(block, _current_return_columns));
_converted_column_ids.assign(_schema->columns().size(), 0);

_current_batch_rows_read = 0;
uint32_t nrows_read_limit = _opts.block_row_max;
if (_can_opt_topn_reads()) {
nrows_read_limit = std::min(static_cast<uint32_t>(_opts.topn_limit), nrows_read_limit);
}

RETURN_IF_ERROR(_init_current_block(block, _current_return_columns, nrows_read_limit));
_converted_column_ids.assign(_schema->columns().size(), 0);

_current_batch_rows_read = 0;
RETURN_IF_ERROR(_read_columns_by_index(
nrows_read_limit, _current_batch_rows_read,
_lazy_materialization_read || _opts.record_rowids || _is_need_expr_eval));
@@ -2808,7 +2812,8 @@ void SegmentIterator::_calculate_pred_in_remaining_conjunct_root(
} else if (_is_literal_node(node_type)) {
auto v_literal_expr = static_cast<const doris::vectorized::VLiteral*>(expr.get());
_column_predicate_info->query_values.insert(v_literal_expr->value());
} else {
} else if (node_type == TExprNodeType::BINARY_PRED || node_type == TExprNodeType::MATCH_PRED ||
node_type == TExprNodeType::IN_PRED) {
if (node_type == TExprNodeType::MATCH_PRED) {
_column_predicate_info->query_op = "match";
} else if (node_type == TExprNodeType::IN_PRED) {
@@ -2817,7 +2822,7 @@
} else {
_column_predicate_info->query_op = "not_in";
}
} else if (node_type != TExprNodeType::COMPOUND_PRED) {
} else {
_column_predicate_info->query_op = expr->fn().name.function_name;
}

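
For context on the segment_iterator.cpp change above: the batch size cap (nrows_read_limit) is now computed before the block is initialized and passed into _init_current_block, so the per-column reserve() calls match the top-N-limited read size instead of always using _opts.block_row_max. A small standalone sketch of that computation (the option names mirror the diff; the function itself is illustrative):

#include <algorithm>
#include <cstdint>

// Rows to reserve and read for one batch: the usual block size, unless a
// top-N query allows reading fewer rows than a full block.
uint32_t rows_to_reserve(uint32_t block_row_max, int64_t topn_limit, bool can_opt_topn_reads) {
    uint32_t nrows_read_limit = block_row_max;
    if (can_opt_topn_reads) {
        nrows_read_limit = std::min(static_cast<uint32_t>(topn_limit), nrows_read_limit);
    }
    return nrows_read_limit;
}
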
3 changes: 2 additions & 1 deletion be/src/olap/rowset/segment_v2/segment_iterator.h
@@ -221,7 +221,8 @@ class SegmentIterator : public RowwiseIterator {
bool set_block_rowid);
void _replace_version_col(size_t num_rows);
Status _init_current_block(vectorized::Block* block,
std::vector<vectorized::MutableColumnPtr>& non_pred_vector);
std::vector<vectorized::MutableColumnPtr>& non_pred_vector,
uint32_t nrows_read_limit);
uint16_t _evaluate_vectorization_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
uint16_t _evaluate_short_circuit_predicate(uint16_t* sel_rowid_idx, uint16_t selected_size);
void _output_non_pred_columns(vectorized::Block* block);
22 changes: 22 additions & 0 deletions be/src/olap/task/index_builder.cpp
@@ -18,6 +18,7 @@
#include "olap/task/index_builder.h"

#include "common/status.h"
#include "gutil/integral_types.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset_writer_context.h"
@@ -124,7 +125,28 @@ Status IndexBuilder::update_inverted_index_info() {
}
}
_dropped_inverted_indexes.push_back(*index_meta);
// ATTN: DO NOT REMOVE AN INDEX AFTER THE OUTPUT ROWSET WRITER HAS BEEN CREATED.
// remove dropped index_meta from output rowset tablet schema
output_rs_tablet_schema->remove_index(index_meta->index_id());
}
DBUG_EXECUTE_IF("index_builder.update_inverted_index_info.drop_index", {
auto indexes_count = DebugPoints::instance()->get_debug_param_or_default<int32_t>(
"index_builder.update_inverted_index_info.drop_index", "indexes_count", 0);
if (indexes_count < 0) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"indexes count cannot be negative");
}
int32_t indexes_size = 0;
for (auto index : output_rs_tablet_schema->indexes()) {
if (index.index_type() == IndexType::INVERTED) {
indexes_size++;
}
}
if (indexes_count != indexes_size) {
return Status::Error<ErrorCode::INTERNAL_ERROR>(
"indexes count not equal to expected");
}
})
} else {
// base on input rowset's tablet_schema to build
// output rowset's tablet_schema which only add
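
The debug point added above verifies that, after dropped indexes are removed from the output rowset's tablet schema, the number of remaining inverted indexes matches the count injected by the test. The check reduces to a count-and-compare; a self-contained sketch with simplified stand-in types (not the Doris classes):

#include <vector>

enum class IndexType { INVERTED, BITMAP, NGRAM_BF };

struct IndexMeta {
    IndexType type;
};

// True if the schema still contains exactly `expected` inverted indexes.
bool has_expected_inverted_index_count(const std::vector<IndexMeta>& indexes, int expected) {
    int count = 0;
    for (const auto& idx : indexes) {
        if (idx.type == IndexType::INVERTED) {
            ++count;
        }
    }
    return count == expected;
}
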
5 changes: 0 additions & 5 deletions be/src/pipeline/exec/analytic_source_operator.cpp
@@ -559,11 +559,6 @@ Status AnalyticLocalState::close(RuntimeState* state) {

std::vector<vectorized::MutableColumnPtr> tmp_result_window_columns;
_result_window_columns.swap(tmp_result_window_columns);
// Some kinds of source operators has a 1-1 relationship with a sink operator (such as AnalyticOperator).
// We must ensure AnalyticSinkOperator will not be blocked if AnalyticSourceOperator already closed.
if (_shared_state && _shared_state->sink_deps.size() == 1) {
_shared_state->sink_deps.front()->set_always_ready();
}
return PipelineXLocalState<AnalyticSharedState>::close(state);
}

6 changes: 5 additions & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -567,7 +567,11 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
} else if (!local_state._should_build_hash_table) {
DCHECK(_shared_hashtable_controller != nullptr);
DCHECK(_shared_hash_table_context != nullptr);
CHECK(_shared_hash_table_context->signaled);
// An instance that does not build the hash table should wait for the signal that the hash table build has finished.
// If it is still running while signaled == false, the source operator may already have closed due to a short circuit,
// so finish this instance gracefully instead of failing a hard CHECK.
if (!_shared_hash_table_context->signaled) {
return Status::Error<ErrorCode::END_OF_FILE>("source have closed");
}

if (!_shared_hash_table_context->status.ok()) {
return _shared_hash_table_context->status;
115 changes: 71 additions & 44 deletions be/src/pipeline/exec/multi_cast_data_streamer.cpp
@@ -23,63 +23,97 @@

namespace doris::pipeline {

MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, size_t mem_size)
: _used_count(used_count), _mem_size(mem_size) {
MultiCastBlock::MultiCastBlock(vectorized::Block* block, int used_count, int un_finish_copy,
size_t mem_size)
: _used_count(used_count), _un_finish_copy(un_finish_copy), _mem_size(mem_size) {
_block = vectorized::Block::create_unique(block->get_columns_with_type_and_name());
block->clear();
}

Status MultiCastDataStreamer::pull(int sender_idx, doris::vectorized::Block* block, bool* eos) {
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
if (pos_to_pull != _multi_cast_blocks.end()) {
if (pos_to_pull->_used_count == 1) {
DCHECK(pos_to_pull == _multi_cast_blocks.begin());
pos_to_pull->_block->swap(*block);

_cumulative_mem_size -= pos_to_pull->_mem_size;
pos_to_pull++;
_multi_cast_blocks.pop_front();
} else {
pos_to_pull->_block->create_same_struct_block(0)->swap(*block);
RETURN_IF_ERROR(vectorized::MutableBlock(block).merge(*pos_to_pull->_block));
pos_to_pull->_used_count--;
pos_to_pull++;
int* un_finish_copy = nullptr;
int use_count = 0;
{
std::lock_guard l(_mutex);
auto& pos_to_pull = _sender_pos_to_read[sender_idx];
const auto end = _multi_cast_blocks.end();
DCHECK(pos_to_pull != end);

*block = *pos_to_pull->_block;

_cumulative_mem_size -= pos_to_pull->_mem_size;

pos_to_pull->_used_count--;
use_count = pos_to_pull->_used_count;
un_finish_copy = &pos_to_pull->_un_finish_copy;

pos_to_pull++;

if (pos_to_pull == end) {
_block_reading(sender_idx);
}

*eos = _eos and pos_to_pull == end;
}
*eos = _eos and pos_to_pull == _multi_cast_blocks.end();
if (pos_to_pull == _multi_cast_blocks.end()) {
_block_reading(sender_idx);

if (use_count == 0) {
// last reader of this block: wait until all pending copies finish, then pop it from _multi_cast_blocks
_wait_copy_block(block, *un_finish_copy);
} else {
_copy_block(block, *un_finish_copy);
}

return Status::OK();
}

void MultiCastDataStreamer::_copy_block(vectorized::Block* block, int& un_finish_copy) {
const auto rows = block->rows();
for (int i = 0; i < block->columns(); ++i) {
block->get_by_position(i).column = block->get_by_position(i).column->clone_resized(rows);
}

std::unique_lock l(_mutex);
un_finish_copy--;
if (un_finish_copy == 0) {
l.unlock();
_cv.notify_one();
}
}

void MultiCastDataStreamer::_wait_copy_block(vectorized::Block* block, int& un_finish_copy) {
std::unique_lock l(_mutex);
_cv.wait(l, [&]() { return un_finish_copy == 0; });
_multi_cast_blocks.pop_front();
}

Status MultiCastDataStreamer::push(RuntimeState* state, doris::vectorized::Block* block, bool eos) {
auto rows = block->rows();
COUNTER_UPDATE(_process_rows, rows);

auto block_mem_size = block->allocated_bytes();
std::lock_guard l(_mutex);
int need_process_count = _cast_sender_count - _closed_sender_count;
if (need_process_count == 0) {
return Status::EndOfFile("All data streamer is EOF");
}
// TODO: if the [queue back block rows + block->rows()] < batch_size, better
// do merge block. but need check the need_process_count and used_count whether
// equal
_multi_cast_blocks.emplace_back(block, need_process_count, block_mem_size);
const auto block_mem_size = block->allocated_bytes();
_cumulative_mem_size += block_mem_size;
COUNTER_SET(_peak_mem_usage, std::max(_cumulative_mem_size, _peak_mem_usage->value()));

auto end = _multi_cast_blocks.end();
end--;
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
_set_ready_for_read(i);
{
std::lock_guard l(_mutex);
_multi_cast_blocks.emplace_back(block, _cast_sender_count, _cast_sender_count - 1,
block_mem_size);
// last elem
auto end = std::prev(_multi_cast_blocks.end());
for (int i = 0; i < _sender_pos_to_read.size(); ++i) {
if (_sender_pos_to_read[i] == _multi_cast_blocks.end()) {
_sender_pos_to_read[i] = end;
_set_ready_for_read(i);
}
}
_eos = eos;
}

if (_eos) {
for (auto* read_dep : _dependencies) {
read_dep->set_always_ready();
}
}
_eos = eos;
return Status::OK();
}

@@ -92,13 +126,6 @@ void MultiCastDataStreamer::_set_ready_for_read(int sender_idx) {
dep->set_ready();
}

void MultiCastDataStreamer::_set_ready_for_read() {
for (auto* dep : _dependencies) {
DCHECK(dep);
dep->set_ready();
}
}

void MultiCastDataStreamer::_block_reading(int sender_idx) {
if (_dependencies.empty()) {
return;
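
To summarize the new hand-off protocol in multi_cast_data_streamer.cpp: each pushed block is stored once with used_count set to the number of senders and un_finish_copy set to senders - 1; every reader except the last clones the columns outside the mutex and then decrements un_finish_copy, while the last reader takes the stored block directly but waits until all pending copies have finished before the block is popped from the queue. The following self-contained sketch mirrors that scheme for a single slot (all types and names are illustrative, not Doris APIs):

#include <condition_variable>
#include <mutex>
#include <utility>
#include <vector>

// One shared block and the counters that guard its hand-off to N readers.
struct SharedSlot {
    std::vector<int> data;   // stands in for vectorized::Block
    int used_count = 0;      // readers that have not pulled yet
    int un_finish_copy = 0;  // readers that pulled but have not finished copying
};

class MiniMultiCast {
public:
    MiniMultiCast(std::vector<int> block, int readers)
            : _slot{std::move(block), readers, readers - 1} {}

    // Called exactly once per reader; returns that reader's private copy of the data.
    std::vector<int> pull() {
        int remaining = 0;
        {
            std::lock_guard<std::mutex> l(_mutex);
            remaining = --_slot.used_count;
        }
        if (remaining == 0) {
            // Last reader: take the stored data, but only after every other
            // reader has finished its copy.
            std::unique_lock<std::mutex> l(_mutex);
            _cv.wait(l, [&] { return _slot.un_finish_copy == 0; });
            return std::move(_slot.data);
        }
        // Any other reader: copy outside the lock, then report completion.
        std::vector<int> copy = _slot.data;
        bool copies_done = false;
        {
            std::lock_guard<std::mutex> l(_mutex);
            copies_done = (--_slot.un_finish_copy == 0);
        }
        if (copies_done) {
            _cv.notify_one();  // wake the last reader waiting in pull()
        }
        return copy;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    SharedSlot _slot;
};

With three readers, for example, two of them return clones built outside the lock and the third returns the original columns only after both clones are complete, which is exactly the point at which the real streamer can safely pop the block from _multi_cast_blocks.
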