diff --git a/be/src/agent/agent_server.cpp b/be/src/agent/agent_server.cpp index fc63c1ff837217..0b0b946aae0577 100644 --- a/be/src/agent/agent_server.cpp +++ b/be/src/agent/agent_server.cpp @@ -259,4 +259,10 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result, status.to_thrift(&t_agent_result.status); } +void AgentServer::stop_report_workers() { + for (auto& work : _report_workers) { + work->stop(); + } +} + } // namespace doris diff --git a/be/src/agent/agent_server.h b/be/src/agent/agent_server.h index 761bb9e4b67953..5a7fbafb72eb40 100644 --- a/be/src/agent/agent_server.h +++ b/be/src/agent/agent_server.h @@ -60,6 +60,8 @@ class AgentServer { TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); } + void stop_report_workers(); + private: // Reference to the ExecEnv::_master_info const TMasterInfo& _master_info; diff --git a/be/src/cloud/cloud_backend_service.cpp b/be/src/cloud/cloud_backend_service.cpp index 23a75ffda761cf..80c1aed1a26fb2 100644 --- a/be/src/cloud/cloud_backend_service.cpp +++ b/be/src/cloud/cloud_backend_service.cpp @@ -28,8 +28,8 @@ CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* ex CloudBackendService::~CloudBackendService() = default; Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port, - std::unique_ptr* server) { - auto service = std::make_shared(engine, exec_env); + std::unique_ptr* server, + std::shared_ptr service) { service->_agent_server->cloud_start_workers(engine, exec_env); // TODO: do we want a BoostThreadFactory? // TODO: we want separate thread factories here, so that fe requests can't starve diff --git a/be/src/cloud/cloud_backend_service.h b/be/src/cloud/cloud_backend_service.h index 6c9a710c3a91f0..3fffc0b16a114b 100644 --- a/be/src/cloud/cloud_backend_service.h +++ b/be/src/cloud/cloud_backend_service.h @@ -26,7 +26,8 @@ class CloudStorageEngine; class CloudBackendService final : public BaseBackendService { public: static Status create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port, - std::unique_ptr* server); + std::unique_ptr* server, + std::shared_ptr service); CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env); diff --git a/be/src/service/backend_service.cpp b/be/src/service/backend_service.cpp index 68adeb1abe2610..a084f8192e36ba 100644 --- a/be/src/service/backend_service.cpp +++ b/be/src/service/backend_service.cpp @@ -374,8 +374,8 @@ BackendService::BackendService(StorageEngine& engine, ExecEnv* exec_env) BackendService::~BackendService() = default; Status BackendService::create_service(StorageEngine& engine, ExecEnv* exec_env, int port, - std::unique_ptr* server) { - auto service = std::make_shared(engine, exec_env); + std::unique_ptr* server, + std::shared_ptr service) { service->_agent_server->start_workers(engine, exec_env); // TODO: do we want a BoostThreadFactory? // TODO: we want separate thread factories here, so that fe requests can't starve diff --git a/be/src/service/backend_service.h b/be/src/service/backend_service.h index 3aaee5297358ba..20aaa96685ae13 100644 --- a/be/src/service/backend_service.h +++ b/be/src/service/backend_service.h @@ -156,6 +156,8 @@ class BaseBackendService : public BackendServiceIf { void warm_up_tablets(TWarmUpTabletsResponse& response, const TWarmUpTabletsRequest& request) override; + void stop_works() { _agent_server->stop_report_workers(); } + protected: Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params); @@ -169,7 +171,8 @@ class BackendService final : public BaseBackendService { public: // NOTE: now we do not support multiple backend in one process static Status create_service(StorageEngine& engine, ExecEnv* exec_env, int port, - std::unique_ptr* server); + std::unique_ptr* server, + std::shared_ptr service); BackendService(StorageEngine& engine, ExecEnv* exec_env); diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp index 2b0c469bcd2d17..56353db1f31a65 100644 --- a/be/src/service/doris_main.cpp +++ b/be/src/service/doris_main.cpp @@ -34,8 +34,11 @@ #include #include +#include +#include #include #include +#include #include #include @@ -498,40 +501,44 @@ int main(int argc, char** argv) { doris::ThriftRpcHelper::setup(exec_env); // 1. thrift server with be_port std::unique_ptr be_server; + std::shared_ptr service; + std::function stop_work_if_error = [&](Status& status, + std::string_view msg) { + if (!status.ok()) { + std::cerr << msg << '\n'; + service->stop_works(); + exit(-1); + } + }; if (doris::config::is_cloud_mode()) { + service = std::make_shared( + exec_env->storage_engine().to_cloud(), exec_env); EXIT_IF_ERROR(doris::CloudBackendService::create_service( - exec_env->storage_engine().to_cloud(), exec_env, doris::config::be_port, - &be_server)); + exec_env->storage_engine().to_cloud(), exec_env, doris::config::be_port, &be_server, + std::dynamic_pointer_cast(service))); } else { - EXIT_IF_ERROR(doris::BackendService::create_service(exec_env->storage_engine().to_local(), - exec_env, doris::config::be_port, - &be_server)); + service = std::make_shared(exec_env->storage_engine().to_local(), + exec_env); + EXIT_IF_ERROR(doris::BackendService::create_service( + exec_env->storage_engine().to_local(), exec_env, doris::config::be_port, &be_server, + std::dynamic_pointer_cast(service))); } status = be_server->start(); - if (!status.ok()) { - std::cerr << "Doris BE server did not start correctly, exiting\n"; - exit(-1); - } + stop_work_if_error(status, "Doris BE server did not start correctly, exiting"); // 2. bprc service std::unique_ptr brpc_service = std::make_unique(exec_env); status = brpc_service->start(doris::config::brpc_port, doris::config::brpc_num_threads); - if (!status.ok()) { - std::cerr << "BRPC service did not start correctly, exiting\n"; - exit(-1); - } + stop_work_if_error(status, "BRPC service did not start correctly, exiting"); // 3. http service std::unique_ptr http_service = std::make_unique( exec_env, doris::config::webserver_port, doris::config::webserver_num_workers); status = http_service->start(); - if (!status.ok()) { - std::cerr << "Doris Be http service did not start correctly, exiting\n"; - exit(-1); - } + stop_work_if_error(status, "Doris Be http service did not start correctly, exiting"); // 4. heart beat server doris::TMasterInfo* master_info = exec_env->master_info(); @@ -540,17 +547,11 @@ int main(int argc, char** argv) { exec_env, doris::config::heartbeat_service_port, &heartbeat_thrift_server, doris::config::heartbeat_service_thread_count, master_info); - if (!heartbeat_status.ok()) { - std::cerr << "Heartbeat services did not start correctly, exiting"; - exit(-1); - } + stop_work_if_error(heartbeat_status, "Heartbeat services did not start correctly, exiting"); status = heartbeat_thrift_server->start(); - if (!status.ok()) { - std::cerr << "Doris BE HeartBeat Service did not start correctly, exiting: " << status - << '\n'; - exit(-1); - } + stop_work_if_error(status, "Doris BE HeartBeat Service did not start correctly, exiting: " + + status.to_string()); // 5. arrow flight service std::shared_ptr flight_server = @@ -560,11 +561,8 @@ int main(int argc, char** argv) { // 6. start daemon thread to do clean or gc jobs doris::Daemon daemon; daemon.start(); - if (!status.ok()) { - std::cerr << "Arrow Flight Service did not start correctly, exiting, " << status.to_string() - << '\n'; - exit(-1); - } + stop_work_if_error( + status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string()); while (!doris::k_doris_exit) { #if defined(LEAK_SANITIZER) diff --git a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java index 451b37d1311222..c76cd74bf82d57 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java +++ b/fe/fe-core/src/main/java/org/apache/doris/tablefunction/ExternalFileTableValuedFunction.java @@ -168,7 +168,7 @@ protected void parseFile() throws AnalysisException { try { BrokerUtil.parseFile(path, brokerDesc, fileStatuses); } catch (UserException e) { - throw new AnalysisException("parse file failed, path = " + path, e); + throw new AnalysisException("parse file failed, path = " + path + ", reason: " + e); } }