41 changes: 18 additions & 23 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
@@ -81,7 +81,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
         for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
             auto tablet_id = partition.tablet_ids[i];
             auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
-                    _engine, this, tablet_id, transaction_id, version);
+                    _engine, tablet_id, transaction_id, version);
             if (has_compaction_stats) {
                 tablet_calc_delete_bitmap_ptr->set_compaction_stats(
                         partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i],
@@ -90,10 +90,13 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
             if (has_tablet_states) {
                 tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]);
             }
-            auto submit_st = token->submit_func([=]() {
+            auto submit_st = token->submit_func([tablet_id, tablet_calc_delete_bitmap_ptr, this]() {
                 auto st = tablet_calc_delete_bitmap_ptr->handle();
-                if (!st.ok()) {
+                if (st.ok()) {
+                    add_succ_tablet_id(tablet_id);
+                } else {
                     LOG(WARNING) << "handle calc delete bitmap fail, st=" << st.to_string();
+                    add_error_tablet_id(tablet_id, st);
                 }
             });
             VLOG_DEBUG << "submit TabletCalcDeleteBitmapTask for tablet=" << tablet_id;
@@ -113,11 +116,11 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
     return _res;
 }
 
-CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(
-        CloudStorageEngine& engine, CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
-        int64_t transaction_id, int64_t version)
+CloudTabletCalcDeleteBitmapTask::CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
+                                                                 int64_t tablet_id,
+                                                                 int64_t transaction_id,
+                                                                 int64_t version)
         : _engine(engine),
-          _engine_calc_delete_bitmap_task(engine_task),
           _tablet_id(tablet_id),
          _transaction_id(transaction_id),
          _version(version) {
@@ -146,10 +149,8 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     std::shared_ptr<CloudTablet> tablet = std::dynamic_pointer_cast<CloudTablet>(base_tablet);
     if (tablet == nullptr) {
         LOG(WARNING) << "can't get tablet when calculate delete bitmap. tablet_id=" << _tablet_id;
-        auto error_st = Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
+        return Status::Error<ErrorCode::PUSH_TABLE_NOT_EXIST>(
                 "can't get tablet when calculate delete bitmap. tablet_id={}", _tablet_id);
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
-        return error_st;
     }
     int64_t max_version = tablet->max_version_unlocked();
     int64_t t2 = MonotonicMicros();
@@ -177,18 +178,14 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         if (!sync_st.ok()) {
             LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
                          << ", txn_id=" << _transaction_id << ", status=" << sync_st;
-            _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, sync_st);
             return sync_st;
         }
         if (tablet->tablet_state() != TABLET_RUNNING) [[unlikely]] {
-            _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
             LOG(INFO) << "tablet is under alter process, delete bitmap will be calculated later, "
                          "tablet_id: "
                       << _tablet_id << " txn_id: " << _transaction_id
                       << ", request_version=" << _version;
-            return Status::Error<ErrorCode::INVALID_TABLET_STATE>(
-                    "invalid tablet state {}. tablet_id={}", tablet->tablet_state(),
-                    tablet->tablet_id());
+            return Status::OK();
         }
     }
     auto sync_rowset_time_us = MonotonicMicros() - t2;
@@ -200,10 +197,7 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
             LOG(WARNING) << "version not continuous, current max version=" << max_version
                          << ", request_version=" << _version << " tablet_id=" << _tablet_id;
         }
-        auto error_st =
-                Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>("version not continuous");
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, error_st);
-        return error_st;
+        return Status::Error<ErrorCode::DELETE_BITMAP_LOCK_ERROR, false>("version not continuous");
     }
 
     RowsetSharedPtr rowset;
@@ -219,7 +213,6 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
     if (status != Status::OK()) {
         LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
                      << ", txn_id=" << _transaction_id << ", status=" << status;
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
         return status;
     }
 
@@ -270,6 +263,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         // delete bitmap cache missed, should re-calculate delete bitmaps between segments
         std::vector<segment_v2::SegmentSharedPtr> segments;
         RETURN_IF_ERROR(std::static_pointer_cast<BetaRowset>(rowset)->load_segments(&segments));
+        DBUG_EXECUTE_IF("_handle_rowset.inject.before.calc_between_segments", {
+            LOG_INFO("inject error when CloudTabletCalcDeleteBitmapTask::_handle_rowset");
+            return Status::MemoryLimitExceeded("injected MemoryLimitExceeded error");
+        });
         RETURN_IF_ERROR(
                 tablet->calc_delete_bitmap_between_segments(rowset, segments, delete_bitmap));
     }
@@ -281,19 +278,17 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
         LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
                      << ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id
                      << ", status=" << status;
-        _engine_calc_delete_bitmap_task->add_error_tablet_id(_tablet_id, status);
         return status;
     }
 
-    _engine_calc_delete_bitmap_task->add_succ_tablet_id(_tablet_id);
     LOG(INFO) << "calculate delete bitmap successfully on tablet"
               << ", table_id=" << tablet->table_id() << ", transaction_id=" << _transaction_id
               << ", tablet_id=" << tablet->tablet_id() << ", num_rows=" << rowset->num_rows()
               << ", get_tablet_time_us=" << get_tablet_time_us
               << ", sync_rowset_time_us=" << sync_rowset_time_us
               << ", update_delete_bitmap_time_us=" << update_delete_bitmap_time_us
               << ", res=" << status;
-    return status;
+    return Status::OK();
 }
 
 #include "common/compile_check_end.h"
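Taken together, the .cpp changes invert the result-reporting flow: `CloudTabletCalcDeleteBitmapTask::handle()` now communicates purely through its returned `Status`, and the lambda that `execute()` submits records success or failure at the single submission site, so the task class no longer needs a back-pointer to its parent. Below is a minimal, self-contained sketch of that pattern; `Status`, `TabletTask`, and `EngineTask` are simplified stand-ins for the Doris types, and the closure runs inline where the real code hands it to a thread-pool token.

```cpp
#include <cstdint>
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Stand-in for doris::Status: an ok/error flag plus a message.
struct Status {
    bool ok_ = true;
    std::string msg_;
    static Status OK() { return {}; }
    static Status Error(std::string m) { return {false, std::move(m)}; }
    bool ok() const { return ok_; }
    const std::string& to_string() const { return msg_; }
};

// Worker: reports only through its return value, like the patched handle().
struct TabletTask {
    int64_t tablet_id = 0;
    Status handle() const {
        if (tablet_id % 2 == 0) return Status::Error("version not continuous");
        return Status::OK();
    }
};

// Parent: owns the success/error bookkeeping, like the patched execute().
class EngineTask {
public:
    void run(const std::vector<int64_t>& tablet_ids) {
        for (int64_t tablet_id : tablet_ids) {
            auto task = std::make_shared<TabletTask>(TabletTask {tablet_id});
            // Explicit captures mirror [tablet_id, tablet_calc_delete_bitmap_ptr, this]:
            // the shared_ptr keeps the worker alive until the closure finishes.
            auto closure = [tablet_id, task, this]() {
                Status st = task->handle();
                if (st.ok()) {
                    add_succ_tablet_id(tablet_id);
                } else {
                    add_error_tablet_id(tablet_id, st);
                }
            };
            closure(); // the real code submits this to a thread-pool token instead
        }
    }

private:
    // Guarded because pool threads may report concurrently.
    void add_succ_tablet_id(int64_t id) {
        std::lock_guard<std::mutex> lock(_mtx);
        _succ.push_back(id);
    }
    void add_error_tablet_id(int64_t id, const Status& st) {
        std::lock_guard<std::mutex> lock(_mtx);
        _err.push_back(id);
        std::cout << "tablet " << id << " failed: " << st.to_string() << "\n";
    }
    std::mutex _mtx;
    std::vector<int64_t> _succ;
    std::vector<int64_t> _err;
};

int main() {
    EngineTask engine;
    engine.run({1, 2, 3}); // tablet 2 takes the error path
}
```

Centralizing the bookkeeping this way means every exit path of `handle()` is reported exactly once, which the deleted per-branch `add_error_tablet_id`/`add_succ_tablet_id` calls previously had to guarantee by hand.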
4 changes: 1 addition & 3 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
@@ -33,8 +33,7 @@ class MemTrackerLimiter;
 
 class CloudTabletCalcDeleteBitmapTask {
 public:
-    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine,
-                                    CloudEngineCalcDeleteBitmapTask* engine_task, int64_t tablet_id,
+    CloudTabletCalcDeleteBitmapTask(CloudStorageEngine& engine, int64_t tablet_id,
                                     int64_t transaction_id, int64_t version);
     ~CloudTabletCalcDeleteBitmapTask() = default;
 
@@ -46,7 +45,6 @@ class CloudTabletCalcDeleteBitmapTask {
 
 private:
     CloudStorageEngine& _engine;
-    CloudEngineCalcDeleteBitmapTask* _engine_calc_delete_bitmap_task;
 
     int64_t _tablet_id;
     int64_t _transaction_id;
@@ -58,6 +58,36 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
         assert lastRowsetSegmentNum == Integer.parseInt(segmentNumStr)
     }
 
+    def loadMultiSegmentData = { tableName, rows, succ, String err="" ->
+        // load data that will have multi segments and there are duplicate keys between segments
+        String content = ""
+        (1..rows).each {
+            content += "${it},${it},${it}\n"
+        }
+        content += content
+        streamLoad {
+            table "${tableName}"
+            set 'column_separator', ','
+            inputStream new ByteArrayInputStream(content.getBytes())
+            time 30000
+
+            check { result, exception, startTime, endTime ->
+                if (exception != null) {
+                    throw exception
+                }
+                def json = parseJson(result)
+                if (succ) {
+                    assert "success" == json.Status.toLowerCase()
+                    assert rows*2 == json.NumberTotalRows
+                    assert 0 == json.NumberFilteredRows
+                } else {
+                    assert "fail" == json.Status.toLowerCase()
+                    assert json.Message.contains(err)
+                }
+            }
+        }
+    }
+
     // to cause multi segments
     def customBeConfig = [
         doris_scanner_row_bytes : 1
@@ -75,33 +105,7 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
 
             Thread.sleep(1000)
 
-            def t1 = Thread.start {
-                // load data that will have multi segments and there are duplicate keys between segments
-                String content = ""
-                (1..4096).each {
-                    content += "${it},${it},${it}\n"
-                }
-                content += content
-                streamLoad {
-                    table "${table1}"
-                    set 'column_separator', ','
-                    inputStream new ByteArrayInputStream(content.getBytes())
-                    time 30000 // limit inflight 10s
-
-                    check { result, exception, startTime, endTime ->
-                        if (exception != null) {
-                            throw exception
-                        }
-                        def json = parseJson(result)
-                        assert "success" == json.Status.toLowerCase()
-                        assert 8192 == json.NumberTotalRows
-                        assert 0 == json.NumberFilteredRows
-                    }
-                }
-            }
-
-
-            t1.join()
+            loadMultiSegmentData(table1, 4096, true)
 
             GetDebugPoint().clearDebugPointsForAllBEs()
             Thread.sleep(2000)
@@ -120,4 +124,51 @@ suite("test_cloud_multi_segments_re_calc_in_publish", "nonConcurrent") {
             GetDebugPoint().clearDebugPointsForAllFEs()
         }
     }
+
+    // abnormal case, fail when calc between segments
+    def table2 = "test_cloud_multi_segments_re_calc_in_publish_fail"
+    sql "DROP TABLE IF EXISTS ${table2} FORCE;"
+    sql """ CREATE TABLE IF NOT EXISTS ${table2} (
+            `k1` int NOT NULL,
+            `c1` int,
+            `c2` int
+            )UNIQUE KEY(k1)
+        DISTRIBUTED BY HASH(k1) BUCKETS 1
+        PROPERTIES (
+            "enable_unique_key_merge_on_write" = "true",
+            "disable_auto_compaction" = "true",
+            "replication_num" = "1"); """
+
+    sql "insert into ${table2} values(99999,99999,99999);"
+    sql "insert into ${table2} values(88888,88888,88888);"
+    sql "insert into ${table2} values(77777,77777,77777);"
+    sql "sync;"
+    qt_sql "select * from ${table2} order by k1;"
+
+    setBeConfigTemporary(customBeConfig) {
+        try {
+            GetDebugPoint().enableDebugPointForAllBEs("MemTable.need_flush")
+            GetDebugPoint().enableDebugPointForAllBEs("CloudTxnDeleteBitmapCache::get_delete_bitmap.cache_miss")
+
+            // fail in calc_delete_bitmap_between_segments
+            GetDebugPoint().enableDebugPointForAllBEs("_handle_rowset.inject.before.calc_between_segments")
+
+            Thread.sleep(1000)
+
+            loadMultiSegmentData(table2, 4096, false, "injected MemoryLimitExceeded error")
+
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            Thread.sleep(2000)
+
+            qt_sql "select count() from ${table2};"
+
+            qt_dup_key_count "select count() from (select k1,count() as cnt from ${table2} group by k1 having cnt > 1) A;"
+        } catch(Exception e) {
+            logger.info(e.getMessage())
+            throw e
+        } finally {
+            GetDebugPoint().clearDebugPointsForAllBEs()
+            GetDebugPoint().clearDebugPointsForAllFEs()
+        }
+    }
 }
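The new abnormal case hinges on the debug-point machinery: the BE patch plants a `DBUG_EXECUTE_IF` hook just before `calc_delete_bitmap_between_segments`, and the test switches it on with `enableDebugPointForAllBEs`, loads data that must re-calculate delete bitmaps between segments, and asserts the stream load fails with the injected message. As a rough illustration of how such a named-point registry behaves (a simplified sketch, not Doris's actual `DebugPoints` implementation; `DEBUG_EXECUTE_IF` here is a hypothetical stand-in for the real macro):

```cpp
#include <iostream>
#include <mutex>
#include <set>
#include <string>

// Process-wide registry of named debug points, toggled at runtime.
class DebugPoints {
public:
    static DebugPoints& instance() {
        static DebugPoints dp;
        return dp;
    }
    void enable(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mtx);
        _enabled.insert(name);
    }
    void clear() {
        std::lock_guard<std::mutex> lock(_mtx);
        _enabled.clear();
    }
    bool is_enabled(const std::string& name) {
        std::lock_guard<std::mutex> lock(_mtx);
        return _enabled.count(name) > 0;
    }

private:
    std::mutex _mtx;
    std::set<std::string> _enabled;
};

// Hypothetical stand-in for DBUG_EXECUTE_IF(name, { body }):
// the body runs only while the named point is enabled.
#define DEBUG_EXECUTE_IF(name, body)                    \
    do {                                                \
        if (DebugPoints::instance().is_enabled(name)) { \
            body                                        \
        }                                               \
    } while (0)

bool calc_between_segments() {
    DEBUG_EXECUTE_IF("_handle_rowset.inject.before.calc_between_segments", {
        std::cout << "injected failure before segment calculation\n";
        return false; // the real hook returns Status::MemoryLimitExceeded(...)
    });
    return true; // normal path
}

int main() {
    calc_between_segments(); // point disabled: succeeds
    DebugPoints::instance().enable("_handle_rowset.inject.before.calc_between_segments");
    calc_between_segments(); // point enabled: takes the injected branch
    DebugPoints::instance().clear(); // mirrors clearDebugPointsForAllBEs()
}
```

The `finally` block's `clearDebugPointsForAllBEs()` matters for the same reason `clear()` does here: a point left enabled would leak into every later test running in the same BE process.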