Skip to content

Commit

Permalink
Test (#43848)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Issue Number: close #xxx

Related PR: #xxx

Problem Summary:

### Release note

None

### Check List (For Author)

- Test <!-- At least one of them must be included. -->
    - [ ] Regression test
    - [ ] Unit Test
    - [ ] Manual test (add detailed scripts or steps below)
    - [ ] No need to test or manual test. Explain why:
- [ ] This is a refactor/code format and no logic has been changed.
        - [ ] Previous test can cover this change.
        - [ ] No code files have been changed.
        - [ ] Other reason <!-- Add your reason?  -->

- Behavior changed:
    - [ ] No.
    - [ ] Yes. <!-- Explain the behavior change -->

- Does this need documentation?
    - [ ] No.
- [ ] Yes. <!-- Add document PR link here. eg:
apache/doris-website#1214 -->

### Check List (For Reviewer who merge this PR)

- [ ] Confirm the release note
- [ ] Confirm test cases
- [ ] Confirm document
- [ ] Add branch pick label <!-- Add branch pick label that this PR
should merge into -->

---------

Co-authored-by: zhannngchen <48427519+zhannngchen@users.noreply.github.com>
Co-authored-by: Chen Zhang <zhangchen@selectdb.com>
Co-authored-by: zzzxl <33418555+zzzxl1993@users.noreply.github.com>
Co-authored-by: lihangyu <lihangyu@flywheels.com>
Co-authored-by: eldenmoon <lihangyu@selectdb.com>
Co-authored-by: meiyi <myimeiyi@gmail.com>
Co-authored-by: bobhan1 <bh2444151092@outlook.com>
Co-authored-by: airborne12 <airborne08@gmail.com>
Co-authored-by: airborne12 <jiangkai@selectdb.com>
Co-authored-by: deardeng <dengxin@selectdb.com>
Co-authored-by: Lei Zhang <zhanglei@selectdb.com>
Co-authored-by: Siyang Tang <82279870+TangSiyang2001@users.noreply.github.com>
Co-authored-by: feiniaofeiafei <53502832+feiniaofeiafei@users.noreply.github.com>
Co-authored-by: Mingyu Chen (Rayner) <morningman@163.com>
Co-authored-by: morningman <yunyou@selectdb.com>
Co-authored-by: GentleCold <gentlecold@qq.com>
Co-authored-by: Vallish Pai <vallishpai@gmail.com>
Co-authored-by: abmdocrt <lianyukang@selectdb.com>
Co-authored-by: Gabriel <liwenqiang@selectdb.com>
Co-authored-by: Mryange <yanxuecheng@selectdb.com>
Co-authored-by: Jerry Hu <mrhhsg@gmail.com>
Co-authored-by: Pxl <xl@selectdb.com>
Co-authored-by: minghong <zhouminghong@selectdb.com>
Co-authored-by: morrySnow <zhangwenxin@selectdb.com>
Co-authored-by: zclllhhjj <zhaochangle@selectdb.com>
Co-authored-by: LiBinfeng <libinfeng@selectdb.com>
Co-authored-by: Sun Chenyang <sunchenyang@selectdb.com>
Co-authored-by: 924060929 <lanhuajian@selectdb.com>
Co-authored-by: Gavin Chou <gavin@selectdb.com>
Co-authored-by: Siyang Tang <tangsiyang@selectdb.com>
Co-authored-by: wuwenchi <wuwenchi@selectdb.com>
Co-authored-by: Dongyang Li <lidongyang@selectdb.com>
Co-authored-by: Xin Liao <liaoxin@selectdb.com>
  • Loading branch information
1 parent b1db33f commit 054742d
Show file tree
Hide file tree
Showing 261 changed files with 3,808 additions and 1,754 deletions.
34 changes: 18 additions & 16 deletions be/src/agent/agent_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ using std::vector;

namespace doris {

AgentServer::AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info)
: _master_info(master_info), _topic_subscriber(new TopicSubscriber()) {
MasterServerClient::create(master_info);
AgentServer::AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info)
: _cluster_info(cluster_info), _topic_subscriber(new TopicSubscriber()) {
MasterServerClient::create(cluster_info);

#if !defined(BE_TEST) && !defined(__APPLE__)
// Add subscriber here and register listeners
Expand Down Expand Up @@ -170,7 +170,7 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
"ALTER_TABLE", config::alter_tablet_worker_count, [&engine](auto&& task) { return alter_tablet_callback(engine, task); });

_workers[TTaskType::CLONE] = std::make_unique<TaskWorkerPool>(
"CLONE", config::clone_worker_count, [&engine, &master_info = _master_info](auto&& task) { return clone_callback(engine, master_info, task); });
"CLONE", config::clone_worker_count, [&engine, &cluster_info = _cluster_info](auto&& task) { return clone_callback(engine, cluster_info, task); });

