diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
index 39c0575c8b16b3..a55d4f42f66eda 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -81,7 +81,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
         for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
             auto tablet_id = partition.tablet_ids[i];
             auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
-                    _engine, this, tablet_id, transaction_id, version);
+                    _engine, tablet_id, transaction_id, version);
             if (has_compaction_stats) {
                 tablet_calc_delete_bitmap_ptr->set_compaction_stats(
                         partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i],
@@ -90,10 +90,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
             if (has_tablet_states) {
                 tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]);
             }
-            auto submit_st = token->submit_func([=]() {
+            auto submit_st = token->submit_func([tablet_id, tablet_calc_delete_bitmap_ptr, this]() {
                 auto st = tablet_calc_delete_bitmap_ptr->handle();
-                if (!st.ok()) {
+                if (st.ok()) {
+                    add_succ_tablet_id(tablet_id);
+                } else {
                     LOG(WARNING) << "handle calc delete bitmap fail, st=" << st.to_string();
+                    add_error_tablet_id(tablet_id, st);
                 }
             });
             VLOG_DEBUG << "submit TabletCalcDeleteBitmapTask for tablet=" << tablet_id;
@@ -113,11 +116,11 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
     return _res;
 }
 
-CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
-        CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
-        int64_t transaction_id, int64_t version)
+CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
+                                                                 int64_t tablet_id,
+                                                                 int64_t transaction_id,
+                                                                 int64_t version)
         : _engine(engine),
-          _engine_calc_delete_bitmap_task(engine_task),
           _tablet_id(tablet_id),
           _transaction_id(transaction_id),
           _version(version) {
@@ -146,10 +149,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     std::shared_ptr<CloudTablet> tablet = std::dynamic_pointer_cast<CloudTablet>(base_tablet);
     if (tablet == nullptr) {
         LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id=" << _tablet_id;
-        auto error_st = Status::Error(
+        return Status::Error(
                 "can't get tablet when calculate delete bitmap. tablet_id={}", _tablet_id);
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
-        return error_st;
     }
     int64_t max_version = tablet->max_version_unlocked();
     int64_t t2 = MonotonicMicros();
@@ -177,18 +178,14 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         if (!sync_st.ok()) {
             LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
                          << ", txn_id=" << _transaction_id << ", status=" << sync_st;
-            _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
             return sync_st;
         }
         if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
-            _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
             LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
                          "tablet_id: "
                       << _tablet_id << " txn_id: " << _transaction_id
                       << ", request_version=" << _version;
-            return Status::Error(
-                    "invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
-                    tablet->tablet_id());
+            return Status::OK();
         }
     }
     auto sync_rowset_time_us = MonotonicMicros() - t2;
@@ -200,10 +197,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
             LOG(WARNING) << "version not continuous, current max version=" << max_version
                          << ", request_version=" << _version << " tablet_id=" << _tablet_id;
         }
-        auto error_st =
-                Status::Error("version not continuous");
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
-        return error_st;
+        return Status::Error("version not continuous");
     }
 
     RowsetSharedPtr rowset;
@@ -219,7 +213,6 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     if (status != Status::OK()) {
         LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
                      << ", txn_id=" << _transaction_id << ", status=" << status;
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
         return status;
     }
 
@@ -270,6 +263,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         // delete bitmap cache missed, should re-calculate delete bitmaps between segments
         std::vector segments;
         RETURN_IF_ERROR(std::static_pointer_cast(rowset)->load_segments(&segments));
+        DBUG_EXECUTE_IF("_handle_rowset.inject.before.calc_between_segments", {
+            LOG_INFO("inject error when CloudTabletCalcDeleteBitmapTask::_handle_rowset");
+            return Status::MemoryLimitExceeded("injected MemoryLimitExceeded error");
+        });
         RETURN_IF_ERROR(
                 tablet->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
     }
@@ -281,11 +278,9 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
                      << ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id
                      << ", status=" << status;
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
        return status;
    }

-    _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
    LOG(INFO) << "calculate delete bitmap successfully on tablet"
              << ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
              << ", tablet_id=" << tablet->tablet_id() << ", num_rows=" << rowset->num_rows()
