Skip to content

Commit

Permalink
switch sync data (vesoft-inc#629)
Browse files Browse the repository at this point in the history
* switch sync data

* adjust log level

* fix rebasea master

* address wenhaocs's comments
  • Loading branch information
panda-sheep authored Mar 14, 2022
1 parent 0ac143b commit 2eb345d
Show file tree
Hide file tree
Showing 35 changed files with 651 additions and 37 deletions.
1 change: 1 addition & 0 deletions .linters/cpp/checkKeyword.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
'KW_GEOGRAPHY',
'KW_DURATION',
'KW_ACROSS',
'KW_RESTART',
]


Expand Down
56 changes: 54 additions & 2 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,15 +637,22 @@ bool MetaClient::loadListeners(GraphSpaceID spaceId,
return false;
}
Listeners listeners;
cpp2::SyncStatus syncStatus = cpp2::SyncStatus::UNKNOWN;
for (auto& listener : listenerRet.value()) {
if (listener.get_part_id() == 0 && listener.space_name_ref().has_value()) {
metaListeners[listener.get_host()].emplace_back(spaceId, *listener.space_name_ref());
if (listener.get_part_id() == 0 && listener.get_type() == cpp2::ListenerType::SYNC) {
if (listener.space_name_ref().has_value()) {
metaListeners[listener.get_host()].emplace_back(spaceId, *listener.space_name_ref());
}
if (listener.sync_status_ref().has_value()) {
syncStatus = *listener.sync_status_ref();
}
} else {
listeners[listener.get_host()].emplace_back(
std::make_pair(listener.get_part_id(), listener.get_type()));
}
}
cache->listeners_ = std::move(listeners);
cache->syncStatus_ = std::move(syncStatus);
return true;
}

Expand Down Expand Up @@ -3052,6 +3059,36 @@ MetaClient::listListenerDrainers(GraphSpaceID spaceId) {
return future;
}

