Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions be/src/cloud/cloud_cumulative_compaction_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,17 @@ int64_t CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point(
int64_t last_cumulative_point) {
TEST_INJECTION_POINT_RETURN_WITH_VALUE("new_cumulative_point", int64_t(0), output_rowset.get(),
last_cumulative_point);
DBUG_EXECUTE_IF("CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point", {
auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
auto cumu_point = dp->param<int64_t>("cumu_point", -1);
if (target_tablet_id == tablet->tablet_id() && cumu_point != -1) {
LOG_INFO(
"[CloudSizeBasedCumulativeCompactionPolicy::new_cumulative_point] "
"tablet_id={}, cumu_point={}",
target_tablet_id, cumu_point);
return cumu_point;
}
});
// for MoW table, if there are too many versions, the delete bitmap will grow to
// a very big size, which may make the tablet meta too big and the `save_meta`
// operation too slow.
Expand Down
11 changes: 11 additions & 0 deletions be/src/cloud/cloud_full_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,16 @@ Status CloudFullCompaction::pick_rowsets_to_compact() {
}

Status CloudFullCompaction::execute_compact() {
DBUG_EXECUTE_IF("CloudFullCompaction::execute_compact.block", {
auto target_tablet_id = dp->param<int64_t>("tablet_id", -1);
LOG_INFO(
"[verbose] CloudFullCompaction::execute_compact.block, target_tablet_id={}, "
"tablet_id={}",
target_tablet_id, cloud_tablet()->tablet_id());
if (target_tablet_id == cloud_tablet()->tablet_id()) {
DBUG_BLOCK;
}
});
TEST_SYNC_POINT_RETURN_WITH_VALUE("CloudFullCompaction::execute_compact_impl", Status::OK(),
this);
#ifndef __APPLE__
Expand Down Expand Up @@ -272,6 +282,7 @@ Status CloudFullCompaction::modify_rowsets() {
cloud_tablet()->delete_rowsets(_input_rowsets, wrlock);
cloud_tablet()->add_rowsets({_output_rowset}, false, wrlock);
cloud_tablet()->set_base_compaction_cnt(stats.base_compaction_cnt());
cloud_tablet()->set_full_compaction_cnt(stats.full_compaction_cnt());
cloud_tablet()->set_cumulative_layer_point(stats.cumulative_point());
if (output_rowset_delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap()->merge(*output_rowset_delete_bitmap);
Expand Down
2 changes: 2 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
}
req.set_base_compaction_cnt(tablet->base_compaction_cnt());
req.set_cumulative_compaction_cnt(tablet->cumulative_compaction_cnt());
req.set_full_compaction_cnt(tablet->full_compaction_cnt());
req.set_cumulative_point(tablet->cumulative_layer_point());
}
req.set_end_version(-1);
Expand Down Expand Up @@ -772,6 +773,7 @@ Status CloudMetaMgr::sync_tablet_rowsets_unlocked(CloudTablet* tablet,
tablet->last_cumu_compaction_success_time_ms = stats.last_cumu_compaction_time_ms();
tablet->set_base_compaction_cnt(stats.base_compaction_cnt());
tablet->set_cumulative_compaction_cnt(stats.cumulative_compaction_cnt());
tablet->set_full_compaction_cnt(stats.full_compaction_cnt());
tablet->set_cumulative_layer_point(stats.cumulative_point());
tablet->reset_approximate_stats(stats.num_rowsets(), stats.num_segments(),
stats.num_rows(), stats.data_size());
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,14 @@ class CloudTablet final : public BaseTablet {
int64_t max_version_unlocked() const override { return _max_version; }
// Returns the current value of `_base_compaction_cnt`.
int64_t base_compaction_cnt() const {
    return _base_compaction_cnt;
}
// Returns the current value of `_cumulative_compaction_cnt`.
int64_t cumulative_compaction_cnt() const {
    return _cumulative_compaction_cnt;
}
// Returns the current value of `_full_compaction_cnt`.
int64_t full_compaction_cnt() const {
    return _full_compaction_cnt;
}
// Loads `_cumulative_point` with relaxed memory ordering and returns the value read.
int64_t cumulative_layer_point() const {
    const int64_t point = _cumulative_point.load(std::memory_order_relaxed);
    return point;
}

// Overwrites `_base_compaction_cnt` with `cnt`.
void set_base_compaction_cnt(int64_t cnt) {
    _base_compaction_cnt = cnt;
}
// Overwrites `_cumulative_compaction_cnt` with `cnt`.
void set_cumulative_compaction_cnt(int64_t cnt) {
    _cumulative_compaction_cnt = cnt;
}
// Overwrites `_full_compaction_cnt` with `cnt`.
void set_full_compaction_cnt(int64_t cnt) {
    _full_compaction_cnt = cnt;
}
void set_cumulative_layer_point(int64_t new_point);

// Returns the current value of `_last_cumu_compaction_failure_millis`.
int64_t last_cumu_compaction_failure_time() {
    return _last_cumu_compaction_failure_millis;
}
Expand Down Expand Up @@ -322,6 +324,7 @@ class CloudTablet final : public BaseTablet {

int64_t _base_compaction_cnt = 0;
int64_t _cumulative_compaction_cnt = 0;
int64_t _full_compaction_cnt = 0;
int64_t _max_version = -1;
int64_t _base_size = 0;
int64_t _alter_version = -1;
Expand Down
22 changes: 17 additions & 5 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,7 @@ void internal_get_rowset(Transaction* txn, int64_t start, int64_t end,
std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt, int64_t bc_cnt,
int64_t req_cc_cnt, int64_t cc_cnt,
int64_t req_cp, int64_t cp,
int64_t req_fc_cnt, int64_t fc_cnt,
int64_t req_start, int64_t req_end) {
using Version = std::pair<int64_t, int64_t>;
// combine `v1` `v2` to `v1`, return true if success
Expand All @@ -1628,8 +1629,8 @@ std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt,

if (req_cc_cnt < cc_cnt) {
Version cc_version;
if (req_cp < cp && req_cc_cnt + 1 == cc_cnt) {
// * only one CC happened and CP changed
if (req_cp < cp && req_cc_cnt + 1 == cc_cnt && req_fc_cnt == fc_cnt) {
// * only one CC happened and CP changed, and no full compaction happened
// BE [=][=][=][=][=====][=][=]
// ^~~~~ req_cp
// MS [=][=][=][=][xxxxxxxxxxxxxx][=======][=][=]
Expand All @@ -1653,6 +1654,13 @@ std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(int64_t req_bc_cnt,
// ^_____________________^ related_versions: [req_cp, max]
// there may be holes if we don't return all version
// after ms_cp, however it can be optimized.
// * one CC happened and CP changed, and full compaction happened
// BE [=][=][=][=][=][=][=][=][=][=]
// ^~~~~ req_cp
// MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
// ^~~~~~~ ms_cp
// ^___________________________^ related_versions: [req_cp, max]
//
cc_version = {req_cp, std::numeric_limits<int64_t>::max() - 1};
}
if (versions.empty() || !combine_if_overlapping(versions.front(), cc_version)) {
Expand Down Expand Up @@ -1723,6 +1731,7 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
}
int64_t req_bc_cnt = request->base_compaction_cnt();
int64_t req_cc_cnt = request->cumulative_compaction_cnt();
int64_t req_fc_cnt = request->has_full_compaction_cnt() ? request->full_compaction_cnt() : 0;
int64_t req_cp = request->cumulative_point();

do {
Expand Down Expand Up @@ -1807,6 +1816,8 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,

int64_t bc_cnt = tablet_stat.base_compaction_cnt();
int64_t cc_cnt = tablet_stat.cumulative_compaction_cnt();
int64_t fc_cnt =
tablet_stat.has_full_compaction_cnt() ? tablet_stat.full_compaction_cnt() : 0;
int64_t cp = tablet_stat.cumulative_point();

response->mutable_stats()->CopyFrom(tablet_stat);
Expand All @@ -1818,17 +1829,18 @@ void MetaServiceImpl::get_rowset(::google::protobuf::RpcController* controller,
//==========================================================================
// Find version ranges to be synchronized due to compaction
//==========================================================================
if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp) {
if (req_bc_cnt > bc_cnt || req_cc_cnt > cc_cnt || req_cp > cp || req_fc_cnt > fc_cnt) {
code = MetaServiceCode::INVALID_ARGUMENT;
ss << "no valid compaction_cnt or cumulative_point given. req_bc_cnt=" << req_bc_cnt
<< ", bc_cnt=" << bc_cnt << ", req_cc_cnt=" << req_cc_cnt << ", cc_cnt=" << cc_cnt
<< ", req_cp=" << req_cp << ", cp=" << cp << " tablet_id=" << tablet_id;
<< " req_fc_cnt=" << req_fc_cnt << ", fc_cnt=" << fc_cnt << ", req_cp=" << req_cp
<< ", cp=" << cp << " tablet_id=" << tablet_id;
msg = ss.str();
LOG(WARNING) << msg;
return;
}
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
req_start, req_end);
req_fc_cnt, fc_cnt, req_start, req_end);
for (auto [start, end] : versions) {
internal_get_rowset(txn.get(), start, end, instance_id, tablet_id, code, msg, response);
if (code != MetaServiceCode::OK) {
Expand Down
1 change: 1 addition & 0 deletions cloud/src/meta-service/meta_service_job.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -755,6 +755,7 @@ void process_compaction_job(MetaServiceCode& code, std::string& msg, std::string
} else if (compaction.type() == TabletCompactionJobPB::FULL) {
// clang-format off
stats->set_base_compaction_cnt(stats->base_compaction_cnt() + 1);
stats->set_full_compaction_cnt(stats->has_full_compaction_cnt() ? stats->full_compaction_cnt() + 1 : 1);
if (compaction.output_cumulative_point() > stats->cumulative_point()) {
// After supporting parallel cumu compaction, compaction with older cumu point may be committed after
// new cumu point has been set, MUST NOT set cumu point back to old value
Expand Down
55 changes: 43 additions & 12 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3781,7 +3781,7 @@ TEST(MetaServiceTest, FilterCopyFilesTest) {

extern std::vector<std::pair<int64_t, int64_t>> calc_sync_versions(
int64_t req_bc_cnt, int64_t bc_cnt, int64_t req_cc_cnt, int64_t cc_cnt, int64_t req_cp,
int64_t cp, int64_t req_start, int64_t req_end);
int64_t cp, int64_t req_fc_cnt, int64_t fc_cnt, int64_t req_start, int64_t req_end);

TEST(MetaServiceTest, CalcSyncVersionsTest) {
using Versions = std::vector<std::pair<int64_t, int64_t>>;
Expand All @@ -3797,7 +3797,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{8, 12}}));
}
Expand All @@ -3813,7 +3813,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 10};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, 12}})); // [5, 9] v [8, 12]
}
Expand All @@ -3822,7 +3822,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, 14}})); // [5, 14] v [8, 12]
}
Expand All @@ -3839,7 +3839,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12]
}
Expand All @@ -3855,7 +3855,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12]
}
Expand All @@ -3870,7 +3870,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 3};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{5, INT64_MAX - 1}})); // [5, max] v [8, 12]
}
Expand All @@ -3886,7 +3886,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 4}, {8, 12}}));
}
Expand All @@ -3895,7 +3895,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 1};
auto [req_cp, cp] = std::tuple {8, 8};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 7] v [8, 12]
}
Expand All @@ -3904,7 +3904,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 10};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 12}})); // [0, 4] v [5, 9] v [8, 12]
}
Expand All @@ -3913,7 +3913,7 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 15};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, 14}})); // [0, 4] v [5, 14] v [8, 12]
}
Expand All @@ -3922,11 +3922,42 @@ TEST(MetaServiceTest, CalcSyncVersionsTest) {
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {5, 5};
auto [req_start, req_end] = std::tuple {8, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp,
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
// [0, 4] v [5, max] v [8, 12]
ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
}

{
// when there exists full compaction, we can't optimize by "* only one CC happened and CP changed"

// * one CC happened and CP changed, and full compaction happened
// BE [=][=][=][=][=][=][=][=][=][=]
// ^~~~~ req_cp
// MS [xxxxxxxxxxxxxx][xxxxxxxxxxxxxx][=======][=][=]
// ^~~~~~~ ms_cp
// ^___________________________^ related_versions: [req_cp, max]
//
auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {4, 7};
auto [req_start, req_end] = std::tuple {9, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 1,
req_start, req_end);
ASSERT_EQ(versions, (Versions {{0, INT64_MAX - 1}}));
}

{
// abnormal case:
auto [req_bc_cnt, bc_cnt] = std::tuple {0, 1};
auto [req_cc_cnt, cc_cnt] = std::tuple {1, 2};
auto [req_cp, cp] = std::tuple {4, 7};
auto [req_start, req_end] = std::tuple {9, 12};
auto versions = calc_sync_versions(req_bc_cnt, bc_cnt, req_cc_cnt, cc_cnt, req_cp, cp, 0, 0,
req_start, req_end);
// when not considering full compaction, the returned versions are wrong because rowsets in [7-8] are missed
ASSERT_EQ(versions, (Versions {{0, 6}, {9, 12}}));
}
}

