Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -283,8 +283,10 @@ CONF_mInt32(cumulative_compaction_skip_window_seconds, "30");
CONF_mInt64(min_compaction_failure_interval_sec, "600"); // 10 min

// This config can be set to limit thread number in compaction thread pool.
CONF_mInt32(min_compaction_threads, "10");
CONF_mInt32(max_compaction_threads, "10");
CONF_mInt32(min_base_compaction_threads, "10");
CONF_mInt32(max_base_compaction_threads, "10");
CONF_mInt32(min_cumulative_compaction_threads, "10");
CONF_mInt32(max_cumulative_compaction_threads, "10");
Comment on lines +286 to +289
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not mutable, use CONF_Int32 instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not mutable, use CONF_Int32 instead.

OK, thanks.


// The upper limit of "permits" held by all compaction tasks. This config can be set to limit memory consumption for compaction.
CONF_mInt64(total_permits_for_compaction_score, "10000");
Expand All @@ -293,7 +295,8 @@ CONF_mInt64(total_permits_for_compaction_score, "10000");
CONF_mInt32(generate_compaction_tasks_min_interval_ms, "10")

// Compaction task number per disk.
CONF_mInt32(compaction_task_num_per_disk, "2");
CONF_mInt32(max_base_compaction_task_num_per_disk, "2");
CONF_mInt32(max_cumulative_compaction_task_num_per_disk, "2");

