Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 17 additions & 3 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <random>
#include <thread>
#include <type_traits>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet.h"
Expand Down Expand Up @@ -75,6 +76,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
bool has_compaction_stats = partition.__isset.base_compaction_cnts &&
partition.__isset.cumulative_compaction_cnts &&
partition.__isset.cumulative_points;
bool has_tablet_states = partition.__isset.tablet_states;
for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
auto tablet_id = partition.tablet_ids[i];
auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
Expand All @@ -84,6 +86,9 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i],
partition.cumulative_points[i]);
}
if (has_tablet_states) {
tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]);
}
auto submit_st = token->submit_func([=]() {
auto st = tablet_calc_delete_bitmap_ptr->handle();
if (!st.ok()) {
Expand Down Expand Up @@ -128,6 +133,9 @@ void CloudTabletCalcDeleteBitmapTask::set_compaction_stats(int64_t ms_base_compa
_ms_cumulative_compaction_cnt = ms_cumulative_compaction_cnt;
_ms_cumulative_point = ms_cumulative_point;
}
void CloudTabletCalcDeleteBitmapTask::set_tablet_state(int64_t tablet_state) {
_ms_tablet_state = tablet_state;
}

Status CloudTabletCalcDeleteBitmapTask::handle() const {
VLOG_DEBUG << "start calculate delete bitmap on tablet " << _tablet_id;
Expand All @@ -146,7 +154,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
int64_t max_version = tablet->max_version_unlocked();
int64_t t2 = MonotonicMicros();

auto should_sync_rowsets_produced_by_compaction = [&]() {
auto should_sync_rowsets = [&]() {
if (_version != max_version + 1) {
return true;
}
if (_ms_base_compaction_cnt == -1) {
return true;
}
Expand All @@ -156,9 +167,12 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
std::shared_lock rlock(tablet->get_header_lock());
return _ms_base_compaction_cnt > tablet->base_compaction_cnt() ||
_ms_cumulative_compaction_cnt > tablet->cumulative_compaction_cnt() ||
_ms_cumulative_point > tablet->cumulative_layer_point();
_ms_cumulative_point > tablet->cumulative_layer_point() ||
(_ms_tablet_state.has_value() &&
_ms_tablet_state.value() != // an SC job finished on other BEs during this load job
static_cast<std::underlying_type_t<TabletState>>(tablet->tablet_state()));
};
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
if (should_sync_rowsets()) {
auto sync_st = tablet->sync_rowsets();
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <memory>
#include <optional>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
Expand All @@ -39,6 +40,7 @@ class CloudTabletCalcDeleteBitmapTask {

void set_compaction_stats(int64_t ms_base_compaction_cnt, int64_t ms_cumulative_compaction_cnt,
int64_t ms_cumulative_point);
void set_tablet_state(int64_t tablet_state);

Status handle() const;

Expand All @@ -53,6 +55,7 @@ class CloudTabletCalcDeleteBitmapTask {
int64_t _ms_base_compaction_cnt {-1};
int64_t _ms_cumulative_compaction_cnt {-1};
int64_t _ms_cumulative_point {-1};
std::optional<int64_t> _ms_tablet_state;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

Expand Down
34 changes: 31 additions & 3 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2310,6 +2310,7 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
}

for (const auto& tablet_idx : request->tablet_indexes()) {
// 1. get compaction cnts
TabletStatsPB tablet_stat;
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
Expand Down Expand Up @@ -2343,16 +2344,43 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());

// 2. get tablet states
std::string tablet_meta_key =
meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string tablet_meta_val;
err = txn->get(tablet_meta_key, &tablet_meta_val);
if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get tablet meta"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id()
<< " key=" << hex(tablet_meta_key) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
return;
}
doris::TabletMetaCloudPB tablet_meta;
if (!tablet_meta.ParseFromString(tablet_meta_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed tablet meta";
return;
}
response->add_tablet_states(
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
}

read_stats_sw.pause();
LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction cnts cost={} ms",
request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);
LOG(INFO) << fmt::format(
"tablet_idxes.size()={}, read tablet compaction cnts and tablet states cost={} ms",
request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);

DeleteBitmapUpdateLockPB lock_info_tmp;
if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(),
request->initiator(), lock_key, lock_info_tmp)) {
LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats, table_id="
LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats and tablet "
"states, table_id="
<< table_id << " request lock_id=" << request->lock_id()
<< " request initiator=" << request->initiator() << " code=" << code
<< " msg=" << msg;
Expand Down
23 changes: 17 additions & 6 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const s
commit_txn(meta_service, db_id, txn_id, label);
}

static void add_tablet_stats(MetaServiceProxy* meta_service, std::string instance_id,
static void add_tablet_metas(MetaServiceProxy* meta_service, std::string instance_id,
int64_t table_id, int64_t index_id,
const std::vector<std::array<int64_t, 2>>& tablet_idxes) {
std::unique_ptr<Transaction> txn;
Expand All @@ -293,6 +293,17 @@ static void add_tablet_stats(MetaServiceProxy* meta_service, std::string instanc
stats.set_cumulative_compaction_cnt(20);
stats.set_cumulative_point(30);
txn->put(stats_key, stats.SerializeAsString());

doris::TabletMetaCloudPB tablet_pb;
tablet_pb.set_table_id(table_id);
tablet_pb.set_index_id(index_id);
tablet_pb.set_partition_id(partition_id);
tablet_pb.set_tablet_id(tablet_id);
tablet_pb.set_tablet_state(doris::TabletStatePB::PB_RUNNING);
auto tablet_meta_key =
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
auto tablet_meta_val = tablet_pb.SerializeAsString();
txn->put(tablet_meta_key, tablet_meta_val);
}
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
Expand Down Expand Up @@ -4659,7 +4670,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsNormal) {
// [(partition_id, tablet_id)]
std::vector<std::array<int64_t, 2>> tablet_idxes {{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, tablet_idxes,
Expand Down Expand Up @@ -4713,7 +4724,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsLockExpired) {
std::vector<std::array<int64_t, 2>> tablet_idxes {
{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down Expand Up @@ -4754,7 +4765,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsLockExpired) {
std::vector<std::array<int64_t, 2>> tablet_idxes {
{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down Expand Up @@ -4796,7 +4807,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsError) {
std::vector<std::array<int64_t, 2>> tablet_idxes {
{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down Expand Up @@ -4841,7 +4852,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsError) {
tablet_idxes.push_back({partition_id, tablet_id});
}

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,17 +780,26 @@ private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> getCalcDeleteBitmapInfo(
if (!lockContext.getBaseCompactionCnts().isEmpty()
&& !lockContext.getCumulativeCompactionCnts().isEmpty()
&& !lockContext.getCumulativePoints().isEmpty()) {
boolean hasTabletStats = !lockContext.getTabletStates().isEmpty();

List<Long> reqBaseCompactionCnts = Lists.newArrayList();
List<Long> reqCumulativeCompactionCnts = Lists.newArrayList();
List<Long> reqCumulativePoints = Lists.newArrayList();
List<Long> reqTabletStates = Lists.newArrayList();
for (long tabletId : tabletList) {
reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId));
reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId));
reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId));
if (hasTabletStats) {
reqTabletStates.add(lockContext.getTabletStates().get(tabletId));
}
}
partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts);
partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts);
partitionInfo.setCumulativePoints(reqCumulativePoints);
if (hasTabletStats) {
partitionInfo.setTabletStates(reqTabletStates);
}
}
partitionInfos.add(partitionInfo);
}
Expand Down Expand Up @@ -917,19 +926,24 @@ private void getDeleteBitmapUpdateLock(long transactionId, List<OlapTable> mowTa
List<Long> respBaseCompactionCnts = response.getBaseCompactionCntsList();
List<Long> respCumulativeCompactionCnts = response.getCumulativeCompactionCntsList();
List<Long> respCumulativePoints = response.getCumulativePointsList();
List<Long> respTabletStates = response.getTabletStatesList();
int size1 = respBaseCompactionCnts.size();
int size2 = respCumulativeCompactionCnts.size();
int size3 = respCumulativePoints.size();
if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()) {
int size4 = respTabletStates.size();
if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()
|| (size4 > 0 && size4 != tabletList.size())) {
throw new UserException("The size of returned compaction cnts can't match the size of tabletList, "
+ "tabletList.size()=" + tabletList.size() + ", respBaseCompactionCnts.size()=" + size1
+ ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3);
+ ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3
+ ", respTabletStates.size()=" + size4);
}
for (int i = 0; i < tabletList.size(); i++) {
long tabletId = tabletList.get(i);
lockContext.getBaseCompactionCnts().put(tabletId, respBaseCompactionCnts.get(i));
lockContext.getCumulativeCompactionCnts().put(tabletId, respCumulativeCompactionCnts.get(i));
lockContext.getCumulativePoints().put(tabletId, respCumulativePoints.get(i));
lockContext.getTabletStates().put(tabletId, respTabletStates.get(i));
}
totalRetryTime += retryTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class DeleteBitmapUpdateLockContext {
private Map<Long, Long> baseCompactionCnts;
private Map<Long, Long> cumulativeCompactionCnts;
private Map<Long, Long> cumulativePoints;
private Map<Long, Long> tabletStates;
private Map<Long, Set<Long>> tableToPartitions;
private Map<Long, Partition> partitions;
private Map<Long, Map<Long, List<Long>>> backendToPartitionTablets;
Expand All @@ -40,6 +41,7 @@ public DeleteBitmapUpdateLockContext() {
baseCompactionCnts = Maps.newHashMap();
cumulativeCompactionCnts = Maps.newHashMap();
cumulativePoints = Maps.newHashMap();
tabletStates = Maps.newHashMap();
tableToPartitions = Maps.newHashMap();
partitions = Maps.newHashMap();
backendToPartitionTablets = Maps.newHashMap();
Expand All @@ -63,6 +65,10 @@ public Map<Long, Long> getCumulativePoints() {
return cumulativePoints;
}

public Map<Long, Long> getTabletStates() {
return tabletStates;
}

public Map<Long, Map<Long, List<Long>>> getBackendToPartitionTablets() {
return backendToPartitionTablets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.cooldown.CooldownConf;
Expand Down Expand Up @@ -593,7 +594,22 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
LOG.info("finished to handle tablet report from backend[{}] cost: {} ms", backendId, (end - start));
}

private static void debugBlock() {
if (DebugPointUtil.isEnable("ReportHandler.block")) {
LOG.info("debug point: block at ReportHandler.block");
while (DebugPointUtil.isEnable("ReportHandler.block")) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
LOG.info("error ", e);
}
}
LOG.info("debug point: leave ReportHandler.block");
}
}

private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
debugBlock();
if (LOG.isDebugEnabled()) {
LOG.debug("begin to handle task report from backend {}", backendId);
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,7 @@ message GetDeleteBitmapUpdateLockResponse {
repeated int64 base_compaction_cnts = 2;
repeated int64 cumulative_compaction_cnts = 3;
repeated int64 cumulative_points = 4;
repeated int64 tablet_states = 5;
}

message RemoveDeleteBitmapUpdateLockRequest {
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ struct TCalcDeleteBitmapPartitionInfo {
4: optional list<i64> base_compaction_cnts
5: optional list<i64> cumulative_compaction_cnts
6: optional list<i64> cumulative_points
7: optional list<i64> sub_txn_ids
8: optional list<i64> tablet_states
}

struct TCalcDeleteBitmapRequest {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1
2 2 2
3 3 3

-- !sql --
1 1 1
2 2 2
3 3 3
10 88 88

-- !dup_key_count --

-- !sql --
1 \N 99
2 2 2
3 3 3
10 \N 88

Loading
Loading