Skip to content

Commit

Permalink
*: Fix bug that DynamicThreadPool may not destruct tasks in time. (#4210
Browse files Browse the repository at this point in the history
)

ref #4098, ref #4207
  • Loading branch information
fuzhe1989 authored Mar 11, 2022
1 parent f50e1a9 commit 86e21ef
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 2 deletions.
4 changes: 2 additions & 2 deletions dbms/src/Common/DynamicThreadPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,11 @@ void DynamicThreadPool::scheduledToNewDynamicThread(TaskPtr & task)
t.detach();
}

void executeTask(const std::unique_ptr<IExecutableTask> & task)
void DynamicThreadPool::executeTask(TaskPtr & task)
{
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_active_threads_of_thdpool, type_max_active_threads_of_thdpool);
task->execute();
task.reset();
}

void DynamicThreadPool::fixedWork(size_t index)
Expand Down Expand Up @@ -124,7 +125,6 @@ void DynamicThreadPool::dynamicWork(TaskPtr initial_task)
if (!node.task) // may be timeout or cancelled
break;
executeTask(node.task);
node.task.reset();
}
alive_dynamic_threads.fetch_sub(1);
}
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/DynamicThreadPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class DynamicThreadPool
void fixedWork(size_t index);
void dynamicWork(TaskPtr initial_task);

static void executeTask(TaskPtr & task);

const std::chrono::nanoseconds dynamic_auto_shrink_cooldown;

std::vector<std::thread> fixed_threads;
Expand Down
43 changes: 43 additions & 0 deletions dbms/src/Common/tests/gtest_dynamic_thread_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@

namespace DB::tests
{
namespace
{
class DynamicThreadPoolTest : public ::testing::Test
{
};
Expand Down Expand Up @@ -148,4 +150,45 @@ try
}
CATCH

struct X
{
std::mutex * mu;
std::condition_variable * cv;
bool * destructed;

X(std::mutex * mu_, std::condition_variable * cv_, bool * destructed_)
: mu(mu_)
, cv(cv_)
, destructed(destructed_)
{}

~X()
{
std::unique_lock lock(*mu);
*destructed = true;
cv->notify_all();
}
};

TEST_F(DynamicThreadPoolTest, testTaskDestruct)
try
{
std::mutex mu;
std::condition_variable cv;
bool destructed = false;

DynamicThreadPool pool(0, std::chrono::minutes(1));
auto tmp = std::make_shared<X>(&mu, &cv, &destructed);
pool.schedule(true, [x = tmp] {});
tmp.reset();

{
std::unique_lock lock(mu);
auto ret = cv.wait_for(lock, std::chrono::seconds(1), [&] { return destructed; });
ASSERT_TRUE(ret);
}
}
CATCH

} // namespace
} // namespace DB::tests

0 comments on commit 86e21ef

Please sign in to comment.