TEST(MetaServiceTest, StageTest) {
Expand Down
3 changes: 2 additions & 1 deletion gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ message TabletStatsPB {
optional int64 cumulative_point = 9;
optional int64 last_base_compaction_time_ms = 10;
optional int64 last_cumu_compaction_time_ms = 11;
optional int64 full_compaction_cnt = 12;
optional int64 full_compaction_cnt = 12; // used by calc_sync_versions() only
optional int64 last_full_compaction_time_ms = 13;
optional int64 index_size = 14;
optional int64 segment_size = 15;
Expand Down Expand Up @@ -1043,6 +1043,7 @@ message GetRowsetRequest {
// for compatibility reasons we use FILL_WITH_DICT as default
optional SchemaOp schema_op = 8 [default = FILL_WITH_DICT];
optional string request_ip = 9;
optional int64 full_compaction_cnt = 10;
}

message GetRowsetResponse {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 10
2 20
3 30

-- !sql --
1 10
2 20
3 30

-- !write_cluster_new_write --
1 60
2 70
3 30
4 40
5 50

-- !read_cluster_query --
1 60
2 70
3 30
4 40
5 50

-- !write_cluster_full_compaction --
1 60
2 70
3 30
4 40
5 50

-- !write_cluster_cumu_compaction --
1 60
2 70
3 30
4 40
5 50

-- !write_cluster_new_write --
1 80
2 70
3 30
4 40
5 50

-- !read_cluster_check_dup_key --

-- !read_cluster_res --
1 80
2 70
3 30
4 40
5 50

Loading
Loading