Skip to content

Commit

Permalink
Merge branch 'master' into DORIS-12389
Browse files Browse the repository at this point in the history
  • Loading branch information
kaijchen authored Sep 4, 2024
2 parents 7b1cae2 + 5eab877 commit a73d971
Show file tree
Hide file tree
Showing 228 changed files with 4,071 additions and 5,200 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/code-checks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ jobs:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
sh_checker_comment: true
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest
sh_checker_exclude: .git .github ^docker ^thirdparty/src ^thirdparty/installed ^ui ^docs/node_modules ^tools/clickbench-tools ^extension ^output ^fs_brokers/apache_hdfs_broker/output (^|.*/)Dockerfile$ ^be/src/apache-orc ^be/src/clucene ^pytest ^samples

preparation:
name: "Clang Tidy Preparation"
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/scope-label.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
github.event_name == 'pull_request_target' &&
(github.event.action == 'opened' ||
github.event.action == 'synchronize')
uses: actions/labeler@v5.5.0
uses: actions/labeler@2.2.0
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
configuration-path: .github/workflows/labeler/scope-label-conf.yml
Expand Down
21 changes: 20 additions & 1 deletion be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -715,7 +715,10 @@ Status CloudMetaMgr::prepare_rowset(const RowsetMeta& rs_meta,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "prepare rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();

{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::prepare_rowset", ret_st);
}
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -741,6 +744,10 @@ Status CloudMetaMgr::commit_rowset(const RowsetMeta& rs_meta,
RowsetMetaSharedPtr* existed_rs_meta) {
VLOG_DEBUG << "commit rowset, tablet_id: " << rs_meta.tablet_id()
<< ", rowset_id: " << rs_meta.rowset_id() << " txn_id: " << rs_meta.txn_id();
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_rowset", ret_st);
}
CreateRowsetRequest req;
CreateRowsetResponse resp;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down Expand Up @@ -841,6 +848,10 @@ static void send_stats_to_fe_async(const int64_t db_id, const int64_t txn_id,
Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
VLOG_DEBUG << "commit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label << ", is_2pc: " << is_2pc;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::commit_txn", ret_st);
}
CommitTxnRequest req;
CommitTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -860,6 +871,10 @@ Status CloudMetaMgr::commit_txn(const StreamLoadContext& ctx, bool is_2pc) {
Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
VLOG_DEBUG << "abort txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::abort_txn", ret_st);
}
AbortTxnRequest req;
AbortTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand All @@ -879,6 +894,10 @@ Status CloudMetaMgr::abort_txn(const StreamLoadContext& ctx) {
Status CloudMetaMgr::precommit_txn(const StreamLoadContext& ctx) {
VLOG_DEBUG << "precommit txn, db_id: " << ctx.db_id << ", txn_id: " << ctx.txn_id
<< ", label: " << ctx.label;
{
Status ret_st;
TEST_INJECTION_POINT_RETURN_WITH_VALUE("CloudMetaMgr::precommit_txn", ret_st);
}
PrecommitTxnRequest req;
PrecommitTxnResponse res;
req.set_cloud_unique_id(config::cloud_unique_id);
Expand Down
2 changes: 1 addition & 1 deletion be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
std::stringstream ss;
ss << "db_id=" << ctx->db_id << " txn_id=" << ctx->txn_id << " label=" << ctx->label;
std::string op_info = ss.str();
LOG(INFO) << "rollback stream laod txn " << op_info;
LOG(INFO) << "rollback stream load txn " << op_info;
TxnOpParamType topt = ctx->txn_id > 0 ? TxnOpParamType::WITH_TXN_ID
: !ctx->label.empty() ? TxnOpParamType::WITH_LABEL
: TxnOpParamType::ILLEGAL;
Expand Down
63 changes: 55 additions & 8 deletions be/src/cloud/injection_point_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,17 @@ void register_suites() {
*arg0 = Status::Corruption<false>("test_file_segment_cache_corruption injection error");
});
});
// curl "be_ip:http_port/api/injection_point/apply_suite?name=test_cloud_meta_mgr_commit_txn"
suite_map.emplace("test_cloud_meta_mgr_commit_txn", [] {
auto* sp = SyncPoint::get_instance();
sp->set_call_back("CloudMetaMgr::commit_txn", [](auto&& args) {
LOG(INFO) << "injection CloudMetaMgr::commit_txn";
auto* arg0 = try_any_cast_ret<Status>(args);
arg0->first = Status::InternalError<false>(
"test_file_segment_cache_corruption injection error");
arg0->second = true;
});
});
}