@@ -293,7 +288,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
              << ", sync_rowset_time_us=" << sync_rowset_time_us
              << ", update_delete_bitmap_time_us=" << update_delete_bitmap_time_us
              << ", res=" << status;
-    return status;
+    return Status::OK();
 }

 #include "common/compile_check_end.h"
diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
index 62bd91b0a8ab3c..cbff46a0b2cc71 100644
--- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
+++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -33,8 +33,7 @@ class MemTrackerLimiter;

 class CloudTabletCalcDeleteBitmapTask {
 public:
-    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
-                                    CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
+    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine, int64_t tablet_id,
                                     int64_t transaction_id, int64_t version);
     ~CloudTabletCalcDeleteBitmapTask() = default;

@@ -46,7 +45,6 @@ class CloudTabletCalcDeleteBitmapTask {

 private:
     CloudStorageEngine& _engine;
-    CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;

     int64_t _tablet_id;
     int64_t _transaction_id;
diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
index b741a6e998632b..e6166d6c41a128 100644
--- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
+++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy
@@ -58,6 +58,36 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
         assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
     }

+    def loadMultiSegmentData = { tableName, rows, succ, String err="" ->
+        // load data that will have multi segments and there are duplicate keys between segments
+        String content = ""
+        (1..rows).each {
+            content += "${it},${it},${it}\n"
+        }
+        content += content
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            inputStream new ByteArrayInputStream(content.getBytes())
+            time 30000
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                if (succ) {
+                    assert "success" == json.Status.toLowerCase()
+                    assert rows*2 == json.NumberTotalRows
+                    assert 0 == json.NumberFilteredRows
+                } else {
+                    assert "fail" == json.Status.toLowerCase()
+                    assert json.Message.contains(err)
+                }
+            }
+        }
+    }
+
     // to cause multi segments
     def customBeConfig = [
         doris_scanner_row_bytes : 1
@@ -75,33 +105,7 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {

             Thread.sleep(1000)

-            def t1 = Thread.start {
-                // load data that will have multi segments and there are duplicate keys between segments
-                String content = ""
-                (1..4096).each {
-                    content += "${it},${it},${it}\n"
-                }
-                content += content
-                streamLoad {
-                    table "${table1}"
-                    set 'column_separator', ','
-                    inputStream new ByteArrayInputStream(content.getBytes())
-                    time 30000 // limit inflight 10s
-
-                    check { result, exception, startTime, endTime ->
-                        if (exception != null) {
-                            throw exception
-                        }
-                        def json = parseJson(result)
-                        assert "success" == json.Status.toLowerCase()
-                        assert 8192 == json.NumberTotalRows
-                        assert 0 == json.NumberFilteredRows
-                    }
-                }
-            }
-
-
-            t1.join()
+            loadMultiSegmentData(table1, 4096, true)

             GetDebugPoint().clearDebugPointsForAllBEs()
             Thread.sleep(2000)
@@ -120,4 +124,51 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
             GetDebugPoint().clearDebugPointsForAllFEs()
         }
     }
+
+    // abnormal case, fail when calc between segments
+    def table2 = "test_cloud_multi_segments_re_calc_in_publish_fail"
+    sql "DROP TABLE IF EXISTS ${table2} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table2} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int
+        )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table2} values(99999,99999,99999);"
+    sql "insert into ${table2} values(88888,88888,88888);"
+    sql "insert into ${table2} values(77777,77777,77777);"
+    sql "sync;"
+    qt_sql "select * from ${table2} order by k1;"
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+            GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss")
+
+            // fail in calc_delete_bitmap_between_segments
+            GetDebugPoint().enableDebugPointForAllBEs("_handle_rowset.inject.before.calc_between_segments")
+
+            Thread.sleep(1000)
+
+            loadMultiSegmentData(table2, 4096, false, "injected MemoryLimitExceeded error")
+
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            Thread.sleep(2000)
+
+            qt_sql "select count() from ${table2};"
+
+            qt_dup_key_count "select count() from (select k1,count() as cnt from ${table2} group by k1 having cnt > 1) A;"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
 }