Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -259,4 +259,10 @@ void AgentServer::publish_cluster_state(TAgentResult& t_agent_result,
status.to_thrift(&t_agent_result.status);
}

void AgentServer::stop_report_workers() {
for (auto& work : _report_workers) {
work->stop();
}
}

} // namespace doris
2 changes: 2 additions & 0 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class AgentServer {

TopicSubscriber* get_topic_subscriber() { return _topic_subscriber.get(); }

void stop_report_workers();

private:
// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
Expand Down
4 changes: 2 additions & 2 deletions be/src/cloud/cloud_backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ CloudBackendService::CloudBackendService(CloudStorageEngine& engine, ExecEnv* ex
CloudBackendService::~CloudBackendService() = default;

Status CloudBackendService::create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server) {
auto service = std::make_shared<CloudBackendService>(engine, exec_env);
std::unique_ptr<ThriftServer>* server,
std::shared_ptr<doris::CloudBackendService> service) {
service->_agent_server->cloud_start_workers(engine, exec_env);
// TODO: do we want a BoostThreadFactory?
// TODO: we want separate thread factories here, so that fe requests can't starve
Expand Down
3 changes: 2 additions & 1 deletion be/src/cloud/cloud_backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ class CloudStorageEngine;
class CloudBackendService final : public BaseBackendService {
public:
static Status create_service(CloudStorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server);
std::unique_ptr<ThriftServer>* server,
std::shared_ptr<doris::CloudBackendService> service);

CloudBackendService(CloudStorageEngine& engine, ExecEnv* exec_env);

Expand Down
4 changes: 2 additions & 2 deletions be/src/service/backend_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -374,8 +374,8 @@ BackendService::BackendService(StorageEngine& engine, ExecEnv* exec_env)
BackendService::~BackendService() = default;

Status BackendService::create_service(StorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server) {
auto service = std::make_shared<BackendService>(engine, exec_env);
std::unique_ptr<ThriftServer>* server,
std::shared_ptr<doris::BackendService> service) {
service->_agent_server->start_workers(engine, exec_env);
// TODO: do we want a BoostThreadFactory?
// TODO: we want separate thread factories here, so that fe requests can't starve
Expand Down
5 changes: 4 additions & 1 deletion be/src/service/backend_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ class BaseBackendService : public BackendServiceIf {
void warm_up_tablets(TWarmUpTabletsResponse& response,
const TWarmUpTabletsRequest& request) override;

void stop_works() { _agent_server->stop_report_workers(); }

protected:
Status start_plan_fragment_execution(const TExecPlanFragmentParams& exec_params);

Expand All @@ -169,7 +171,8 @@ class BackendService final : public BaseBackendService {
public:
// NOTE: now we do not support multiple backend in one process
static Status create_service(StorageEngine& engine, ExecEnv* exec_env, int port,
std::unique_ptr<ThriftServer>* server);
std::unique_ptr<ThriftServer>* server,
std::shared_ptr<doris::BackendService> service);

BackendService(StorageEngine& engine, ExecEnv* exec_env);

Expand Down
60 changes: 29 additions & 31 deletions be/src/service/doris_main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,11 @@
#include <unistd.h>

#include <cstring>
#include <functional>
#include <memory>
#include <ostream>
#include <string>
#include <string_view>
#include <tuple>
#include <vector>

Expand Down Expand Up @@ -498,40 +501,44 @@ int main(int argc, char** argv) {
doris::ThriftRpcHelper::setup(exec_env);
// 1. thrift server with be_port
std::unique_ptr<doris::ThriftServer> be_server;
std::shared_ptr<doris::BaseBackendService> service;
std::function<void(Status&, std::string_view)> stop_work_if_error = [&](Status& status,
std::string_view msg) {
if (!status.ok()) {
std::cerr << msg << '\n';
service->stop_works();
exit(-1);
}
};

if (doris::config::is_cloud_mode()) {
service = std::make_shared<doris::CloudBackendService>(
exec_env->storage_engine().to_cloud(), exec_env);
EXIT_IF_ERROR(doris::CloudBackendService::create_service(
exec_env->storage_engine().to_cloud(), exec_env, doris::config::be_port,
&be_server));
exec_env->storage_engine().to_cloud(), exec_env, doris::config::be_port, &be_server,
std::dynamic_pointer_cast<doris::CloudBackendService>(service)));
} else {
EXIT_IF_ERROR(doris::BackendService::create_service(exec_env->storage_engine().to_local(),
exec_env, doris::config::be_port,
&be_server));
service = std::make_shared<doris::BackendService>(exec_env->storage_engine().to_local(),
exec_env);
EXIT_IF_ERROR(doris::BackendService::create_service(
exec_env->storage_engine().to_local(), exec_env, doris::config::be_port, &be_server,
std::dynamic_pointer_cast<doris::BackendService>(service)));
}

status = be_server->start();
if (!status.ok()) {
std::cerr << "Doris BE server did not start correctly, exiting\n";
exit(-1);
}
stop_work_if_error(status, "Doris BE server did not start correctly, exiting");

// 2. bprc service
std::unique_ptr<doris::BRpcService> brpc_service =
std::make_unique<doris::BRpcService>(exec_env);
status = brpc_service->start(doris::config::brpc_port, doris::config::brpc_num_threads);
if (!status.ok()) {
std::cerr << "BRPC service did not start correctly, exiting\n";
exit(-1);
}
stop_work_if_error(status, "BRPC service did not start correctly, exiting");

// 3. http service
std::unique_ptr<doris::HttpService> http_service = std::make_unique<doris::HttpService>(
exec_env, doris::config::webserver_port, doris::config::webserver_num_workers);
status = http_service->start();
if (!status.ok()) {
std::cerr << "Doris Be http service did not start correctly, exiting\n";
exit(-1);
}
stop_work_if_error(status, "Doris Be http service did not start correctly, exiting");

// 4. heart beat server
doris::TMasterInfo* master_info = exec_env->master_info();
Expand All @@ -540,17 +547,11 @@ int main(int argc, char** argv) {
exec_env, doris::config::heartbeat_service_port, &heartbeat_thrift_server,
doris::config::heartbeat_service_thread_count, master_info);

if (!heartbeat_status.ok()) {
std::cerr << "Heartbeat services did not start correctly, exiting";
exit(-1);
}
stop_work_if_error(heartbeat_status, "Heartbeat services did not start correctly, exiting");

status = heartbeat_thrift_server->start();
if (!status.ok()) {
std::cerr << "Doris BE HeartBeat Service did not start correctly, exiting: " << status
<< '\n';
exit(-1);
}
stop_work_if_error(status, "Doris BE HeartBeat Service did not start correctly, exiting: " +
status.to_string());

// 5. arrow flight service
std::shared_ptr<doris::flight::FlightSqlServer> flight_server =
Expand All @@ -560,11 +561,8 @@ int main(int argc, char** argv) {
// 6. start daemon thread to do clean or gc jobs
doris::Daemon daemon;
daemon.start();
if (!status.ok()) {
std::cerr << "Arrow Flight Service did not start correctly, exiting, " << status.to_string()
<< '\n';
exit(-1);
}
stop_work_if_error(
status, "Arrow Flight Service did not start correctly, exiting, " + status.to_string());

while (!doris::k_doris_exit) {
#if defined(LEAK_SANITIZER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ protected void parseFile() throws AnalysisException {
try {
BrokerUtil.parseFile(path, brokerDesc, fileStatuses);
} catch (UserException e) {
throw new AnalysisException("parse file failed, path = " + path, e);
throw new AnalysisException("parse file failed, path = " + path + ", reason: " + e);
}
}

Expand Down