diff --git a/be/src/common/config.h b/be/src/common/config.h index f1547f1180d7b0..ebc5c65541ead0 100644 --- a/be/src/common/config.h +++ b/be/src/common/config.h @@ -293,7 +293,11 @@ CONF_mInt64(total_permits_for_compaction_score, "10000"); CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10") // Compaction task number per disk. +// Must be at least 2, because Base compaction and Cumulative compaction have at least one thread each. CONF_mInt32(compaction_task_num_per_disk, "2"); +CONF_Validator(compaction_task_num_per_disk, [](const int config) -> bool { + return config >= 2; +}); // How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation. CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9"); diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp index 97bbf8483a29fa..1acfe3e3866d63 100644 --- a/be/src/olap/olap_server.cpp +++ b/be/src/olap/olap_server.cpp @@ -315,7 +315,7 @@ void StorageEngine::_compaction_tasks_producer_callback() { #endif LOG(INFO) << "try to start compaction producer process!"; - std::vector tablet_submitted; + std::map tablet_submitted; std::vector data_dirs; for (auto& tmp_store : _store_map) { data_dirs.push_back(tmp_store.second); @@ -325,18 +325,37 @@ void StorageEngine::_compaction_tasks_producer_callback() { int round = 0; CompactionType compaction_type; + // Used to record the time when the score metric was last updated. + // The update of the score metric is accompanied by the logic of selecting the tablet. + // If there is no slot available, the logic of selecting the tablet will be terminated, + // which causes the score metric update to be terminated. + // In order to avoid this situation, we need to update the score regularly. 
+ int64_t last_cumulative_score_update_time = 0; + int64_t last_base_score_update_time = 0; + static const int64_t check_score_interval_ms = 5000; // 5 secs + int64_t interval = config::generate_compaction_tasks_min_interval_ms; do { if (!config::disable_auto_compaction) { + bool check_score = false; + int64_t cur_time = UnixMillis(); if (round < config::cumulative_compaction_rounds_for_each_base_compaction_round) { compaction_type = CompactionType::CUMULATIVE_COMPACTION; round++; + if (cur_time - last_cumulative_score_update_time >= check_score_interval_ms) { + check_score = true; + last_cumulative_score_update_time = cur_time; + } } else { compaction_type = CompactionType::BASE_COMPACTION; round = 0; + if (cur_time - last_base_score_update_time >= check_score_interval_ms) { + check_score = true; + last_base_score_update_time = cur_time; + } } std::vector tablets_compaction = - _compaction_tasks_generator(compaction_type, data_dirs); + _compaction_tasks_generator(compaction_type, data_dirs, check_score); if (tablets_compaction.size() == 0) { std::unique_lock lock(_compaction_producer_sleep_mutex); _wakeup_producer_flag = 0; @@ -356,7 +375,7 @@ void StorageEngine::_compaction_tasks_producer_callback() { int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet); if (permits > 0 && _permit_limiter.request(permits)) { // Push to _tablet_submitted_compaction before submitting task - _push_tablet_into_submitted_compaction(tablet); + _push_tablet_into_submitted_compaction(tablet, compaction_type); auto st =_compaction_thread_pool->submit_func([=]() { CgroupsMgr::apply_system_cgroup(); tablet->execute_compaction(compaction_type); @@ -384,15 +403,43 @@ void StorageEngine::_compaction_tasks_producer_callback() { } std::vector StorageEngine::_compaction_tasks_generator( - CompactionType compaction_type, std::vector data_dirs) { + CompactionType compaction_type, std::vector& data_dirs, bool check_score) { std::vector tablets_compaction; 
uint32_t max_compaction_score = 0; + bool need_pick_tablet = true; std::random_shuffle(data_dirs.begin(), data_dirs.end()); for (auto data_dir : data_dirs) { std::unique_lock lock(_tablet_submitted_compaction_mutex); + // We need to reserve at least one Slot for cumulative compaction. + // So when there is only one Slot, we have to judge whether there is a cumulative compaction + // in the currently submitted task. + // If so, the last Slot can be assigned to Base compaction, + // otherwise, this Slot needs to be reserved for cumulative compaction. if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk) { - continue; + // Return if no available slot + need_pick_tablet = false; + if (!check_score) { + continue; + } + } else if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk - 1) { + // Only one slot left, check if it can be assigned to base compaction task. + if (compaction_type == CompactionType::BASE_COMPACTION) { + bool has_cumu_submitted = false; + for (const auto& submitted : _tablet_submitted_compaction[data_dir]) { + if (submitted.second == CompactionType::CUMULATIVE_COMPACTION) { + has_cumu_submitted = true; + break; + } + } + if (!has_cumu_submitted) { + need_pick_tablet = false; + if (!check_score) { + continue; + } + } + } } + if (!data_dir->reach_capacity_limit(0)) { uint32_t disk_max_score = 0; TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction( @@ -403,6 +450,7 @@ std::vector StorageEngine::_compaction_tasks_generator( } } } + if (!tablets_compaction.empty()) { if (compaction_type == CompactionType::BASE_COMPACTION) { DorisMetrics::instance()->tablet_base_max_compaction_score->set_value(max_compaction_score); @@ -410,24 +458,23 @@ std::vector StorageEngine::_compaction_tasks_generator( DorisMetrics::instance()->tablet_cumulative_max_compaction_score->set_value(max_compaction_score); } } + if (!need_pick_tablet) { + // This is just for updating the compaction 
score metric, no need to return tablet. + tablets_compaction.clear(); + } return tablets_compaction; } -void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) { +void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type) { std::unique_lock lock(_tablet_submitted_compaction_mutex); - _tablet_submitted_compaction[tablet->data_dir()].emplace_back( - tablet->tablet_id()); + _tablet_submitted_compaction[tablet->data_dir()].emplace( + tablet->tablet_id(), compaction_type); } void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) { std::unique_lock lock(_tablet_submitted_compaction_mutex); - std::vector::iterator it_tablet = - find(_tablet_submitted_compaction[tablet->data_dir()].begin(), - _tablet_submitted_compaction[tablet->data_dir()].end(), - tablet->tablet_id()); - if (it_tablet != - _tablet_submitted_compaction[tablet->data_dir()].end()) { - _tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet); + int removed = _tablet_submitted_compaction[tablet->data_dir()].erase(tablet->tablet_id()); + if (removed == 1) { std::unique_lock lock(_compaction_producer_sleep_mutex); _wakeup_producer_flag = 1; _compaction_producer_sleep_cv.notify_one(); diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h index 4019e31c9bf580..752b9fa084760f 100644 --- a/be/src/olap/storage_engine.h +++ b/be/src/olap/storage_engine.h @@ -238,8 +238,9 @@ class StorageEngine { void _compaction_tasks_producer_callback(); vector _compaction_tasks_generator(CompactionType compaction_type, - std::vector data_dirs); - void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet); + std::vector& data_dirs, + bool check_score); + void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet, CompactionType compaction_type); void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet); private: @@ -340,7 +341,7 @@ class StorageEngine { 
CompactionPermitLimiter _permit_limiter; std::mutex _tablet_submitted_compaction_mutex; - std::map> _tablet_submitted_compaction; + std::map> _tablet_submitted_compaction; AtomicInt32 _wakeup_producer_flag; diff --git a/be/src/olap/tablet_manager.cpp b/be/src/olap/tablet_manager.cpp index 8ad683fe375049..cb7cf44c907581 100644 --- a/be/src/olap/tablet_manager.cpp +++ b/be/src/olap/tablet_manager.cpp @@ -686,7 +686,7 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) { TabletSharedPtr TabletManager::find_best_tablet_to_compaction( CompactionType compaction_type, DataDir* data_dir, - std::vector& tablet_submitted_compaction, uint32_t* score) { + const std::map& tablet_submitted_compaction, uint32_t* score) { int64_t now_ms = UnixMillis(); const string& compaction_type_str = compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative"; @@ -698,12 +698,11 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction( ReadLock rlock(tablets_shard.lock.get()); for (const auto& tablet_map : tablets_shard.tablet_map) { for (const TabletSharedPtr& tablet_ptr : tablet_map.second.table_arr) { - std::vector::iterator it_tablet = - find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(), - tablet_ptr->tablet_id()); - if (it_tablet != tablet_submitted_compaction.end()) { + auto search = tablet_submitted_compaction.find(tablet_ptr->tablet_id()); + if (search != tablet_submitted_compaction.end()) { continue; } + AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task(); if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED && cur_alter_task->alter_state() != ALTER_FAILED) { diff --git a/be/src/olap/tablet_manager.h b/be/src/olap/tablet_manager.h index 67742984cc13d7..ed1b547e231ff4 100644 --- a/be/src/olap/tablet_manager.h +++ b/be/src/olap/tablet_manager.h @@ -73,7 +73,7 @@ class TabletManager { TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type, DataDir* data_dir, - 
vector& tablet_submitted_compaction, + const std::map& tablet_submitted_compaction, uint32_t* score); TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,