Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 38 additions & 24 deletions be/src/olap/cumulative_compaction_time_series_policy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
}
int64_t compaction_file_count =
tablet->tablet_meta()->time_series_compaction_file_count_threshold();
int64_t compaction_time_threshold =
int64_t compaction_time_threshold_seconds =
tablet->tablet_meta()->time_series_compaction_time_threshold_seconds();
int64_t earliest_rowset_creation_time = INT64_MAX;

int64_t level0_total_size = 0;
RowsetMetaSharedPtr first_meta;
Expand Down Expand Up @@ -79,6 +80,9 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
} else {
checked_rs_metas.push_back(rs_meta);
}
if (rs_meta->creation_time() < earliest_rowset_creation_time) {
earliest_rowset_creation_time = rs_meta->creation_time();
}
}
}

Expand All @@ -103,24 +107,22 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return score;
}

int64_t now = UnixMillis();
int64_t last_cumu = tablet->last_cumu_compaction_success_time();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;
// current time in seconds
int64_t now = time(nullptr);

if (earliest_rowset_creation_time < now) {
int64_t cumu_interval = now - earliest_rowset_creation_time;

// Condition 3: the time interval between compactions exceeds the value specified by parameter _compaction_time_threshold_second
if (cumu_interval > (compaction_time_threshold * 1000) && score > 0) {
if (cumu_interval > compaction_time_threshold_seconds && score > 0) {
return score;
}
} else if (score > 0) {
// If the compaction process has not been successfully executed,
// the condition for triggering compaction based on the last successful compaction time (condition 3) will never be met
tablet->set_last_cumu_compaction_success_time(now);
}

if (compaction_level >= 2) {
int64_t continuous_size = 0;
std::vector<RowsetMetaSharedPtr> level1_rowsets;
int64_t earliest_level1_rowset_creation_time = INT64_MAX;
for (const auto& rs_meta : checked_rs_metas) {
if (rs_meta->compaction_level() == 0) {
break;
Expand All @@ -137,12 +139,15 @@ uint32_t TimeSeriesCumulativeCompactionPolicy::calc_cumulative_compaction_score(
return level1_rowsets.size();
}
}
if (rs_meta->creation_time() < earliest_level1_rowset_creation_time) {
earliest_level1_rowset_creation_time = rs_meta->creation_time();
}
}

// Condition 5: level1 achieve compaction_time_threshold
if (last_cumu != 0 && level1_rowsets.size() >= 2) {
int64_t cumu_interval = now - last_cumu;
if (cumu_interval > compaction_time_threshold * 10 * 1000) {
// Condition 5: level1 achieve compaction_time_threshold_seconds
if (level1_rowsets.size() >= 2) {
int64_t cumu_interval = now - earliest_level1_rowset_creation_time;
if (cumu_interval > compaction_time_threshold_seconds * 10) {
return level1_rowsets.size();
}
}
Expand Down Expand Up @@ -196,7 +201,9 @@ void TimeSeriesCumulativeCompactionPolicy::calculate_cumulative_point(
CHECK((*base_rowset_meta)->start_version() == 0);

int64_t prev_version = -1;
int64_t now = UnixSeconds();

// current time in seconds
int64_t now = time(nullptr);
for (const RowsetMetaSharedPtr& rs : existing_rss) {
if (rs->version().first > prev_version + 1) {
// There is a hole, do not continue
Expand Down Expand Up @@ -272,7 +279,7 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
}
int64_t compaction_file_count =
tablet->tablet_meta()->time_series_compaction_file_count_threshold();
int64_t compaction_time_threshold =
int64_t compaction_time_threshold_seconds =
tablet->tablet_meta()->time_series_compaction_time_threshold_seconds();

int transient_size = 0;
Expand Down Expand Up @@ -347,10 +354,17 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
}

// Condition 3: the time interval between compactions exceeds the value specified by parameter compaction_time_threshold_second
int64_t now = UnixMillis();
if (last_cumu != 0) {
int64_t cumu_interval = now - last_cumu;
if (cumu_interval > (compaction_time_threshold * 1000) && transient_size > 0) {
// current time in seconds
int64_t now = time(nullptr);
if (!input_rowsets->empty()) {
LOG_EVERY_N(INFO, 1000) << "tablet is: " << tablet->tablet_id() << ", now: " << now
<< ", earliest rowset creation time: "
<< input_rowsets->front()->rowset_meta()->creation_time()
<< ", compaction_time_threshold_seconds: "
<< compaction_time_threshold_seconds
<< ", rowset count: " << transient_size;
int64_t cumu_interval = now - input_rowsets->front()->rowset_meta()->creation_time();
if (cumu_interval > compaction_time_threshold_seconds && transient_size > 0) {
return transient_size;
}
}
Expand Down Expand Up @@ -385,10 +399,10 @@ int32_t TimeSeriesCumulativeCompactionPolicy::pick_input_rowsets(
}
})

// Condition 5: level1 achieve compaction_time_threshold
if (last_cumu != 0 && level1_rowsets.size() >= 2) {
int64_t cumu_interval = now - last_cumu;
if (cumu_interval > compaction_time_threshold * 10 * 1000) {
// Condition 5: level1 achieve compaction_time_threshold_seconds
if (level1_rowsets.size() >= 2) {
int64_t cumu_interval = now - level1_rowsets.front()->rowset_meta()->creation_time();
if (cumu_interval > compaction_time_threshold_seconds * 10) {
input_rowsets->swap(level1_rowsets);
return input_rowsets->size();
}
Expand Down
11 changes: 8 additions & 3 deletions be/test/olap/cumulative_compaction_time_series_policy_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ class TestTimeSeriesCumulativeCompactionPolicy : public testing::Test {
json2pb::JsonToProtoMessage(_json_rowset_meta, &rowset_meta_pb);
rowset_meta_pb.set_start_version(start);
rowset_meta_pb.set_end_version(end);
rowset_meta_pb.set_creation_time(10000);
rowset_meta_pb.set_creation_time(time(nullptr));

pb1->init_from_pb(rowset_meta_pb);
pb1->set_total_disk_size(41);
Expand Down Expand Up @@ -536,8 +536,13 @@ TEST_F(TestTimeSeriesCumulativeCompactionPolicy, pick_input_rowsets_time_interva
new Tablet(_engine, _tablet_meta, nullptr, CUMULATIVE_TIME_SERIES_POLICY));
static_cast<void>(_tablet->init());
_tablet->calculate_cumulative_point();
int64_t now = UnixMillis();
_tablet->set_last_cumu_compaction_success_time(now - 3700 * 1000);

_tablet->_tablet_meta->set_time_series_compaction_time_threshold_seconds(1);
std::this_thread::sleep_for(std::chrono::seconds(2));

int score =
_tablet->_cumulative_compaction_policy->calc_cumulative_compaction_score(_tablet.get());
EXPECT_EQ(3, score);

auto candidate_rowsets = _tablet->pick_candidate_rowsets_to_cumulative_compaction();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,4 +120,34 @@ suite("test_time_series_compaction_polciy", "p0") {
rowsetCount = get_rowset_count.call(tablets);
assert (rowsetCount == 11 * replicaNum)
qt_sql_3 """ select count() from ${tableName}"""

sql """ DROP TABLE IF EXISTS ${tableName}; """
sql """
CREATE TABLE ${tableName} (
`id` int(11) NULL,
`name` varchar(255) NULL,
`hobbies` text NULL,
`score` int(11) NULL
) ENGINE=OLAP
DUPLICATE KEY(`id`)
COMMENT 'OLAP'
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_num" = "1",
"disable_auto_compaction" = "true",
"compaction_policy" = "time_series",
"time_series_compaction_time_threshold_seconds" = "70"
);
"""

sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """
sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99); """
sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """
sql """ INSERT INTO ${tableName} VALUES (1, "bason", "bason hate pear", 99); """
sql """ INSERT INTO ${tableName} VALUES (1, "andy", "andy love apple", 100); """
sql """ INSERT INTO ${tableName} VALUES (100, "andy", "andy love apple", 100); """
sql """ INSERT INTO ${tableName} VALUES (100, "bason", "bason hate pear", 99); """

Thread.sleep(75000)
trigger_and_wait_compaction(tableName, "cumulative")
}
Loading