diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index cae968fee7b94e..e81852f97f020c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -336,10 +336,10 @@ void FragmentMgr::stop() { _cancel_thread->join(); } + _thread_pool->shutdown(); // Only me can delete _query_ctx_map.clear(); _pipeline_map.clear(); - _thread_pool->shutdown(); } std::string FragmentMgr::to_http_path(const std::string& file_name) { diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 4c17fff2acb795..4eab31ff3a24ab 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -171,6 +171,10 @@ class FragmentMgr : public RestMonitorIface { ThreadPool* get_thread_pool() { return _thread_pool.get(); } + // When fragment mgr is going to stop, the _stop_background_threads_latch is set to 0 + // and other modules that use fragment mgr's thread pool should get this signal and exit. + bool shutting_down() { return _stop_background_threads_latch.count() == 0; } + int32_t running_query_num() { return _query_ctx_map.num_items(); } std::string dump_pipeline_tasks(int64_t duration = 0); @@ -215,7 +219,7 @@ class FragmentMgr : public RestMonitorIface { CountDownLatch _stop_background_threads_latch; scoped_refptr _cancel_thread; - // every job is a pool + // This pool is used as the global async task pool std::unique_ptr _thread_pool; std::shared_ptr _entity; diff --git a/be/src/vec/sink/writer/async_result_writer.cpp b/be/src/vec/sink/writer/async_result_writer.cpp index ea623d2dcc304e..57f656bda39f44 100644 --- a/be/src/vec/sink/writer/async_result_writer.cpp +++ b/be/src/vec/sink/writer/async_result_writer.cpp @@ -144,10 +144,18 @@ void AsyncResultWriter::process_block(RuntimeState* state, RuntimeProfile* profi //1) wait scan operator write data { std::unique_lock l(_m); - while (!_eos && _data_queue.empty() && _writer_status.ok()) { + // When the query is cancelled, _writer_status may be set to error 
status in force_close method. + // When the BE process exits gracefully, the fragment mgr's thread pool will be shut down, + // and the async thread will exit. + while (!_eos && _data_queue.empty() && _writer_status.ok() && + !ExecEnv::GetInstance()->fragment_mgr()->shutting_down()) { // Add 1s to check to avoid lost signal _cv.wait_for(l, std::chrono::seconds(1)); } + // If writer status is not ok, then we should not change its status to avoid losing the actual error status. + if (ExecEnv::GetInstance()->fragment_mgr()->shutting_down() && _writer_status.ok()) { + _writer_status.update(Status::InternalError("FragmentMgr is shutting down")); + } //check if eos or writer error if ((_eos && _data_queue.empty()) || !_writer_status.ok()) {