void set_sleep(const std::string& point, HttpRequest* req) {
Expand All @@ -139,16 +150,18 @@ void set_sleep(const std::string& point, HttpRequest* req) {
}
}
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [duration](auto&& args) {
sp->set_call_back(point, [point, duration](auto&& args) {
LOG(INFO) << "injection point hit, point=" << point << " sleep milliseconds=" << duration;
std::this_thread::sleep_for(std::chrono::milliseconds(duration));
});
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

void set_return(const std::string& point, HttpRequest* req) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [](auto&& args) {
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return void";
auto pred = try_any_cast<bool*>(args.back());
*pred = true;
} catch (const std::bad_any_cast&) {
Expand All @@ -160,8 +173,9 @@ void set_return(const std::string& point, HttpRequest* req) {

void set_return_ok(const std::string& point, HttpRequest* req) {
auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [](auto&& args) {
sp->set_call_back(point, [point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return ok";
auto* pair = try_any_cast_ret<Status>(args);
pair->first = Status::OK();
pair->second = true;
Expand All @@ -188,8 +202,9 @@ void set_return_error(const std::string& point, HttpRequest* req) {
}

auto sp = SyncPoint::get_instance();
sp->set_call_back(point, [code](auto&& args) {
sp->set_call_back(point, [code, point](auto&& args) {
try {
LOG(INFO) << "injection point hit, point=" << point << " return error code=" << code;
auto* pair = try_any_cast_ret<Status>(args);
pair->first = Status::Error<false>(code, "injected error");
pair->second = true;
Expand Down Expand Up @@ -243,7 +258,7 @@ void handle_clear(HttpRequest* req) {
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
}

void handle_suite(HttpRequest* req) {
void handle_apply_suite(HttpRequest* req) {
auto& suite = req->param("name");
if (suite.empty()) {
HttpChannel::send_reply(req, HttpStatus::BAD_REQUEST, "empty suite name");
Expand All @@ -253,10 +268,11 @@ void handle_suite(HttpRequest* req) {
std::call_once(register_suites_once, register_suites);
if (auto it = suite_map.find(suite); it != suite_map.end()) {
it->second(); // set injection callbacks
HttpChannel::send_reply(req, HttpStatus::OK, "OK");
HttpChannel::send_reply(req, HttpStatus::OK, "OK apply suite " + suite + "\n");
return;
}
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR, "unknown suite: " + suite);
HttpChannel::send_reply(req, HttpStatus::INTERNAL_SERVER_ERROR,
"unknown suite: " + suite + "\n");
}

void handle_enable(HttpRequest* req) {
Expand All @@ -273,6 +289,37 @@ void handle_disable(HttpRequest* req) {

InjectionPointAction::InjectionPointAction() = default;

//
// enable/disable injection point
// ```
// curl "be_ip:http_port/api/injection_point/enable"
// curl "be_ip:http_port/api/injection_point/disable"
// ```
//
// clear all injection points
// ```
// curl "be_ip:http_port/api/injection_point/clear"
// ```
//
// apply/activate specific suite with registered action, see `register_suites()` for more details
// ```
// curl "be_ip:http_port/api/injection_point/apply_suite?name=${suite_name}"
// ```
//
// set predefined action for specific injection point, supported actions are:
// * sleep: for injection point with callback, accepted param is `duration` in milliseconds
// * return: for injection point without return value (return void)
// * return_ok: for injection point with return value, always return Status::OK
// * return_error: for injection point with return value, accepted param is `code`,
// which is an int, valid values can be found in status.h, e.g. -235 or -230,
// if `code` is not present return Status::InternalError
// ```
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=sleep&duration=${x_millsec}" # sleep x millisecs
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return" # return void
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_ok" # return ok
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error" # internal error
// curl "be_ip:http_port/api/injection_point/set?name=${injection_point_name}&behavior=return_error&code=${code}" # -235
// ```
void InjectionPointAction::handle(HttpRequest* req) {
LOG(INFO) << "handle InjectionPointAction " << req->debug_string();
auto& op = req->param("op");
Expand All @@ -283,7 +330,7 @@ void InjectionPointAction::handle(HttpRequest* req) {
handle_clear(req);
return;
} else if (op == "apply_suite") {
handle_suite(req);
handle_apply_suite(req);
return;
} else if (op == "enable") {
handle_enable(req);
Expand Down
2 changes: 1 addition & 1 deletion be/src/common/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ DEFINE_mInt32(default_num_rows_per_column_file_block, "1024");
// pending data policy
DEFINE_mInt32(pending_data_expire_time_sec, "1800");
// inc_rowset snapshot rs sweep time interval
DEFINE_mInt32(tablet_rowset_stale_sweep_time_sec, "300");
DEFINE_mInt32(tablet_rowset_stale_sweep_time_sec, "600");
// tablet stale rowset sweep by threshold size
DEFINE_Bool(tablet_rowset_stale_sweep_by_size, "false");
DEFINE_mInt32(tablet_rowset_stale_sweep_threshold_size, "100");
Expand Down
9 changes: 5 additions & 4 deletions be/src/olap/rowset/segment_v2/segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,7 @@ Status Segment::_create_column_readers(const SegmentFooterPB& footer) {
vectorized::PathInData path;
path.from_protobuf(spase_column_pb.column_path_info());
// Read from root column, so reader is nullptr
_sparse_column_tree[column.unique_id()].add(
_sparse_column_tree[unique_id].add(
path.copy_pop_front(),
SubcolumnReader {nullptr,
vectorized::DataTypeFactory::instance().create_data_type(
Expand Down Expand Up @@ -617,9 +617,10 @@ Status Segment::new_column_iterator_with_path(const TabletColumn& tablet_column,
const auto* node = tablet_column.has_path_info()
? _sub_column_tree[unique_id].find_exact(relative_path)
: nullptr;
const auto* sparse_node = tablet_column.has_path_info()
? _sparse_column_tree[unique_id].find_exact(relative_path)
: nullptr;
const auto* sparse_node =
tablet_column.has_path_info() && _sparse_column_tree.contains(unique_id)
? _sparse_column_tree[unique_id].find_exact(relative_path)
: nullptr;
// Currently only compaction and checksum need to read flat leaves
// They both use tablet_schema_with_merged_max_schema_version as read schema
auto type_to_read_flat_leaves = [](ReaderType type) {
Expand Down
94 changes: 37 additions & 57 deletions be/src/olap/rowset/segment_v2/segment_iterator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -503,35 +503,41 @@ Status SegmentIterator::_get_row_ranges_by_column_conditions() {
}

RETURN_IF_ERROR(_apply_bitmap_index());
RETURN_IF_ERROR(_apply_inverted_index());
RETURN_IF_ERROR(_apply_index_expr());
size_t input_rows = _row_bitmap.cardinality();
for (auto it = _common_expr_ctxs_push_down.begin(); it != _common_expr_ctxs_push_down.end();) {
if ((*it)->all_expr_inverted_index_evaluated()) {
const auto* result =
(*it)->get_inverted_index_context()->get_inverted_index_result_for_expr(
(*it)->root().get());
if (result != nullptr) {
_row_bitmap &= *result->get_data_bitmap();
auto root = (*it)->root();
auto iter_find = std::find(_remaining_conjunct_roots.begin(),
_remaining_conjunct_roots.end(), root);
if (iter_find != _remaining_conjunct_roots.end()) {
_remaining_conjunct_roots.erase(iter_find);
{
if (_opts.runtime_state &&
_opts.runtime_state->query_options().enable_inverted_index_query) {
SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
size_t input_rows = _row_bitmap.cardinality();
RETURN_IF_ERROR(_apply_inverted_index());
RETURN_IF_ERROR(_apply_index_expr());
for (auto it = _common_expr_ctxs_push_down.begin();
it != _common_expr_ctxs_push_down.end();) {
if ((*it)->all_expr_inverted_index_evaluated()) {
const auto* result =
(*it)->get_inverted_index_context()->get_inverted_index_result_for_expr(
(*it)->root().get());
if (result != nullptr) {
_row_bitmap &= *result->get_data_bitmap();
auto root = (*it)->root();
auto iter_find = std::find(_remaining_conjunct_roots.begin(),
_remaining_conjunct_roots.end(), root);
if (iter_find != _remaining_conjunct_roots.end()) {
_remaining_conjunct_roots.erase(iter_find);
}
it = _common_expr_ctxs_push_down.erase(it);
}
} else {
++it;
}
it = _common_expr_ctxs_push_down.erase(it);
}
} else {
++it;
}
}

_opts.stats->rows_inverted_index_filtered += (input_rows - _row_bitmap.cardinality());
for (auto cid : _schema->column_ids()) {
bool result_true = _check_all_conditions_passed_inverted_index_for_column(cid);
_opts.stats->rows_inverted_index_filtered += (input_rows - _row_bitmap.cardinality());
for (auto cid : _schema->column_ids()) {
bool result_true = _check_all_conditions_passed_inverted_index_for_column(cid);

if (result_true) {
_need_read_data_indices[cid] = false;
if (result_true) {
_need_read_data_indices[cid] = false;
}
}
}
}
if (!_row_bitmap.isEmpty() &&
Expand Down Expand Up @@ -734,18 +740,7 @@ Status SegmentIterator::_extract_common_expr_columns(const vectorized::VExprSPtr
return Status::OK();
}

// Column-level check used before evaluating a column through its inverted index.
// Returns false when a runtime state is present and its query options have
// enable_inverted_index_query turned off, or when no inverted index iterator
// is set for `col_id`; returns true otherwise.
bool SegmentIterator::_check_apply_by_inverted_index(ColumnId col_id) {
// A missing runtime_state does not disable the check; only an explicit opt-out does.
if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_inverted_index_query) {
return false;
}
if (_inverted_index_iterators[col_id] == nullptr) {
// this column has no inverted index
return false;
}
return true;
}

bool SegmentIterator::_check_apply_by_inverted_index(ColumnPredicate* pred, bool pred_in_compound) {
bool SegmentIterator::_check_apply_by_inverted_index(ColumnPredicate* pred) {
if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_inverted_index_query) {
return false;
}
Expand All @@ -765,11 +760,6 @@ bool SegmentIterator::_check_apply_by_inverted_index(ColumnPredicate* pred, bool
if (predicate_param->marked_by_runtime_filter) {
return false;
}
// the in_list or not_in_list value count cannot be greater than threshold
int32_t threshold = _opts.runtime_state->query_options().in_list_value_count_threshold;
if (pred_in_compound && predicate_param->values.size() > threshold) {
return false;
}
}

// UNTOKENIZED strings exceed ignore_above, they are written as null, causing range query errors
Expand All @@ -788,15 +778,11 @@ bool SegmentIterator::_check_apply_by_inverted_index(ColumnPredicate* pred, bool

bool handle_by_fulltext = _column_has_fulltext_index(pred_column_id);
if (handle_by_fulltext) {
// when predicate in compound condition which except leafNode of andNode,
// only can apply match query for fulltext index,
// when predicate is leafNode of andNode,
// can apply 'match qeury' and 'equal query' and 'list query' for fulltext index.
return (pred_in_compound ? pred->type() == PredicateType::MATCH
: (pred->type() == PredicateType::MATCH ||
pred->type() == PredicateType::IS_NULL ||
pred->type() == PredicateType::IS_NOT_NULL ||
PredicateTypeTraits::is_equal_or_list(pred->type())));
// can apply 'match query' and 'equal query' and 'list query' for fulltext index.
return pred->type() == PredicateType::MATCH || pred->type() == PredicateType::IS_NULL ||
pred->type() == PredicateType::IS_NOT_NULL ||
PredicateTypeTraits::is_equal_or_list(pred->type());
}

return true;
Expand Down Expand Up @@ -968,11 +954,6 @@ bool SegmentIterator::_is_target_expr_match_predicate(const vectorized::VExprSPt
}

Status SegmentIterator::_apply_inverted_index() {
SCOPED_RAW_TIMER(&_opts.stats->inverted_index_filter_timer);
if (_opts.runtime_state && !_opts.runtime_state->query_options().enable_inverted_index_query) {
return Status::OK();
}
size_t input_rows = _row_bitmap.cardinality();
std::vector<ColumnPredicate*> remaining_predicates;
std::set<const ColumnPredicate*> no_need_to_pass_column_predicate_set;

Expand All @@ -990,7 +971,6 @@ Status SegmentIterator::_apply_inverted_index() {
}

_col_predicates = std::move(remaining_predicates);
_opts.stats->rows_inverted_index_filtered += (input_rows - _row_bitmap.cardinality());
return Status::OK();
}

Expand Down
3 changes: 1 addition & 2 deletions be/src/olap/rowset/segment_v2/segment_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -298,8 +298,7 @@ class SegmentIterator : public RowwiseIterator {

void _convert_dict_code_for_predicate_if_necessary_impl(ColumnPredicate* predicate);

bool _check_apply_by_inverted_index(ColumnId col_id);
bool _check_apply_by_inverted_index(ColumnPredicate* pred, bool pred_in_compound = false);
bool _check_apply_by_inverted_index(ColumnPredicate* pred);

void _output_index_result_column_for_expr(uint16_t* sel_rowid_idx, uint16_t select_size,
vectorized::Block* block);
Expand Down
Loading

0 comments on commit a73d971

Please sign in to comment.