_workers[TTaskType::STORAGE_MEDIUM_MIGRATE] = std::make_unique<TaskWorkerPool>(
"STORAGE_MEDIUM_MIGRATE", config::storage_medium_migrate_count, [&engine](auto&& task) { return storage_medium_migrate_callback(engine, task); });
Expand All @@ -188,13 +188,13 @@ void AgentServer::start_workers(StorageEngine& engine, ExecEnv* exec_env) {
"UPDATE_VISIBLE_VERSION", 1, [&engine](auto&& task) { return visible_version_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds, [&master_info = _master_info] { report_task_callback(master_info); }));
"REPORT_TASK", _cluster_info, config::report_task_interval_seconds, [&cluster_info = _cluster_info] { report_task_callback(cluster_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds, [&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));
"REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds, [&engine, &cluster_info = _cluster_info] { report_disk_callback(engine, cluster_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,[&engine, &master_info = _master_info] { report_tablet_callback(engine, master_info); }));
"REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,[&engine, &cluster_info = _cluster_info] { report_tablet_callback(engine, cluster_info); }));
// clang-format on
}

Expand All @@ -217,18 +217,20 @@ void AgentServer::cloud_start_workers(CloudStorageEngine& engine, ExecEnv* exec_
"DROP_TABLE", 1, [&engine](auto&& task) { return drop_tablet_callback(engine, task); });

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_TASK", _master_info, config::report_task_interval_seconds,
[&master_info = _master_info] { report_task_callback(master_info); }));
"REPORT_TASK", _cluster_info, config::report_task_interval_seconds,
[&cluster_info = _cluster_info] { report_task_callback(cluster_info); }));

_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_DISK_STATE", _master_info, config::report_disk_state_interval_seconds,
[&engine, &master_info = _master_info] { report_disk_callback(engine, master_info); }));
"REPORT_DISK_STATE", _cluster_info, config::report_disk_state_interval_seconds,
[&engine, &cluster_info = _cluster_info] {
report_disk_callback(engine, cluster_info);
}));