// How many rounds of cumulative compaction for each round of base compaction when compaction tasks generation.
CONF_mInt32(cumulative_compaction_rounds_for_each_base_compaction_round, "9");
Expand Down
151 changes: 102 additions & 49 deletions be/src/olap/olap_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,19 @@ Status StorageEngine::start_bg_threads() {
// check cumulative compaction config
_check_cumulative_compaction_config();

int32_t max_thread_num = config::max_compaction_threads;
int32_t min_thread_num = config::min_compaction_threads;
ThreadPoolBuilder("CompactionTaskThreadPool")
.set_min_threads(min_thread_num)
.set_max_threads(max_thread_num)
.build(&_compaction_thread_pool);
int32_t max_base_thread_num = config::max_base_compaction_threads;
int32_t min_base_thread_num = config::min_base_compaction_threads;
ThreadPoolBuilder("BaseCompactionTaskThreadPool")
.set_min_threads(min_base_thread_num)
.set_max_threads(max_base_thread_num)
.build(&_base_compaction_thread_pool);

int32_t max_cumulative_thread_num = config::max_cumulative_compaction_threads;
int32_t min_cumulative_thread_num = config::min_cumulative_compaction_threads;
ThreadPoolBuilder("CumulativeCompactionTaskThreadPool")
.set_min_threads(min_cumulative_thread_num)
.set_max_threads(max_cumulative_thread_num)
.build(&_cumulative_compaction_thread_pool);

// compaction tasks producer thread
RETURN_IF_ERROR(Thread::create(
Expand Down Expand Up @@ -315,11 +322,18 @@ void StorageEngine::_compaction_tasks_producer_callback() {
#endif
LOG(INFO) << "try to start compaction producer process!";

std::vector<TTabletId> tablet_submitted;
std::set<TTabletId> tablet_submitted;
std::vector<DataDir*> data_dirs;
for (auto& tmp_store : _store_map) {
data_dirs.push_back(tmp_store.second);
_tablet_submitted_compaction[tmp_store.second] = tablet_submitted;
{
WriteLock wr_lock(&_tablet_submitted_base_compaction_mutex);
_tablet_submitted_base_compaction[tmp_store.second] = tablet_submitted;
}
{
WriteLock wr_lock(&_tablet_submitted_cumulative_compaction_mutex);
_tablet_submitted_cumulative_compaction[tmp_store.second] = tablet_submitted;
}
}

int round = 0;
Expand All @@ -335,6 +349,7 @@ void StorageEngine::_compaction_tasks_producer_callback() {
compaction_type = CompactionType::BASE_COMPACTION;
round = 0;
}

std::vector<TabletSharedPtr> tablets_compaction =
_compaction_tasks_generator(compaction_type, data_dirs);
if (tablets_compaction.size() == 0) {
Expand All @@ -355,21 +370,40 @@ void StorageEngine::_compaction_tasks_producer_callback() {
for (const auto& tablet : tablets_compaction) {
int64_t permits = tablet->prepare_compaction_and_calculate_permits(compaction_type, tablet);
if (permits > 0 && _permit_limiter.request(permits)) {
// Push to _tablet_submitted_compaction before submitting task
_push_tablet_into_submitted_compaction(tablet);
auto st =_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
_pop_tablet_from_submitted_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
});
if (!st.ok()) {
_permit_limiter.release(permits);
_pop_tablet_from_submitted_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
if (compaction_type == CompactionType::BASE_COMPACTION) {
// Push to _tablet_submitted_base_compaction before submitting task
_push_tablet_into_submitted_base_compaction(tablet);
auto st =_base_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
_pop_tablet_from_submitted_base_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
});
if (!st.ok()) {
_permit_limiter.release(permits);
_pop_tablet_from_submitted_base_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
}
} else {
// Push to _tablet_submitted_cumulative_compaction before submitting task
_push_tablet_into_submitted_cumulative_compaction(tablet);
auto st =_cumulative_compaction_thread_pool->submit_func([=]() {
CgroupsMgr::apply_system_cgroup();
tablet->execute_compaction(compaction_type);
_permit_limiter.release(permits);
_pop_tablet_from_submitted_cumulative_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
});
if (!st.ok()) {
_permit_limiter.release(permits);
_pop_tablet_from_submitted_cumulative_compaction(tablet);
// reset compaction
tablet->reset_compaction(compaction_type);
}
}
} else {
// reset compaction
Expand All @@ -387,41 +421,60 @@ std::vector<TabletSharedPtr> StorageEngine::_compaction_tasks_generator(
CompactionType compaction_type, std::vector<DataDir*> data_dirs) {
std::vector<TabletSharedPtr> tablets_compaction;
std::random_shuffle(data_dirs.begin(), data_dirs.end());
for (auto data_dir : data_dirs) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
if (_tablet_submitted_compaction[data_dir].size() >= config::compaction_task_num_per_disk) {
continue;
if (compaction_type == CompactionType::BASE_COMPACTION) {
for (auto data_dir : data_dirs) {
ReadLock rd_lock(&_tablet_submitted_base_compaction_mutex);
if (_tablet_submitted_base_compaction[data_dir].size() >= config::max_base_compaction_task_num_per_disk) {
continue;
}
if (!data_dir->reach_capacity_limit(0)) {
ReadLock rd_lock(&_tablet_submitted_cumulative_compaction_mutex);
TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
compaction_type, data_dir, &_tablet_submitted_base_compaction[data_dir],
&_tablet_submitted_cumulative_compaction[data_dir]);
if (tablet != nullptr) {
tablets_compaction.emplace_back(tablet);
}
}
}
if (!data_dir->reach_capacity_limit(0)) {
TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
compaction_type, data_dir, _tablet_submitted_compaction[data_dir]);
if (tablet != nullptr) {
tablets_compaction.emplace_back(tablet);
} else {
for (auto data_dir : data_dirs) {
ReadLock rd_lock(&_tablet_submitted_cumulative_compaction_mutex);
if (_tablet_submitted_cumulative_compaction[data_dir].size() >= config::max_cumulative_compaction_task_num_per_disk) {
continue;
}
if (!data_dir->reach_capacity_limit(0)) {
ReadLock rd_lock(&_tablet_submitted_base_compaction_mutex);
TabletSharedPtr tablet = _tablet_manager->find_best_tablet_to_compaction(
compaction_type, data_dir, &_tablet_submitted_base_compaction[data_dir],
&_tablet_submitted_cumulative_compaction[data_dir]);
if (tablet != nullptr) {
tablets_compaction.emplace_back(tablet);
}
}
}
}
return tablets_compaction;
}

void StorageEngine::_push_tablet_into_submitted_compaction(TabletSharedPtr tablet) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
_tablet_submitted_compaction[tablet->data_dir()].emplace_back(
tablet->tablet_id());
void StorageEngine::_push_tablet_into_submitted_base_compaction(TabletSharedPtr tablet) {
WriteLock wr_lock(&_tablet_submitted_base_compaction_mutex);
_tablet_submitted_base_compaction[tablet->data_dir()].insert(tablet->tablet_id());
}

void StorageEngine::_pop_tablet_from_submitted_compaction(TabletSharedPtr tablet) {
std::unique_lock<std::mutex> lock(_tablet_submitted_compaction_mutex);
std::vector<TTabletId>::iterator it_tablet =
find(_tablet_submitted_compaction[tablet->data_dir()].begin(),
_tablet_submitted_compaction[tablet->data_dir()].end(),
tablet->tablet_id());
if (it_tablet !=
_tablet_submitted_compaction[tablet->data_dir()].end()) {
_tablet_submitted_compaction[tablet->data_dir()].erase(it_tablet);
std::unique_lock<std::mutex> lock(_compaction_producer_sleep_mutex);
_wakeup_producer_flag = 1;
_compaction_producer_sleep_cv.notify_one();
}
void StorageEngine::_pop_tablet_from_submitted_base_compaction(TabletSharedPtr tablet) {
WriteLock wr_lock(&_tablet_submitted_base_compaction_mutex);
_tablet_submitted_base_compaction[tablet->data_dir()].erase(tablet->tablet_id());
}

void StorageEngine::_push_tablet_into_submitted_cumulative_compaction(TabletSharedPtr tablet) {
WriteLock wr_lock(&_tablet_submitted_cumulative_compaction_mutex);
_tablet_submitted_cumulative_compaction[tablet->data_dir()].insert(tablet->tablet_id());
}

void StorageEngine::_pop_tablet_from_submitted_cumulative_compaction(TabletSharedPtr tablet) {
WriteLock wr_lock(&_tablet_submitted_cumulative_compaction_mutex);
_tablet_submitted_cumulative_compaction[tablet->data_dir()].erase(tablet->tablet_id());
}

} // namespace doris
8 changes: 6 additions & 2 deletions be/src/olap/storage_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,12 @@ StorageEngine::~StorageEngine() {
DEREGISTER_HOOK_METRIC(compaction_mem_current_consumption);
_clear();

if(_compaction_thread_pool){
_compaction_thread_pool->shutdown();
if(_base_compaction_thread_pool){
_base_compaction_thread_pool->shutdown();
}

if(_cumulative_compaction_thread_pool){
_cumulative_compaction_thread_pool->shutdown();
}
}

Expand Down
15 changes: 10 additions & 5 deletions be/src/olap/storage_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,10 @@ class StorageEngine {
void _compaction_tasks_producer_callback();
vector<TabletSharedPtr> _compaction_tasks_generator(CompactionType compaction_type,
std::vector<DataDir*> data_dirs);
void _push_tablet_into_submitted_compaction(TabletSharedPtr tablet);
void _pop_tablet_from_submitted_compaction(TabletSharedPtr tablet);
void _push_tablet_into_submitted_base_compaction(TabletSharedPtr tablet);
void _pop_tablet_from_submitted_base_compaction(TabletSharedPtr tablet);
void _push_tablet_into_submitted_cumulative_compaction(TabletSharedPtr tablet);
void _pop_tablet_from_submitted_cumulative_compaction(TabletSharedPtr tablet);

private:
struct CompactionCandidate {
Expand Down Expand Up @@ -334,12 +336,15 @@ class StorageEngine {

HeartbeatFlags* _heartbeat_flags;

std::unique_ptr<ThreadPool> _compaction_thread_pool;
std::unique_ptr<ThreadPool> _base_compaction_thread_pool;
std::unique_ptr<ThreadPool> _cumulative_compaction_thread_pool;

CompactionPermitLimiter _permit_limiter;

std::mutex _tablet_submitted_compaction_mutex;
std::map<DataDir*, vector<TTabletId>> _tablet_submitted_compaction;
RWMutex _tablet_submitted_base_compaction_mutex;
std::map<DataDir*, std::set<TTabletId>> _tablet_submitted_base_compaction;
RWMutex _tablet_submitted_cumulative_compaction_mutex;
std::map<DataDir*, std::set<TTabletId>> _tablet_submitted_cumulative_compaction;
Comment on lines +339 to +347
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use a two elements array, one for base compaction, the other for cumulative compaction, to simplify code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about use a two elements array, one for base compaction, the other for cumulative compaction, to simplify code?

OK, Thank you.


AtomicInt32 _wakeup_producer_flag;

Expand Down
15 changes: 10 additions & 5 deletions be/src/olap/tablet_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -681,7 +681,8 @@ void TabletManager::get_tablet_stat(TTabletStatResult* result) {

TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
CompactionType compaction_type, DataDir* data_dir,
std::vector<TTabletId>& tablet_submitted_compaction) {
std::set<TTabletId>* tablet_submitted_base_compaction,
std::set<TTabletId>* tablet_submitted_cumulative_compaction) {
int64_t now_ms = UnixMillis();
const string& compaction_type_str =
compaction_type == CompactionType::BASE_COMPACTION ? "base" : "cumulative";
Expand All @@ -693,12 +694,16 @@ TabletSharedPtr TabletManager::find_best_tablet_to_compaction(
ReadLock rlock(tablets_shard.lock.get());
for (const auto& tablet_map : tablets_shard.tablet_map) {
for (const TabletSharedPtr& tablet_ptr : tablet_map.second.table_arr) {
std::vector<TTabletId>::iterator it_tablet =
find(tablet_submitted_compaction.begin(), tablet_submitted_compaction.end(),
tablet_ptr->tablet_id());
if (it_tablet != tablet_submitted_compaction.end()) {
std::set<TTabletId>::iterator it_tablet =
(*tablet_submitted_base_compaction).find(tablet_ptr->tablet_id());
if (it_tablet != (*tablet_submitted_base_compaction).end()) {
continue;
}
it_tablet = (*tablet_submitted_cumulative_compaction).find(tablet_ptr->tablet_id());
if (it_tablet != (*tablet_submitted_cumulative_compaction).end()) {
continue;
}

Comment on lines +697 to +706
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use ContainsKey in be/src/gutil/map-util.h instead to simplify code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can use ContainsKey in be/src/gutil/map-util.h instead to simplify code.

We need to judge whether an element exists in setrather than map.

AlterTabletTaskSharedPtr cur_alter_task = tablet_ptr->alter_task();
if (cur_alter_task != nullptr && cur_alter_task->alter_state() != ALTER_FINISHED &&
cur_alter_task->alter_state() != ALTER_FAILED) {
Expand Down
3 changes: 2 additions & 1 deletion be/src/olap/tablet_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ class TabletManager {

TabletSharedPtr find_best_tablet_to_compaction(CompactionType compaction_type,
DataDir* data_dir,
vector<TTabletId>& tablet_submitted_compaction);
std::set<TTabletId>* tablet_submitted_base_compaction,
std::set<TTabletId>* tablet_submitted_cumulative_compaction);

TabletSharedPtr get_tablet(TTabletId tablet_id, SchemaHash schema_hash,
bool include_deleted = false, std::string* err = nullptr);
Expand Down