add comments and adjust log level for meta/processors/admin #3662

Merged · 3 commits · Jan 19, 2022
22 changes: 11 additions & 11 deletions src/meta/processors/admin/AdminClient.cpp
@@ -28,7 +28,7 @@ folly::Future<Status> AdminClient::transLeader(GraphSpaceID spaceId,
req.part_id_ref() = partId;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -83,7 +83,7 @@ folly::Future<Status> AdminClient::addPart(GraphSpaceID spaceId,
req.as_learner_ref() = asLearner;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
- LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+ LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -110,7 +110,7 @@ folly::Future<Status> AdminClient::addLearner(GraphSpaceID spaceId,
req.learner_ref() = learner;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
- LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+ LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -137,7 +137,7 @@ folly::Future<Status> AdminClient::waitingForCatchUpData(GraphSpaceID spaceId,
req.target_ref() = target;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
- LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+ LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -166,7 +166,7 @@ folly::Future<Status> AdminClient::memberChange(GraphSpaceID spaceId,
req.peer_ref() = peer;
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
- LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+ LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -191,7 +191,7 @@ folly::Future<Status> AdminClient::updateMeta(GraphSpaceID spaceId,
CHECK_NOTNULL(kv_);
auto ret = getPeers(spaceId, partId);
if (!nebula::ok(ret)) {
- LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
+ LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(ret));
return Status::Error("Get peers failed");
}

@@ -267,7 +267,7 @@ folly::Future<Status> AdminClient::checkPeers(GraphSpaceID spaceId, PartitionID
req.part_id_ref() = partId;
auto peerRet = getPeers(spaceId, partId);
if (!nebula::ok(peerRet)) {
- LOG(ERROR) << "Get peers failed: " << static_cast<int32_t>(nebula::error(peerRet));
+ LOG(INFO) << "Get peers failed: " << static_cast<int32_t>(nebula::error(peerRet));
return Status::Error("Get peers failed");
}

@@ -554,8 +554,8 @@ void AdminClient::getLeaderDist(const HostAddr& host,
.then([pro = std::move(pro), host, retry, retryLimit, this](
folly::Try<storage::cpp2::GetLeaderPartsResp>&& t) mutable {
if (t.hasException()) {
- LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s",
- t.exception().what().c_str());
+ LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s",
+ t.exception().what().c_str());
if (retry < retryLimit) {
usleep(1000 * 50);
getLeaderDist(
@@ -637,8 +637,8 @@ folly::Future<StatusOr<cpp2::HostBackupInfo>> AdminClient::createSnapshot(
.then([p = std::move(pro), storageHost, host](
folly::Try<storage::cpp2::CreateCPResp>&& t) mutable {
if (t.hasException()) {
- LOG(ERROR) << folly::stringPrintf("RPC failure in AdminClient: %s",
- t.exception().what().c_str());
+ LOG(INFO) << folly::stringPrintf("RPC failure in AdminClient: %s",
+ t.exception().what().c_str());
p.setValue(Status::Error("RPC failure in createCheckpoint"));
return;
}
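The recurring change in this file follows one rule: when a failure is already propagated to the caller through the returned `Status` (or a promise), the log line is informational, not an error. The `getLeaderDist` hunk also shows the retry convention: on RPC failure, sleep 50 ms and retry up to a limit. Below is a self-contained sketch of both conventions, with hypothetical names and a stand-in `Status` type; it is not the PR's code.

```cpp
#include <glog/logging.h>

#include <chrono>
#include <string>
#include <thread>

// Stand-in for nebula's Status type (an assumption for this sketch).
struct Status {
  bool ok_;
  std::string msg_;
  bool ok() const { return ok_; }
  static Status OK() { return {true, ""}; }
  static Status Error(std::string m) { return {false, std::move(m)}; }
};

// Hypothetical RPC that fails on the first two attempts.
Status sendRpc(int attempt) {
  return attempt < 2 ? Status::Error("connection refused") : Status::OK();
}

Status getLeaderDistSketch(int retry, int retryLimit) {
  auto s = sendRpc(retry);
  if (!s.ok()) {
    // The caller sees the failure via the returned Status, so the log line
    // is informational -- the reasoning behind LOG(ERROR) -> LOG(INFO).
    LOG(INFO) << "RPC failure in AdminClient: " << s.msg_;
    if (retry < retryLimit) {
      // Mirrors the usleep(1000 * 50) backoff (50 ms) in getLeaderDist.
      std::this_thread::sleep_for(std::chrono::milliseconds(50));
      return getLeaderDistSketch(retry + 1, retryLimit);
    }
  }
  return s;
}

int main(int argc, char** argv) {
  google::InitGoogleLogging(argv[0]);
  FLAGS_logtostderr = true;  // print to stderr for this demo
  return getLeaderDistSketch(0, 3).ok() ? 0 : 1;
}
```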
127 changes: 123 additions & 4 deletions src/meta/processors/admin/AdminClient.h
@@ -43,57 +43,168 @@ class AdminClient {
return ioThreadPool_.get();
}

/**
* @brief Transfer the given partition's leadership from the current leader to dst
*
* @param spaceId
* @param partId
* @param leader
* @param dst
* @return folly::Future<Status>
*/
virtual folly::Future<Status> transLeader(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& dst = kRandomPeer);

/**
* @brief Add a peer for the given partition on the specified host. The rpc
* will be sent to the new partition peer, letting it know the
* other peers.
*
* @param spaceId
* @param partId
* @param host
* @param asLearner Add the peer as a raft learner only
* @return folly::Future<Status>
*/
virtual folly::Future<Status> addPart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host,
bool asLearner);

/**
* @brief Add a learner for the given partition. The rpc will be sent to
* the partition leader, writing the add event as a log.
*
* @param spaceId
* @param partId
* @param learner
* @return folly::Future<Status>
*/
virtual folly::Future<Status> addLearner(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& learner);

/**
* @brief Wait for the given partition peer to catch up data
*
* @param spaceId
* @param partId
* @param target partition peer address
* @return folly::Future<Status>
*/
virtual folly::Future<Status> waitingForCatchUpData(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& target);

/**
- * Add/Remove one peer for raft group (spaceId, partId).
- * "added" should be true if we want to add one peer, otherwise it is false.
- * */
+ * @brief Add/Remove one peer for partition (spaceId, partId).
+ *        "added" should be true if we want to add one peer, otherwise it is false.
+ * @param spaceId
+ * @param partId
+ * @param peer
+ * @param added
+ * @return folly::Future<Status>
+ */
virtual folly::Future<Status> memberChange(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& peer,
bool added);

/**
* @brief Update the partition's peer info in the meta kvstore: remove peer 'src', add peer 'dst'
*
* @param spaceId
* @param partId
* @param leader
* @param src
* @param dst
* @return folly::Future<Status>
*/
virtual folly::Future<Status> updateMeta(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& leader,
const HostAddr& src,
const HostAddr& dst);

/**
* @brief Remove the partition peer on the given storage host
*
* @param spaceId
* @param partId
* @param host storage admin service address
* @return folly::Future<Status>
*/
virtual folly::Future<Status> removePart(GraphSpaceID spaceId,
PartitionID partId,
const HostAddr& host);

/**
* @brief Check and adjust (add/remove) each peer's peer list according to the meta kvstore
*
* @param spaceId
* @param partId
* @return folly::Future<Status>
*/
virtual folly::Future<Status> checkPeers(GraphSpaceID spaceId, PartitionID partId);

/**
* @brief Get all partitions' leader distribution
*
* @param result
* @return folly::Future<Status>
*/
virtual folly::Future<Status> getLeaderDist(HostLeaderMap* result);

// used for snapshot and backup
/**
* @brief Create snapshots for given spaces in given host with specified snapshot name
*
* @param spaceIds spaces to create snapshot
* @param name snapshot name
* @param host storage host
* @return folly::Future<StatusOr<cpp2::HostBackupInfo>>
*/
virtual folly::Future<StatusOr<cpp2::HostBackupInfo>> createSnapshot(
const std::set<GraphSpaceID>& spaceIds, const std::string& name, const HostAddr& host);

/**
* @brief Drop snapshots of given spaces in given host with specified snapshot name
*
* @param spaceIds spaces to drop
* @param name snapshot name
* @param host storage host
* @return folly::Future<Status>
*/
virtual folly::Future<Status> dropSnapshot(const std::set<GraphSpaceID>& spaceIds,
const std::string& name,
const HostAddr& host);

/**
* @brief Block/Allow writes to the given spaces on the specified storage host
*
* @param spaceIds
* @param sign BLOCK_ON: blocking, BLOCK_OFF: allowing
* @param host
* @return folly::Future<Status>
*/
virtual folly::Future<Status> blockingWrites(const std::set<GraphSpaceID>& spaceIds,
storage::cpp2::EngineSignType sign,
const HostAddr& host);

/**
* @brief Add storage admin task to given storage host
*
* @param cmd
* @param jobId
* @param taskId
* @param spaceId
* @param specificHosts if hosts are empty, will send request to all active storage hosts
* @param taskSpecficParas
* @param parts
* @param concurrency
* @param statsResult
* @return folly::Future<Status>
*/
virtual folly::Future<Status> addTask(cpp2::AdminCmd cmd,
int32_t jobId,
int32_t taskId,
@@ -104,6 +215,14 @@
int concurrency,
cpp2::StatsItem* statsResult = nullptr);

/**
* @brief Stop storage admin task on the given storage hosts
*
* @param target if target hosts are empty, will send request to all active storage hosts
* @param jobId
* @param taskId
* @return folly::Future<Status>
*/
virtual folly::Future<Status> stopTask(const std::vector<HostAddr>& target,
int32_t jobId,
int32_t taskId);
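Read together, the newly documented methods trace the lifecycle of moving a partition replica between hosts. The sketch below chains them in one plausible order; the ordering is inferred from the comments above, not taken from this PR, and it assumes the surrounding nebula meta environment (`AdminClient`, `GraphSpaceID`, `PartitionID`, `HostAddr`). Only the method signatures come from the header.

```cpp
// A hedged sketch, not code from this PR: the call order is an illustration.
#include <folly/futures/Future.h>

#include "meta/processors/admin/AdminClient.h"

using nebula::GraphSpaceID;
using nebula::HostAddr;
using nebula::PartitionID;
using nebula::Status;
using nebula::meta::AdminClient;

Status movePartSketch(AdminClient& client,
                      GraphSpaceID spaceId,
                      PartitionID partId,
                      const HostAddr& leader,
                      const HostAddr& src,
                      const HostAddr& dst) {
  // 1. Tell the raft group to start replicating to dst as a learner.
  auto s = client.addLearner(spaceId, partId, dst).get();
  if (!s.ok()) return s;
  // 2. Create the partition on dst itself, as a learner for now.
  s = client.addPart(spaceId, partId, dst, /*asLearner=*/true).get();
  if (!s.ok()) return s;
  // 3. Block until the learner has caught up with the leader's data.
  s = client.waitingForCatchUpData(spaceId, partId, dst).get();
  if (!s.ok()) return s;
  // 4. Promote dst to a full voter, then drop src from the raft group.
  s = client.memberChange(spaceId, partId, dst, /*added=*/true).get();
  if (!s.ok()) return s;
  s = client.memberChange(spaceId, partId, src, /*added=*/false).get();
  if (!s.ok()) return s;
  // 5. Persist the new peer list in the meta kvstore (remove src, add dst).
  s = client.updateMeta(spaceId, partId, leader, src, dst).get();
  if (!s.ok()) return s;
  // 6. Physically remove the partition data from src.
  s = client.removePart(spaceId, partId, src).get();
  if (!s.ok()) return s;
  // 7. Verify every peer's view matches the meta kvstore.
  return client.checkPeers(spaceId, partId).get();
}
```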
26 changes: 12 additions & 14 deletions src/meta/processors/admin/AgentHBProcessor.cpp
@@ -24,7 +24,6 @@ void AgentHBProcessor::onFinished() {
Base::onFinished();
}

- // Agent heartbeat register agent to meta and pull all services info in agent's host
void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
HostAddr agentAddr((*req.host_ref()).host, (*req.host_ref()).port);
LOG(INFO) << "Receive heartbeat from " << agentAddr << ", role = AGENT";
@@ -36,19 +35,19 @@ void AgentHBProcessor::process(const cpp2::AgentHBReq& req) {
time::WallClock::fastNowInMilliSec(), cpp2::HostRole::AGENT, req.get_git_info_sha());
ret = ActiveHostsMan::updateHostInfo(kvstore_, agentAddr, info);
if (ret != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << folly::sformat("Put agent {} info failed: {}",
agentAddr.toString(),
apache::thrift::util::enumNameSafe(ret));
LOG(INFO) << folly::sformat("Put agent {} info failed: {}",
agentAddr.toString(),
apache::thrift::util::enumNameSafe(ret));
break;
}

// get services in the agent host
auto servicesRet = ActiveHostsMan::getServicesInHost(kvstore_, agentAddr.host);
if (!nebula::ok(servicesRet)) {
ret = nebula::error(servicesRet);
LOG(ERROR) << folly::sformat("Get active services for {} failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
LOG(INFO) << folly::sformat("Get active services for {} failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
break;
}

@@ -58,9 +57,9 @@
auto dirIterRet = doPrefix(hostDirHostPrefix);
if (!nebula::ok(dirIterRet)) {
ret = nebula::error(dirIterRet);
LOG(ERROR) << folly::sformat("Get host {} dir prefix iterator failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
LOG(INFO) << folly::sformat("Get host {} dir prefix iterator failed: {}",
agentAddr.host,
apache::thrift::util::enumNameSafe(ret));
break;
}
auto dirIter = std::move(nebula::value(dirIterRet));
@@ -86,7 +85,7 @@

auto it = serviceDirinfo.find(addr);
if (it == serviceDirinfo.end()) {
LOG(ERROR) << folly::sformat("{} dir info not found", addr.toString());
LOG(INFO) << folly::sformat("{} dir info not found", addr.toString());
break;
}

@@ -99,7 +98,7 @@
if (serviceList.size() != services.size() - 1) {
ret = nebula::cpp2::ErrorCode::E_AGENT_HB_FAILUE;
// missing some services' dir info
- LOG(ERROR) << folly::sformat(
+ LOG(INFO) << folly::sformat(
"Missing some services's dir info, excepted service {}, but only got {}",
services.size() - 1,
serviceList.size());
@@ -111,8 +110,7 @@
auto partRet = kvstore_->part(kDefaultSpaceId, kDefaultPartId);
if (!nebula::ok(partRet)) {
ret = nebula::error(partRet);
LOG(ERROR) << "Get meta part store failed, error: "
<< apache::thrift::util::enumNameSafe(ret);
LOG(INFO) << "Get meta part store failed, error: " << apache::thrift::util::enumNameSafe(ret);
return;
}
auto raftPeers = nebula::value(partRet)->peers();
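The repeated `break` statements after each failure above only make sense inside a single-pass loop; the enclosing function body (not fully shown in this diff) presumably uses the `do { ... } while (false)` early-exit idiom, where each step sets `ret` and breaks to a common exit path (`onFinished()` then reports `ret` once). A standalone sketch of that idiom, with hypothetical step names:

```cpp
#include <iostream>

enum class ErrorCode { SUCCEEDED, E_STEP1_FAILED, E_STEP2_FAILED };

// Hypothetical steps standing in for the real kvstore/host lookups.
bool updateHostInfo() { return true; }
bool getServicesInHost() { return false; }

ErrorCode process() {
  auto ret = ErrorCode::SUCCEEDED;
  do {
    if (!updateHostInfo()) {
      ret = ErrorCode::E_STEP1_FAILED;
      break;  // jump straight to the common exit path below
    }
    if (!getServicesInHost()) {
      ret = ErrorCode::E_STEP2_FAILED;
      break;
    }
    // ... further steps, each following the same set-ret-and-break pattern
  } while (false);  // single pass: 'break' acts as a local 'goto cleanup'
  // Common exit path: report the result exactly once (cf. onFinished()).
  std::cout << "finished with code " << static_cast<int>(ret) << '\n';
  return ret;
}

int main() {
  process();
  return 0;
}
```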
3 changes: 3 additions & 0 deletions src/meta/processors/admin/AgentHBProcessor.h
@@ -30,6 +30,9 @@ struct AgentHBCounters final {
};
extern AgentHBCounters kAgentHBCounters;

/**
* @brief Agent heartbeat registers the agent to meta and pulls all services' info on the agent's host
*/
class AgentHBProcessor : public BaseProcessor<cpp2::AgentHBResp> {
FRIEND_TEST(AgentHBProcessorTest, AgentHBTest);

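Pieced together from the hunks above, the processor's flow is: record the agent's heartbeat, list every service on the agent's host, look up each service's directory info from the meta kvstore, and fail unless every service except the agent itself has reported its dir info. A simplified, self-contained sketch of that matching step follows; the types and names here are stand-ins, not nebula's API.

```cpp
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Simplified stand-ins for HostAddr and the dir-info value (assumptions).
using HostAddr = std::string;
struct DirInfo {
  std::string root;
};

// 'services' holds every service registered on the agent's host, the agent
// itself included; for N services we expect N - 1 dir-info entries.
bool collectServiceDirs(const HostAddr& agentAddr,
                        const std::vector<HostAddr>& services,
                        const std::map<HostAddr, DirInfo>& serviceDirInfo,
                        std::vector<std::pair<HostAddr, DirInfo>>& out) {
  for (const auto& addr : services) {
    if (addr == agentAddr) {
      continue;  // the agent itself has no dir info to report
    }
    auto it = serviceDirInfo.find(addr);
    if (it == serviceDirInfo.end()) {
      std::cout << addr << " dir info not found\n";
      break;  // same early-exit behaviour as the processor
    }
    out.emplace_back(it->first, it->second);
  }
  // Mirrors the size check in the processor: every service except the
  // agent must have reported its dir info.
  if (out.size() != services.size() - 1) {
    std::cout << "Missing some services' dir info, expected "
              << services.size() - 1 << ", but only got " << out.size() << "\n";
    return false;
  }
  return true;
}

int main() {
  std::vector<HostAddr> services{"agent:1", "storaged:2", "graphd:3"};
  std::map<HostAddr, DirInfo> dirs{{"storaged:2", {"/data"}}, {"graphd:3", {"/logs"}}};
  std::vector<std::pair<HostAddr, DirInfo>> out;
  std::cout << (collectServiceDirs("agent:1", services, dirs, out) ? "ok" : "failed") << "\n";
  return 0;
}
```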