diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp index 53638abffc2d95..ecf33c19d6eb7e 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp @@ -81,7 +81,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() { for (size_t i = 0; i < partition.tablet_ids.size(); i++) { auto tablet_id = partition.tablet_ids[i]; auto tablet_calc_delete_bitmap_ptr = std::make_shared( - _engine, this, tablet_id, transaction_id, version, partition.sub_txn_ids); + _engine, tablet_id, transaction_id, version, partition.sub_txn_ids); if (has_compaction_stats) { tablet_calc_delete_bitmap_ptr->set_compaction_stats( partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i], @@ -90,10 +90,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() { if (has_tablet_states) { tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]); } - auto submit_st = token->submit_func([=]() { + auto submit_st = token->submit_func([tablet_id, tablet_calc_delete_bitmap_ptr, this]() { auto st = tablet_calc_delete_bitmap_ptr->handle(); - if (!st.ok()) { + if (st.ok()) { + add_succ_tablet_id(tablet_id); + } else { LOG(WARNING) << "handle calc delete bitmap fail, st=" << st.to_string(); + add_error_tablet_id(tablet_id, st); } }); VLOG_DEBUG << "submit TabletCalcDeleteBitmapTask for tablet=" << tablet_id; @@ -114,10 +117,9 @@ Status CloudEngineCalcDeleteBitmapTask::execute() { } CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask( - CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id, - int64_t transaction_id, int64_t version, const std::vector& sub_txn_ids) + CloudStorageEngine& engine, int64_t tablet_id, int64_t transaction_id, int64_t version, + const std::vector& sub_txn_ids) : _engine(engine), - _engine_calc_delete_bitmap_task(engine_task), _tablet_id(tablet_id), _transaction_id(transaction_id), _version(version), @@ -146,11 +148,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { auto get_tablet_time_us = MonotonicMicros() - t1; std::shared_ptr tablet = std::dynamic_pointer_cast(base_tablet); if (tablet == nullptr) { - LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id=" << _tablet_id; - auto error_st = Status::Error( + return Status::Error( "can't get tablet when calculate delete bitmap. tablet_id={}", _tablet_id); - _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st); - return error_st; } int64_t max_version = tablet->max_version_unlocked(); int64_t t2 = MonotonicMicros(); @@ -178,18 +177,14 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { if (!sync_st.ok()) { LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id << ", status=" << sync_st; - _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st); return sync_st; } if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] { - _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id); LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, " "tablet_id: " << _tablet_id << " txn_id: " << _transaction_id << ", request_version=" << _version; - return Status::Error( - "invalid tablet state {}. tablet_id={}", tablet->tablet_state(), - tablet->tablet_id()); + return Status::OK(); } } auto sync_rowset_time_us = MonotonicMicros() - t2; @@ -201,10 +196,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const { LOG(WARNING) << "version not continuous, current max version=" << max_version << ", request_version=" << _version << " tablet_id=" << _tablet_id; } - auto error_st = - Status::Error("version not continuous"); - _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st); - return error_st; + return Status::Error("version not continuous"); } int64_t t3 = MonotonicMicros(); @@ -286,7 +278,6 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset( if (status != Status::OK()) { LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id << ", " << txn_str << ", status=" << status; - _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status); return status; } @@ -326,6 +317,10 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset( // delete bitmap cache missed, should re-calculate delete bitmaps between segments std::vector segments; RETURN_IF_ERROR(std::static_pointer_cast(rowset)->load_segments(&segments)); + DBUG_EXECUTE_IF("_handle_rowset.inject.before.calc_between_segments", { + LOG_INFO("inject error when CloudTabletCalcDeleteBitmapTask::_handle_rowset"); + return Status::MemoryLimitExceeded("injected MemoryLimitExceeded error"); + }); RETURN_IF_ERROR(tablet->calc_delete_bitmap_between_segments(rowset->rowset_id(), segments, delete_bitmap)); } @@ -345,11 +340,9 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset( if (status != Status::OK()) { LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id() << ", tablet_id=" << _tablet_id << ", " << txn_str << ", status=" << status; - _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status); return status; } - _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id); if (invisible_rowsets != nullptr) { invisible_rowsets->push_back(rowset); // see CloudTablet::save_delete_bitmap @@ -361,7 +354,7 @@ Status CloudTabletCalcDeleteBitmapTask::_handle_rowset( } } } - return status; + return Status::OK(); } #include "common/compile_check_end.h" diff --git a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h index 7bdba4992cbe9b..f5cbfe5a0b0dfa 100644 --- a/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h +++ b/be/src/cloud/cloud_engine_calc_delete_bitmap_task.h @@ -33,8 +33,7 @@ class MemTrackerLimiter; class CloudTabletCalcDeleteBitmapTask { public: - CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine, - CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id, + CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine, int64_t tablet_id, int64_t transaction_id, int64_t version, const std::vector& sub_txn_ids); ~CloudTabletCalcDeleteBitmapTask() = default; @@ -52,7 +51,6 @@ class CloudTabletCalcDeleteBitmapTask { DeleteBitmapPtr tablet_delete_bitmap = nullptr) const; CloudStorageEngine& _engine; - CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task; int64_t _tablet_id; int64_t _transaction_id; diff --git a/regression-test/data/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.out b/regression-test/data/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.out index f9e767c3d20f3e..908ea49d7ddced 100644 --- a/regression-test/data/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.out +++ b/regression-test/data/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.out @@ -10,3 +10,14 @@ -- !dup_key_count -- 0 +-- !sql -- +77777 77777 77777 +88888 88888 88888 +99999 99999 99999 + +-- !sql -- +3 + +-- !dup_key_count -- +0 + diff --git a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy index b741a6e998632b..eeee9cc38facdf 100644 --- a/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy +++ b/regression-test/suites/fault_injection_p0/cloud/test_cloud_multi_segments_re_calc_in_publish.groovy @@ -58,6 +58,36 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") { assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr) } + def loadMultiSegmentData = { tableName, rows, succ, String err="" -> + // load data that will have multi segments and there are duplicate keys between segments + String content = "" + (1..rows).each { + content += "${it},${it},${it}\n" + } + content += content + streamLoad { + table "${tableName}" + set 'column_separator', ',' + inputStream new ByteArrayInputStream(content.getBytes()) + time 30000 + + check { result, exception, startTime, endTime -> + if (exception != null) { + throw exception + } + def json = parseJson(result) + if (succ) { + assert "success" == json.Status.toLowerCase() + assert rows*2 == json.NumberTotalRows + assert 0 == json.NumberFilteredRows + } else { + assert "fail" == json.Status.toLowerCase() + assert json.Message.contains(err) + } + } + } + } + // to cause multi segments def customBeConfig = [ doris_scanner_row_bytes : 1 @@ -75,33 +105,7 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") { Thread.sleep(1000) - def t1 = Thread.start { - // load data that will have multi segments and there are duplicate keys between segments - String content = "" - (1..4096).each { - content += "${it},${it},${it}\n" - } - content += content - streamLoad { - table "${table1}" - set 'column_separator', ',' - inputStream new ByteArrayInputStream(content.getBytes()) - time 30000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - if (exception != null) { - throw exception - } - def json = parseJson(result) - assert "success" == json.Status.toLowerCase() - assert 8192 == json.NumberTotalRows - assert 0 == json.NumberFilteredRows - } - } - } - - - t1.join() + loadMultiSegmentData(table1, 4096, true) GetDebugPoint().clearDebugPointsForAllBEs() Thread.sleep(2000) @@ -120,4 +124,51 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") { GetDebugPoint().clearDebugPointsForAllFEs() } } + + // abnormal case, fail when calc between segments + def table2 = "test_cloud_multi_segments_re_calc_in_publish_fail" + sql "DROP TABLE IF EXISTS ${table2} FORCE;" + sql """ CREATE TABLE IF NOT EXISTS ${table2} ( + `k1` int NOT NULL, + `c1` int, + `c2` int + )UNIQUE KEY(k1) + DISTRIBUTED BY HASH(k1) BUCKETS 1 + PROPERTIES ( + "enable_unique_key_merge_on_write" = "true", + "disable_auto_compaction" = "true", + "replication_num" = "1"); """ + + sql "insert into ${table2} values(99999,99999,99999);" + sql "insert into ${table2} values(88888,88888,88888);" + sql "insert into ${table2} values(77777,77777,77777);" + sql "sync;" + qt_sql "select * from ${table2} order by k1;" + + setBeConfigTemporary(customBeConfig) { + try { + GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush") + GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss") + + // fail in calc_delete_bitmap_between_segments + GetDebugPoint().enableDebugPointForAllBEs("_handle_rowset.inject.before.calc_between_segments") + + Thread.sleep(1000) + + loadMultiSegmentData(table2, 4096, false, "injected MemoryLimitExceeded error") + + GetDebugPoint().clearDebugPointsForAllBEs() + Thread.sleep(2000) + + qt_sql "select count() from ${table2};" + + qt_dup_key_count "select count() from (select k1,count() as cnt from ${table2} group by k1 having cnt > 1) A;" + } catch(Exception e) { + logger.info(e.getMessage()) + throw e + } finally { + GetDebugPoint().clearDebugPointsForAllBEs() + GetDebugPoint().clearDebugPointsForAllFEs() + } + } }