folly::Future<StatusOr<bool>> MetaClient::stopSync(GraphSpaceID spaceId) {
cpp2::StopSyncReq req;
req.space_id_ref() = spaceId;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_stopSync(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

folly::Future<StatusOr<bool>> MetaClient::restartSync(GraphSpaceID spaceId) {
cpp2::RestartSyncReq req;
req.space_id_ref() = spaceId;
folly::Promise<StatusOr<bool>> promise;
auto future = promise.getFuture();
getResponse(
std::move(req),
[](auto client, auto request) { return client->future_restartSync(request); },
[](cpp2::ExecResp&& resp) -> bool {
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
}

bool MetaClient::registerCfg() {
auto ret = regConfig(gflagsDeclared_).get();
if (ret.ok()) {
Expand Down Expand Up @@ -3113,6 +3150,21 @@ StatusOr<cpp2::DrainerClientInfo> MetaClient::getMetaListenerDrainerOnSpaceFromC
return spaceIt->second->metaDrainerClient_;
}

StatusOr<bool> MetaClient::checkListenerCanSync(GraphSpaceID space) {
if (!ready_) {
return Status::Error("Not ready!");
}
folly::rcu_reader guard;
auto& metadata = *metadata_.load();
auto spaceIt = metadata.localCache_.find(space);
if (spaceIt == metadata.localCache_.end()) {
VLOG(3) << "Space " << space << " not found!";
return Status::SpaceNotFound();
}

return spaceIt->second->syncStatus_ == cpp2::SyncStatus::ONLINE;
}

StatusOr<ListenersMap> MetaClient::getListenersByHostFromCache(const HostAddr& host) {
if (!ready_) {
return Status::Error("Not ready!");
Expand Down
11 changes: 10 additions & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,9 @@ struct SpaceInfoCache {
// drainer server for slave cluster
std::vector<cpp2::DrainerInfo> drainerServer_;

// If there is a sync listener, set sync status
cpp2::SyncStatus syncStatus_;

// space level variable
std::unordered_map<std::string, Value> variables_;

Expand All @@ -134,6 +137,7 @@ struct SpaceInfoCache {
drainerclients_(info.drainerclients_),
metaDrainerClient_(info.metaDrainerClient_),
drainerServer_(info.drainerServer_),
syncStatus_(info.syncStatus_),
variables_(info.variables_) {}

~SpaceInfoCache() = default;
Expand Down Expand Up @@ -448,7 +452,6 @@ class MetaClient {
folly::Future<StatusOr<std::vector<cpp2::Snapshot>>> listSnapshots();

// Operations for listener.

folly::Future<StatusOr<bool>> addListener(GraphSpaceID spaceId,
cpp2::ListenerType type,
std::vector<HostAddr> storageHosts,
Expand All @@ -475,6 +478,10 @@ class MetaClient {
StatusOr<std::vector<RemoteListenerInfo>> getListenerHostTypeBySpacePartType(GraphSpaceID spaceId,
PartitionID partId);

folly::Future<StatusOr<bool>> stopSync(GraphSpaceID spaceId);

folly::Future<StatusOr<bool>> restartSync(GraphSpaceID spaceId);

// Opeartions for drainer.
folly::Future<StatusOr<bool>> addDrainer(GraphSpaceID spaceId, std::vector<HostAddr> hosts);

Expand Down Expand Up @@ -642,6 +649,8 @@ class MetaClient {

StatusOr<cpp2::DrainerClientInfo> getMetaListenerDrainerOnSpaceFromCache(GraphSpaceID space);

StatusOr<bool> checkListenerCanSync(GraphSpaceID space);

StatusOr<TermID> getTermFromCache(GraphSpaceID spaceId, PartitionID);

bool checkShadowAccountFromCache(const std::string& account);
Expand Down
8 changes: 8 additions & 0 deletions src/common/meta/ServiceManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,13 @@ StatusOr<std::vector<std::pair<GraphSpaceID, std::string>>> ServiceManager::getM
return std::move(ret).value();
}

StatusOr<bool> ServiceManager::checkListenerCanSync(GraphSpaceID spaceId) {
auto ret = metaClient_->checkListenerCanSync(spaceId);
if (!ret.ok()) {
return ret.status();
}
return std::move(ret).value();
}

} // namespace meta
} // namespace nebula
52 changes: 45 additions & 7 deletions src/common/meta/ServiceManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,67 @@
namespace nebula {
namespace meta {

// This class manages some external services.
// For example, es client, drainer client, drainer server, etc.
/**
* @brief This class manages some external services.
* For example, es client, drainer client, drainer server, etc.
*
*/
class ServiceManager final {
public:
ServiceManager() = default;
~ServiceManager();

// Get the service client, such as es client.
/**
* @brief Get the service client, such as es client.
*
* @param type
* @return StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>>
*/
StatusOr<std::vector<nebula::meta::cpp2::ServiceClient>> getServiceClients(
cpp2::ExternalServiceType type);

// Sync storage listener drainer client for master cluster.
/**
* @brief Get sync storage listener drainer client of specified space on master cluster.
*
* @param space
* @param partId
* @return StatusOr<nebula::meta::cpp2::DrainerClientInfo>
*/
StatusOr<nebula::meta::cpp2::DrainerClientInfo> getDrainerClient(GraphSpaceID space,
PartitionID partId);
// Sync meta listener drainer client for master cluster.

/**
* @brief Get sync meta listener drainer client of specified space on master cluster.
*
* @param space
* @return StatusOr<nebula::meta::cpp2::DrainerClientInfo>
*/
StatusOr<nebula::meta::cpp2::DrainerClientInfo> getMetaListenerDrainerClient(GraphSpaceID space);

// Drainer server for slave cluster
/**
* @brief Get drainer server of specified space on slave cluster
*
* @param spaceId
* @return StatusOr<std::vector<cpp2::DrainerInfo>>
*/
StatusOr<std::vector<cpp2::DrainerInfo>> getDrainerServer(GraphSpaceID spaceId);

// Get meta listener info
/**
* @brief Get the meta listener info of specified host
*
* @param host
* @return StatusOr<std::vector<std::pair<GraphSpaceID, std::string>>>
*/
StatusOr<std::vector<std::pair<GraphSpaceID, std::string>>> getMetaListenerInfo(HostAddr host);

/**
* @brief Check if listener can sync data.
*
* @param spaceId
* @return StatusOr<bool>
*/
StatusOr<bool> checkListenerCanSync(GraphSpaceID spaceId);

void init(MetaClient *client);

static std::unique_ptr<ServiceManager> create(MetaClient *client);
Expand Down
22 changes: 22 additions & 0 deletions src/common/utils/MetaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ static const std::unordered_map<
{"leader_terms", {"__leader_terms__", nullptr}},
{"listener", {"__listener__", nullptr}},
{"listener_drainer", {"__listener_drainer__", nullptr}},
{"sync_status", {"__sync_status__", nullptr}},
{"drainer", {"__drainer__", nullptr}},
{"stats", {"__stats__", MetaKeyUtils::parseStatsSpace}},
{"balance_task", {"__balance_task__", nullptr}},
Expand Down Expand Up @@ -87,6 +88,7 @@ static const std::string kGroupsTable = systemTableMaps.at("groups").fir
static const std::string kZonesTable = systemTableMaps.at("zones").first; // NOLINT
static const std::string kListenerTable = tableMaps.at("listener").first; // NOLINT
static const std::string kListenerDrainerTable = tableMaps.at("listener_drainer").first; // NOLINT
static const std::string kSyncStatusTable = tableMaps.at("sync_status").first; // NOLINT
static const std::string kDrainerTable = tableMaps.at("drainer").first; // NOLINT
static const std::string kDiskPartsTable = tableMaps.at("disk_parts").first; // NOLINT
static const std::string kVariableTable = tableMaps.at("variable").first; // NOLINT
Expand Down Expand Up @@ -1366,6 +1368,26 @@ PartitionID MetaKeyUtils::parseListenerPart(folly::StringPiece rawData) {
return *reinterpret_cast<const PartitionID*>(rawData.data() + offset);
}

std::string MetaKeyUtils::syncStatusKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(kSyncStatusTable.size() + sizeof(GraphSpaceID));
key.append(kSyncStatusTable.data(), kSyncStatusTable.size())
.append(reinterpret_cast<const char*>(&spaceId), sizeof(GraphSpaceID));
return key;
}

std::string MetaKeyUtils::syncStatusVal(const meta::cpp2::SyncStatus& status) {
std::string val;
apache::thrift::CompactSerializer::serialize(status, &val);
return val;
}

meta::cpp2::SyncStatus MetaKeyUtils::parseSyncStatusVal(const folly::StringPiece& rawData) {
meta::cpp2::SyncStatus syncStatus;
apache::thrift::CompactSerializer::deserialize(rawData, syncStatus);
return syncStatus;
}

std::string MetaKeyUtils::drainerKey(GraphSpaceID spaceId) {
std::string key;
key.reserve(kDrainerTable.size() + sizeof(GraphSpaceID));
Expand Down
6 changes: 6 additions & 0 deletions src/common/utils/MetaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,12 @@ class MetaKeyUtils final {

static PartitionID parseListenerPart(folly::StringPiece rawData);

static std::string syncStatusKey(GraphSpaceID spaceId);

static std::string syncStatusVal(const meta::cpp2::SyncStatus& status);

static meta::cpp2::SyncStatus parseSyncStatusVal(const folly::StringPiece& rawVal);

// drainer
static std::string drainerKey(GraphSpaceID spaceId);

Expand Down
2 changes: 1 addition & 1 deletion src/drainer/DrainerTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class DrainerTask {
*/
void finish(nebula::cpp2::ErrorCode rc) {
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Drainer task space " << spaceId_ << " succeeded";
VLOG(2) << "Drainer task space " << spaceId_ << " succeeded";
} else {
LOG(INFO) << "Drainer task space " << spaceId_ << " failed, "
<< apache::thrift::util::enumNameSafe(rc);
Expand Down
6 changes: 6 additions & 0 deletions src/graph/executor/Executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,6 +502,12 @@ Executor *Executor::makeExecutor(QueryContext *qctx, const PlanNode *node) {
case PlanNode::Kind::kShowListener: {
return pool->add(new ShowListenerExecutor(node, qctx));
}
case PlanNode::Kind::kStopSync: {
return pool->add(new StopSyncExecutor(node, qctx));
}
case PlanNode::Kind::kRestartSync: {
return pool->add(new RestartSyncExecutor(node, qctx));
}
case PlanNode::Kind::kAddDrainer: {
return pool->add(new AddDrainerExecutor(node, qctx));
}
Expand Down
38 changes: 36 additions & 2 deletions src/graph/executor/admin/ListenerExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ folly::Future<Status> ShowListenerExecutor::execute() {
});

if (type == meta::cpp2::ListenerType::SYNC) {
DataSet result({"PartId", "Type", "Host", "SpaceName", "Status"});
DataSet result({"PartId", "Type", "Host", "SpaceName", "Host Status", "Sync Status"});
for (const auto& info : listenerInfos) {
Row row;
row.values.emplace_back(info.get_part_id());
Expand All @@ -89,11 +89,17 @@ folly::Future<Status> ShowListenerExecutor::execute() {
row.values.emplace_back("");
}
row.values.emplace_back(apache::thrift::util::enumNameSafe(info.get_status()));

if (info.sync_status_ref().has_value()) {
row.values.emplace_back(apache::thrift::util::enumNameSafe(*info.sync_status_ref()));
} else {
row.values.emplace_back("OFFLINE");
}
result.emplace_back(std::move(row));
}
return finish(std::move(result));
} else {
DataSet result({"PartId", "Type", "Host", "Status"});
DataSet result({"PartId", "Type", "Host", "Host Status"});
for (const auto& info : listenerInfos) {
Row row;
row.values.emplace_back(info.get_part_id());
Expand All @@ -107,5 +113,33 @@ folly::Future<Status> ShowListenerExecutor::execute() {
});
}

folly::Future<Status> StopSyncExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto spaceId = qctx()->rctx()->session()->space().id;
return qctx()->getMetaClient()->stopSync(spaceId).via(runner()).thenValue(
[this](StatusOr<bool> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status();
return resp.status();
}
return Status::OK();
});
}

folly::Future<Status> RestartSyncExecutor::execute() {
SCOPED_TIMER(&execTime_);
auto spaceId = qctx()->rctx()->session()->space().id;
return qctx()->getMetaClient()->restartSync(spaceId).via(runner()).thenValue(
[this](StatusOr<bool> resp) {
SCOPED_TIMER(&execTime_);
if (!resp.ok()) {
LOG(ERROR) << resp.status();
return resp.status();
}
return Status::OK();
});
}

} // namespace graph
} // namespace nebula
16 changes: 16 additions & 0 deletions src/graph/executor/admin/ListenerExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,22 @@ class ShowListenerExecutor final : public Executor {
folly::Future<Status> execute() override;
};

class StopSyncExecutor final : public Executor {
public:
StopSyncExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("StopSyncExecutor", node, qctx) {}

folly::Future<Status> execute() override;
};

class RestartSyncExecutor final : public Executor {
public:
RestartSyncExecutor(const PlanNode *node, QueryContext *qctx)
: Executor("RestartSyncExecutor", node, qctx) {}

folly::Future<Status> execute() override;
};

} // namespace graph
} // namespace nebula

Expand Down
Loading

0 comments on commit 2eb345d

Please sign in to comment.