Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -336,10 +336,10 @@ void FragmentMgr::stop() {
_cancel_thread->join();
}

_thread_pool->shutdown();
// Only me can delete
_query_ctx_map.clear();
_pipeline_map.clear();
_thread_pool->shutdown();
}

std::string FragmentMgr::to_http_path(const std::string& file_name) {
Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@ class FragmentMgr : public RestMonitorIface {

ThreadPool* get_thread_pool() { return _thread_pool.get(); }

// When fragment mgr is going to stop, the _stop_background_threads_latch is set to 0
// and other module that use fragment mgr's thread pool should get this signal and exit.
bool shutting_down() { return _stop_background_threads_latch.count() == 0; }

int32_t running_query_num() { return _query_ctx_map.num_items(); }

std::string dump_pipeline_tasks(int64_t duration = 0);
Expand Down Expand Up @@ -215,7 +219,7 @@ class FragmentMgr : public RestMonitorIface {

CountDownLatch _stop_background_threads_latch;
scoped_refptr<Thread> _cancel_thread;
// every job is a pool
// This pool is used as global async task pool
std::unique_ptr<ThreadPool> _thread_pool;

std::shared_ptr<MetricEntity> _entity;
Expand Down
10 changes: 9 additions & 1 deletion be/src/vec/sink/writer/async_result_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,18 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi
//1) wait scan operator write data
{
std::unique_lock l(_m);
while (!_eos && _data_queue.empty() && _writer_status.ok()) {
// When the query is cancelled, _writer_status may be set to error status in force_close method.
// When the BE process is exit gracefully, the fragment mgr's thread pool will be shutdown,
// and the async thread will be exit.
while (!_eos && _data_queue.empty() && _writer_status.ok() &&
!ExecEnv::GetInstance()->fragment_mgr()->shutting_down()) {
// Add 1s to check to avoid lost signal
_cv.wait_for(l, std::chrono::seconds(1));
}
// If writer status is not ok, then we should not change its status to avoid lost the actual error status.
if (ExecEnv::GetInstance()->fragment_mgr()->shutting_down() && _writer_status.ok()) {
_writer_status.update(Status::InternalError<false>("FragmentMgr is shutting down"));
}

//check if eos or writer error
if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {
Expand Down
Loading