if (config::enable_cloud_tablet_report) {
_report_workers.push_back(std::make_unique<ReportWorker>(
"REPORT_OLAP_TABLET", _master_info, config::report_tablet_interval_seconds,
[&engine, &master_info = _master_info] {
report_tablet_callback(engine, master_info);
"REPORT_OLAP_TABLET", _cluster_info, config::report_tablet_interval_seconds,
[&engine, &cluster_info = _cluster_info] {
report_tablet_callback(engine, cluster_info);
}));
}
}
Expand All @@ -239,8 +241,8 @@ void AgentServer::submit_tasks(TAgentResult& agent_result,
const std::vector<TAgentTaskRequest>& tasks) {
Status ret_st;

// TODO check master_info here if it is the same with that of heartbeat rpc
if (_master_info.network_address.hostname.empty() || _master_info.network_address.port == 0) {
// TODO check cluster_info here if it is the same with that of heartbeat rpc
if (_cluster_info->master_fe_addr.hostname.empty() || _cluster_info->master_fe_addr.port == 0) {
Status ret_st = Status::Cancelled("Have not get FE Master heartbeat yet");
ret_st.to_thrift(&agent_result.status);
return;
Expand Down
8 changes: 4 additions & 4 deletions be/src/agent/agent_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,15 @@ class ExecEnv;
class TAgentPublishRequest;
class TAgentResult;
class TAgentTaskRequest;
class TMasterInfo;
class ClusterInfo;
class TSnapshotRequest;
class StorageEngine;
class CloudStorageEngine;

// Each method corresponds to one RPC from FE Master, see BackendService.
class AgentServer {
public:
explicit AgentServer(ExecEnv* exec_env, const TMasterInfo& master_info);
explicit AgentServer(ExecEnv* exec_env, const ClusterInfo* cluster_info);
~AgentServer();

void start_workers(StorageEngine& engine, ExecEnv* exec_env);
Expand All @@ -63,8 +63,8 @@ class AgentServer {
void stop_report_workers();

private:
// Reference to the ExecEnv::_master_info
const TMasterInfo& _master_info;
// Reference to the ExecEnv::_cluster_info
const ClusterInfo* _cluster_info;

std::unordered_map<int64_t /* TTaskType */, std::unique_ptr<TaskWorkerPoolIf>> _workers;

Expand Down
63 changes: 39 additions & 24 deletions be/src/agent/heartbeat_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "common/config.h"
#include "common/status.h"
#include "olap/storage_engine.h"
#include "runtime/cluster_info.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/heartbeat_flags.h"
Expand All @@ -49,23 +50,23 @@ class TProcessor;

namespace doris {

HeartbeatServer::HeartbeatServer(TMasterInfo* master_info)
HeartbeatServer::HeartbeatServer(ClusterInfo* cluster_info)
: _engine(ExecEnv::GetInstance()->storage_engine()),
_master_info(master_info),
_cluster_info(cluster_info),
_fe_epoch(0) {
_be_epoch = GetCurrentTimeMicros() / 1000;
}

void HeartbeatServer::init_cluster_id() {
_master_info->cluster_id = _engine.effective_cluster_id();
_cluster_info->cluster_id = _engine.effective_cluster_id();
}

void HeartbeatServer::heartbeat(THeartbeatResult& heartbeat_result,
const TMasterInfo& master_info) {
//print heartbeat in every minute
LOG_EVERY_N(INFO, 12) << "get heartbeat from FE."
<< "host:" << master_info.network_address.hostname
<< ", port:" << master_info.network_address.port
<< ", rpc port:" << master_info.network_address.port
<< ", cluster id:" << master_info.cluster_id
<< ", frontend_info:" << PrintFrontendInfos(master_info.frontend_infos)
<< ", counter:" << google::COUNTER << ", BE start time: " << _be_epoch;
Expand Down Expand Up @@ -108,22 +109,23 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
std::lock_guard<std::mutex> lk(_hb_mtx);

// Check cluster id
if (_master_info->cluster_id == -1) {
if (_cluster_info->cluster_id == -1) {
LOG(INFO) << "get first heartbeat. update cluster id";
// write and update cluster id
RETURN_IF_ERROR(_engine.set_cluster_id(master_info.cluster_id));

_master_info->cluster_id = master_info.cluster_id;
_cluster_info->cluster_id = master_info.cluster_id;
LOG(INFO) << "record cluster id. host: " << master_info.network_address.hostname
<< ". port: " << master_info.network_address.port
<< ". cluster id: " << master_info.cluster_id
<< ". frontend_infos: " << PrintFrontendInfos(master_info.frontend_infos);
} else {
if (_master_info->cluster_id != master_info.cluster_id) {
if (_cluster_info->cluster_id != master_info.cluster_id) {
return Status::InternalError(
"invalid cluster id. ignore. Record cluster id ={}, record frontend info {}. "
"Invalid cluster_id={}, invalid frontend info {}",
_master_info->cluster_id, PrintFrontendInfos(_master_info->frontend_infos),
_cluster_info->cluster_id,
PrintFrontendInfos(ExecEnv::GetInstance()->get_frontends()),
master_info.cluster_id, PrintFrontendInfos(master_info.frontend_infos));
}
}
Expand Down Expand Up @@ -183,22 +185,22 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

bool need_report = false;
if (_master_info->network_address.hostname != master_info.network_address.hostname ||
_master_info->network_address.port != master_info.network_address.port) {
if (_cluster_info->master_fe_addr.hostname != master_info.network_address.hostname ||
_cluster_info->master_fe_addr.port != master_info.network_address.port) {
if (master_info.epoch > _fe_epoch) {
_master_info->network_address.hostname = master_info.network_address.hostname;
_master_info->network_address.port = master_info.network_address.port;
_cluster_info->master_fe_addr.hostname = master_info.network_address.hostname;
_cluster_info->master_fe_addr.port = master_info.network_address.port;
_fe_epoch = master_info.epoch;
need_report = true;
LOG(INFO) << "master change. new master host: "
<< _master_info->network_address.hostname
<< ". port: " << _master_info->network_address.port
<< _cluster_info->master_fe_addr.hostname
<< ". port: " << _cluster_info->master_fe_addr.port
<< ". epoch: " << _fe_epoch;
} else {
return Status::InternalError(
"epoch is not greater than local. ignore heartbeat. host: {}, port: {}, local "
"epoch: {}, received epoch: {}",
_master_info->network_address.hostname, _master_info->network_address.port,
_cluster_info->master_fe_addr.hostname, _cluster_info->master_fe_addr.port,
_fe_epoch, master_info.epoch);
}
} else {
Expand All @@ -211,16 +213,17 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

if (master_info.__isset.token) {
if (!_master_info->__isset.token) {
_master_info->__set_token(master_info.token);
LOG(INFO) << "get token. token: " << _master_info->token;
} else if (_master_info->token != master_info.token) {
return Status::InternalError("invalid token");
if (_cluster_info->token == "") {
_cluster_info->token = master_info.token;
LOG(INFO) << "get token. token: " << _cluster_info->token;
} else if (_cluster_info->token != master_info.token) {
return Status::InternalError("invalid token. local: {}, master: {}",
_cluster_info->token, master_info.token);
}
}

if (master_info.__isset.http_port) {
_master_info->__set_http_port(master_info.http_port);
_cluster_info->master_fe_http_port = master_info.http_port;
}

if (master_info.__isset.heartbeat_flags) {
Expand All @@ -229,7 +232,7 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
}

if (master_info.__isset.backend_id) {
_master_info->__set_backend_id(master_info.backend_id);
_cluster_info->backend_id = master_info.backend_id;
BackendOptions::set_backend_id(master_info.backend_id);
}
if (master_info.__isset.frontend_infos) {
Expand Down Expand Up @@ -281,6 +284,18 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {
master_info.tablet_report_inactive_duration_ms;
}

if (master_info.__isset.auth_token) {
if (_cluster_info->curr_auth_token == "") {
_cluster_info->curr_auth_token = master_info.auth_token;
LOG(INFO) << "set new auth token: " << master_info.auth_token;
} else if (_cluster_info->curr_auth_token != master_info.auth_token) {
LOG(INFO) << "last auth token: " << _cluster_info->last_auth_token
<< "set new auth token: " << master_info.auth_token;
_cluster_info->last_auth_token = _cluster_info->curr_auth_token;
_cluster_info->curr_auth_token = master_info.auth_token;
}
}

if (need_report) {
LOG(INFO) << "Master FE is changed or restarted. report tablet and disk info immediately";
_engine.notify_listeners();
Expand All @@ -291,8 +306,8 @@ Status HeartbeatServer::_heartbeat(const TMasterInfo& master_info) {

Status create_heartbeat_server(ExecEnv* exec_env, uint32_t server_port,
std::unique_ptr<ThriftServer>* thrift_server,
uint32_t worker_thread_num, TMasterInfo* local_master_info) {
HeartbeatServer* heartbeat_server = new HeartbeatServer(local_master_info);
uint32_t worker_thread_num, ClusterInfo* cluster_info) {
HeartbeatServer* heartbeat_server = new HeartbeatServer(cluster_info);
if (heartbeat_server == nullptr) {
return Status::InternalError("Get heartbeat server failed");
}
Expand Down
13 changes: 7 additions & 6 deletions be/src/agent/heartbeat_server.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"

namespace doris {
class ClusterInfo;
class ExecEnv;
class THeartbeatResult;
class TMasterInfo;
Expand All @@ -36,15 +37,15 @@ class ThriftServer;

class HeartbeatServer : public HeartbeatServiceIf {
public:
explicit HeartbeatServer(TMasterInfo* master_info);
explicit HeartbeatServer(ClusterInfo* cluster_info);
~HeartbeatServer() override = default;

void init_cluster_id();

// Master send heartbeat to this server
//
// Input parameters:
// * master_info: The struct of master info, contains host ip and port
// * master_info: The struct of master info, contains cluster info from Master FE
//
// Output parameters:
// * heartbeat_result: The result of heartbeat set
Expand All @@ -56,16 +57,16 @@ class HeartbeatServer : public HeartbeatServiceIf {
BaseStorageEngine& _engine;
int64_t _be_epoch;

// mutex to protect master_info and _epoch
// mutex to protect cluster_info and _epoch
std::mutex _hb_mtx;
// Not owned. Point to the ExecEnv::_master_info
TMasterInfo* _master_info = nullptr;
// Not owned. Point to the ExecEnv::_cluster_info
ClusterInfo* _cluster_info = nullptr;
int64_t _fe_epoch;

DISALLOW_COPY_AND_ASSIGN(HeartbeatServer);
}; // class HeartBeatServer

Status create_heartbeat_server(ExecEnv* exec_env, uint32_t heartbeat_server_port,
std::unique_ptr<ThriftServer>* heart_beat_server,
uint32_t worker_thread_num, TMasterInfo* local_master_info);
uint32_t worker_thread_num, ClusterInfo* cluster_info);
} // namespace doris
Loading

0 comments on commit 054742d

Please sign in to comment.