add comments and adjust log level for meta/processors/admin (#3662)
* add comments and adjust log level for meta admin processors

* change agent hb log level

Co-authored-by: Doodle <13706157+critical27@users.noreply.github.com>
pengweisong and critical27 authored Jan 19, 2022
1 parent df073d7 commit fd59b45
Showing 22 changed files with 288 additions and 101 deletions.
22 changes: 11 additions & 11 deletions src/meta/processors/admin/AdminClient.cpp
@@ -28,7 +28,7 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
req.part_id_ref() = partId;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -83,7 +83,7 @@ folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
req.as_learner_ref() = asLearner;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -110,7 +110,7 @@ folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId,
req.learner_ref() = learner;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -137,7 +137,7 @@ folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
req.target_ref() = target;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -166,7 +166,7 @@ folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId,
req.peer_ref() = peer;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -191,7 +191,7 @@ folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
CHECK_NOTNULL(kv_);
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -267,7 +267,7 @@ folly::Future<Status> AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID
req.part_id_ref() = partId;
auto peerRet = getPeers(spaceId, partId);
if (!nebula::ok(peerRet)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(peerRet));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(peerRet));
return Status::Error("Get peers failed");
}

@@ -554,8 +554,8 @@ void AdminClient::getLeaderDist(const HostAddr& host,
.then([pro = std::move(pro), host, retry, retryLimit, this](
folly::Try<storage::cpp2::GetLeaderPartsResp>&& t) mutable {
if (t.hasException()) {
-          LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s",
-                                            t.exception().what().c_str());
+          LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s",
+                                           t.exception().what().c_str());
if (retry < retryLimit) {
usleep(1000 * 50);
getLeaderDist(
@@ -637,8 +637,8 @@ folly::Future<StatusOr<cpp2::HostBackupInfo>> AdminClient::createSnapshot(
.then([p = std::move(pro), storageHost, host](
folly::Try<storage::cpp2::CreateCPResp>&& t) mutable {
if (t.hasException()) {
-          LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s",
-                                            t.exception().what().c_str());
+          LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s",
+                                           t.exception().what().c_str());
p.setValue(Status::Error("RPC failure in createCheckpoint"));
return;
}
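For context, every hunk above changes the same error path. Below is a minimal sketch of that pattern with simplified control flow; doSomethingOnPart is a hypothetical stand-in for transLeader/addPart/addLearner/etc., not a method from this commit. The failure is still propagated to callers through the returned Status, which is presumably why the commit downgrades the log from ERROR to INFO:

// Sketch only (not from this commit): the real methods build a storage
// admin RPC from the peer list; that part is elided here.
folly::Future<Status> AdminClient::doSomethingOnPart(GraphSpaceID spaceId,
                                                     PartitionID partId) {
  auto ret = getPeers(spaceId, partId);
  if (!nebula::ok(ret)) {
    // Downgraded by this commit: callers still see the failed Status.
    LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
    return Status::Error("Get peers failed");
  }
  auto peers = std::move(nebula::value(ret));
  // ... fill the request with `peers` and send the admin RPC ...
  return Status::OK();
}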
127 changes: 123 additions & 4 deletions src/meta/processors/admin/AdminClient.h
@@ -43,57 +43,168 @@ class AdminClient {
return ioThreadPool_.get();
}

/**
* @brief Transfer the given partition's leadership from its current leader to dst
*
* @param spaceId
* @param partId
* @param leader
* @param dst
* @return folly::Future<Status>
*/
virtual folly::Future<Status> transLeader(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst = kRandomPeer);

/**
* @brief Add a peer for the given partition on the specified host. The rpc
*        is sent to the new partition peer, informing it of the other
*        peers.
*
* @param spaceId
* @param partId
* @param host
* @param asLearner Add the peer as a raft learner only
* @return folly::Future<Status>
*/
virtual folly::Future<Status> addPart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host,
bool asLearner);

/**
* @brief Add a learner for the given partition. The rpc is sent to
*        the partition leader, which writes the add event as a raft log.
*
* @param spaceId
* @param partId
* @param learner
* @return folly::Future<Status>
*/
virtual folly::Future<Status> addLearner(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& learner);

/**
* @brief Wait for the given partition peer to catch up on data
*
* @param spaceId
* @param partId
* @param target partition peer address
* @return folly::Future<Status>
*/
virtual folly::Future<Status> waitingForCatchUpData(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& target);

/**
-  * Add/Remove one peer for raft group (spaceId, partId).
-  * "added" should be true if we want to add one peer, otherwise it is false.
-  * */
+  * @brief Add/Remove one peer for partition (spaceId, partId).
+  *        "added" should be true if we want to add one peer, otherwise it is false.
+  * @param spaceId
+  * @param partId
+  * @param peer
+  * @param added
+  * @return folly::Future<Status>
+  */
virtual folly::Future<Status> memberChange(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& peer,
bool added);

/**
* @brief Update partition peers info in meta kvstore, remove peer 'src', add peer 'dst'
*
* @param spaceId
* @param partId
* @param leader
* @param src
* @param dst
* @return folly::Future<Status>
*/
virtual folly::Future<Status> updateMeta(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& src,
const HostAddr& dst);

/**
* @brief Remove partition peer in given storage host
*
* @param spaceId
* @param partId
* @param host storage admin service address
* @return folly::Future<Status>
*/
virtual folly::Future<Status> removePart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host);

/**
* @brief Check and adjust (add/remove) each peer's peer list according to the meta kvstore
*
* @param spaceId
* @param partId
* @return folly::Future<Status>
*/
virtual folly::Future<Status> checkPeers(GraphSpaceID spaceId, PartitionID partId);

/**
* @brief Get the all partitions' leader distribution
*
* @param result
* @return folly::Future<Status>
*/
virtual folly::Future<Status> getLeaderDist(HostLeaderMap* result);

- // used for snapshot and backup
/**
* @brief Create snapshots for given spaces in given host with specified snapshot name
*
* @param spaceIds spaces to create snapshot
* @param name snapshot name
* @param host storage host
* @return folly::Future<StatusOr<cpp2::HostBackupInfo>>
*/
virtual folly::Future<StatusOr<cpp2::HostBackupInfo>> createSnapshot(
const std::set<GraphSpaceID>& spaceIds, const std::string& name, const HostAddr& host);

/**
* @brief Drop snapshots of given spaces in given host with specified snapshot name
*
* @param spaceIds spaces to drop
* @param name snapshot name
* @param host storage host
* @return folly::Future<Status>
*/
virtual folly::Future<Status> dropSnapshot(const std::set<GraphSpaceID>& spaceIds,
const std::string& name,
const HostAddr& host);

/**
* @brief Block/Allow writes to given spaces on the specified storage host
*
* @param spaceIds
* @param sign BLOCK_ON: blocking, BLOCK_OFF: allowing
* @param host
* @return folly::Future<Status>
*/
virtual folly::Future<Status> blockingWrites(const std::set<GraphSpaceID>& spaceIds,
storage::cpp2::EngineSignType sign,
const HostAddr& host);

/**
* @brief Add storage admin task to given storage host
*
* @param cmd
* @param jobId
* @param taskId
* @param spaceId
* @param specificHosts if hosts are empty, the request is sent to all active storage hosts
* @param taskSpecficParas
* @param parts
* @param concurrency
* @param statsResult
* @return folly::Future<Status>
*/
virtual folly::Future<Status> addTask(cpp2::AdminCmd cmd,
int32_t jobId,
int32_t taskId,
@@ -104,6 +215,14 @@
int concurrency,
cpp2::StatsItem* statsResult = nullptr);

/**
* @brief Stop storage admin task on given storage host
*
* @param target if target hosts are empty, the request is sent to all active storage hosts
* @param jobId
* @param taskId
* @return folly::Future<Status>
*/
virtual folly::Future<Status> stopTask(const std::vector<HostAddr>& target,
int32_t jobId,
int32_t taskId);
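Taken together, the methods documented above compose into the usual replica-move flow. The sketch below is hypothetical (moveReplica is not part of this commit) and deliberately blocks on each future; real callers such as the balancer chain these futures with retries and richer error handling:

// Hypothetical helper showing how the AdminClient calls fit together.
Status moveReplica(AdminClient* client,
                   GraphSpaceID spaceId,
                   PartitionID partId,
                   const HostAddr& leader,
                   const HostAddr& src,
                   const HostAddr& dst) {
  // 1. Add dst as a raft learner so it starts pulling data.
  auto st = client->addLearner(spaceId, partId, dst).get();
  if (!st.ok()) return st;
  // 2. Wait until the learner has caught up with the leader.
  st = client->waitingForCatchUpData(spaceId, partId, dst).get();
  if (!st.ok()) return st;
  // 3. Promote dst to a full peer, then drop src from the raft group.
  st = client->memberChange(spaceId, partId, dst, true).get();
  if (!st.ok()) return st;
  st = client->memberChange(spaceId, partId, src, false).get();
  if (!st.ok()) return st;
  // 4. Persist the new peer list in the meta kvstore.
  st = client->updateMeta(spaceId, partId, leader, src, dst).get();
  if (!st.ok()) return st;
  // 5. Remove the part from the old host and verify every peer's view.
  st = client->removePart(spaceId, partId, src).get();
  if (!st.ok()) return st;
  return client->checkPeers(spaceId, partId).get();
}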
26 changes: 12 additions & 14 deletions src/meta/processors/admin/AgentHBProcessor.cpp
@@ -24,7 +24,6 @@ void AgentHBProcessor::onFinished() {
Base::onFinished();
}

- // Agent heartbeat register agent to meta and pull all services info in agent's host
void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
HostAddr agentAddr((*req.host_ref()).host, (*req.host_ref()).port);
LOG(INFO) << "Receive heartbeat from " << agentAddr << ", role = AGENT";
@@ -36,19 +35,19 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
time::WallClock::fastNowInMilliSec(), cpp2::HostRole::AGENT, req.get_git_info_sha());
ret = ActiveHostsMan::updateHostInfo(kvstore_, agentAddr, info);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << folly::sformat("Put agent {} info failed: {}",
agentAddr.toString(),
apache::thrift::util::enumNameSafe(ret));
LOG(INFO) << folly::sformat("Put agent {} info failed: {}",
agentAddr.toString(),
apache::thrift::util::enumNameSafe(ret));
break;
}

// get services in the agent host
auto servicesRet = ActiveHostsMan::getServicesInHost(kvstore_, agentAddr.host);
if (!nebula::ok(servicesRet)) {
ret = nebula::error(servicesRet);
LOG(ERROR) << folly::sformat("Get active services for {} failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
LOG(INFO) << folly::sformat("Get active services for {} failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
break;
}

@@ -58,9 +57,9 @@
auto dirIterRet = doPrefix(hostDirHostPrefix);
if (!nebula::ok(dirIterRet)) {
ret = nebula::error(dirIterRet);
LOG(ERROR) << folly::sformat("Get host {} dir prefix iterator failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
LOG(INFO) << folly::sformat("Get host {} dir prefix iterator failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
break;
}
auto dirIter = std::move(nebula::value(dirIterRet));
@@ -86,7 +85,7 @@

auto it = serviceDirinfo.find(addr);
if (it == serviceDirinfo.end()) {
LOG(ERROR) << folly::sformat("{} dir info not found", addr.toString());
LOG(INFO) << folly::sformat("{} dir info not found", addr.toString());
break;
}

@@ -99,7 +98,7 @@
if (serviceList.size() != services.size() - 1) {
ret = nebula::cpp2::ErrorCode::E_AGENT_HB_FAILUE;
// missing some services' dir info
-      LOG(ERROR) << folly::sformat(
+      LOG(INFO) << folly::sformat(
"Missing some services's dir info, excepted service {}, but only got {}",
services.size() - 1,
serviceList.size());
@@ -111,8 +110,7 @@
auto partRet = kvstore_->part(kDefaultSpaceId, kDefaultPartId);
if (!nebula::ok(partRet)) {
ret = nebula::error(partRet);
LOG(ERROR) << "Get meta part store failed, error: "
<< apache::thrift::util::enumNameSafe(ret);
LOG(INFO) << "Get meta part store failed, error: " << apache::thrift::util::enumNameSafe(ret);
return;
}
auto raftPeers = nebula::value(partRet)->peers();
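A note on the control flow in these hunks: each failed step logs at INFO, records the error code, and breaks out to a common finish path instead of returning early. A minimal sketch of that idiom, assuming a do { ... } while (false) wrapper and hypothetical step helpers (the real processor's steps and names differ):

// Sketch of the single-pass error-handling idiom inferred from the
// `break`s above; saveAgentInfo/collectServiceDirs are hypothetical.
void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
  auto ret = nebula::cpp2::ErrorCode::SUCCEEDED;
  do {
    ret = saveAgentInfo(req);  // persist agent host info
    if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
      LOG(INFO) << "Put agent info failed";  // INFO, per this commit
      break;
    }
    ret = collectServiceDirs(req);  // gather dir info of local services
    if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
      LOG(INFO) << "Get active services failed";
      break;
    }
  } while (false);
  handleErrorCode(ret);  // single exit: set the response code
  onFinished();          // then send the response
}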
3 changes: 3 additions & 0 deletions src/meta/processors/admin/AgentHBProcessor.h
@@ -30,6 +30,9 @@ struct AgentHBCounters final {
};
extern AgentHBCounters kAgentHBCounters;

/**
* @brief Agent heartbeat processor: registers the agent to meta and pulls the info of all services on the agent's host
*/
class AgentHBProcessor : public BaseProcessor<cpp2::AgentHBResp> {
FRIEND_TEST(AgentHBProcessorTest, AgentHBTest);
