Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,11 @@ CONF_mInt64(total_permits_for_compaction_score, "10000");
CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10")

// Compaction task number per disk.
// Must be greater than 2, because Base compaction and Cumulative compaction have at least one thread each.
CONF_mInt32(compaction_task_num_per_disk, "2");
CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool {
return config >= 2;
});

// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
Expand Down
77 changes: 62 additions & 15 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
#endif
LOG(INFO) << "try to start compaction producer process!";

std::vector<TTabletId> tablet_submitted;
std::map<TTabletId, CompactionType> tablet_submitted;
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
Expand All @@ -325,18 +325,37 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int round = 0;
CompactionType compaction_type;

// Used to record the time when the score metric was last updated.
// The update of the score metric is accompanied by the logic of selecting the tablet.
// If there is no slot available, the logic of selecting the tablet will be terminated,
// which causes the score metric update to be terminated.
// In order to avoid this situation, we need to update the score regularly.
int64_t last_cumulative_score_update_time = 0;
int64_t last_base_score_update_time = 0;
static const int64_t check_score_interval_ms = 5000; // 5 secs

int64_t interval = config::generate_compaction_tasks_min_interval_ms;
do {
if (!config::disable_auto_compaction) {
bool check_score = false;
int64_t cur_time = UnixMillis();
if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) {
compaction_type = CompactionType::CUMULATIVE_COMPACTION;
round++;
if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) {
check_score = true;
last_cumulative_score_update_time = cur_time;
}
} else {
compaction_type = CompactionType::BASE_COMPACTION;
round = 0;
if (cur_time - last_base_score_update_time >= check_score_interval_ms) {
check_score = true;
last_base_score_update_time = cur_time;
}
}
std::vector<TabletSharedPtr> tablets_compaction =
_compaction_tasks_generator(compaction_type, data_dirs);
_compaction_tasks_generator(compaction_type, data_dirs, check_score);
if (tablets_compaction.size() == 0) {
std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
_wakeup_producer_flag = 0;
Expand All @@ -356,7 +375,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet);
if (permits > 0 && _permit_limiter.request(permits)) {
// Push to _tablet_submitted_compaction before submitting task
_push_tablet_into_submitted_compaction(tablet);
_push_tablet_into_submitted_compaction(tablet, compaction_type);
auto st =_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
Expand Down Expand Up @@ -384,15 +403,43 @@ void StorageEngine::_compaction_tasks_producer_callback() {
}

std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
CompactionType compaction_type, std::vector<DataDir*>& data_dirs, bool check_score) {
std::vector<TabletSharedPtr> tablets_compaction;
uint32_t max_compaction_score = 0;
bool need_pick_tablet = true;
std::random_shuffle(data_dirs.begin(), data_dirs.end());
for (auto data_dir : data_dirs) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
// We need to reserve at least one Slot for cumulative compaction.
// So when there is only one Slot, we have to judge whether there is a cumulative compaction
// in the currently submitted task.
// If so, the last Slot can be assigned to Base compaction,
// otherwise, this Slot needs to be reserved for cumulative compaction.
if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk) {
continue;
// Return if no available slot
need_pick_tablet = false;
if (!check_score) {
continue;
}
} else if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk - 1) {
// Only one slot left, check if it can be assigned to base compaction task.
if (compaction_type == CompactionType::BASE_COMPACTION) {
bool has_cumu_submitted = false;
for (const auto& submitted : _tablet_submitted_compaction[data_dir]) {
if (submitted.second == CompactionType::CUMULATIVE_COMPACTION) {
has_cumu_submitted = true;
break;
}
}
if (!has_cumu_submitted) {
need_pick_tablet = false;
if (!check_score) {
continue;
}
}
}
}

if (!data_dir->reach_capacity_limit(0)) {
uint32_t disk_max_score = 0;
TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
Expand All @@ -403,31 +450,31 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
}
}
}

if (!tablets_compaction.empty()) {
if (compaction_type == CompactionType::BASE_COMPACTION) {
DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(max_compaction_score);
} else {
DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(max_compaction_score);
}
}
if (!need_pick_tablet) {
// This is just for updating the compaction score metric, no need to return tablet.
tablets_compaction.clear();
}
return tablets_compaction;
}

void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) {
void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
_tablet_submitted_compaction[tablet->data_dir()].emplace_back(
tablet->tablet_id());
_tablet_submitted_compaction[tablet->data_dir()].emplace(
tablet->tablet_id(), compaction_type);
}

void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
std::vector<TTabletId>::iterator it_tablet =
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
_tablet_submitted_compaction[tablet->data_dir()].end(),
tablet->tablet_id());
if (it_tablet !=
_tablet_submitted_compaction[tablet->data_dir()].end()) {
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
int removed = _tablet_submitted_compaction[tablet->data_dir()].erase(tablet->tablet_id());
if (removed == 1) {
std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
Expand Down
7 changes: 4 additions & 3 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,9 @@ class StorageEngine {

void _compaction_tasks_producer_callback();
vector<TabletSharedPtr> _compaction_tasks_generator(CompactionType compaction_type,
std::vector<DataDir*> data_dirs);
void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet);
std::vector<DataDir*>& data_dirs,
bool check_score);
void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type);
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet);

private:
Expand Down Expand Up @@ -340,7 +341,7 @@ class StorageEngine {
CompactionPermitLimiter _permit_limiter;

std::mutex _tablet_submitted_compaction_mutex;
std::map<DataDir*, vector<TTabletId>> _tablet_submitted_compaction;
std::map<DataDir*, std::map<TTabletId, CompactionType>> _tablet_submitted_compaction;

AtomicInt32 _wakeup_producer_flag;

Expand Down
9 changes: 4 additions & 5 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {

TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
std::vector<TTabletId>& tablet_submitted_compaction, uint32_t* score) {
const std::map<TTabletId, CompactionType>& tablet_submitted_compaction, uint32_t* score) {
int64_t now_ms = UnixMillis();
const string& compaction_type_str =
compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
Expand All @@ -698,12 +698,11 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
ReadLock rlock(tablets_shard.lock.get());
for (const auto& tablet_map : tablets_shard.tablet_map) {
for (const TabletSharedPtr& tablet_ptr : tablet_map.second.table_arr) {
std::vector<TTabletId>::iterator it_tablet =
find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(),
tablet_ptr->tablet_id());
if (it_tablet != tablet_submitted_compaction.end()) {
auto search = tablet_submitted_compaction.find(tablet_ptr->tablet_id());
if (search != tablet_submitted_compaction.end()) {
continue;
}

AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task();
if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED &&
cur_alter_task->alter_state() != ALTER_FAILED) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class TabletManager {

TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type,
DataDir* data_dir,
vector<TTabletId>& tablet_submitted_compaction,
const std::map<TTabletId, CompactionType>& tablet_submitted_compaction,
uint32_t* score);

TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
Expand Down