Skip to content

Commit

Permalink
Merge branch 'master' into profiling_storage_detail
Browse files Browse the repository at this point in the history
  • Loading branch information
cangfengzhs authored Sep 6, 2021
2 parents f7ca01d + 5905860 commit 679db39
Show file tree
Hide file tree
Showing 65 changed files with 782 additions and 184 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pull_request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ jobs:
run: |
make RM_DIR=false DEBUG=false J=${{ steps.cmake.outputs.j }} tck
working-directory: tests/
timeout-minutes: 30
timeout-minutes: 60
- name: Down cluster
run: |
make down
Expand Down
6 changes: 4 additions & 2 deletions README-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@

## 发布通告

Nebula Graph 2.0.0 GA 版本已发布,目前已经有若干项重要更新,更多新特性正在开发中。欢迎加入我们,一同开发Nebula Graph 2.0。
v1.x和v2.5.0之后的版本,Nebula Graph在这个repo管理。如需获取v2.0.0到v2.5.0之间的版本,请访问[Nebula Graph repo](https://github.com/vesoft-inc/nebula-graph)

Nebula Graph 1.x 后续不再进行功能的更新,请升级到2.0版本中。Nebula Graph内核 1.x 与 2.x数据格式、通信协议、客户端等均双向不兼容,可参照[升级指导](https://docs.nebula-graph.com.cn/2.5.0/4.deployment-and-installation/3.upgrade-nebula-graph/upgrade-nebula-graph-to-250/)进行升级。

<!--
如需使用稳定版本,请参见[Nebula Graph 1.0](https://github.com/vesoft-inc/nebula)。
Expand Down Expand Up @@ -62,7 +64,7 @@ Nebula Graph 2.0.0 GA 版本已发布,目前已经有若干项重要更新,

## 文档

* 简体中文(TODO)
* [简体中文](https://docs.nebula-graph.com.cn/)
* [English](https://docs.nebula-graph.io/)

## Nebula Graph 产品架构图
Expand Down
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@ Compared with other graph database solutions, **Nebula Graph** has the following

## Notice of Release

The Nebula Graph team is glad to announce the Nebula Graph 2.0.0 GA. This is a brand new start with several important updates, and more of them are on the way. Welcome to join us.
This repository hosts the source code of Nebula Graph versions before 2.0.0-alpha and after v2.5.0. If you are looking to use the versions between v2.0.0 and v2.5.0, please head to [Nebula Graph repo](https://github.com/vesoft-inc/nebula-graph).

Nebula Graph 1.x is not actively maintained. Please move to Nebula Graph 2.x. The data format, rpc protocols, clients, etc. are not compatible between Nebula Graph v1.x and v2.x, but we do offer [upgrade guide from 1.x to v2.5.0](https://docs.nebula-graph.io/2.5.0/4.deployment-and-installation/3.upgrade-nebula-graph/upgrade-nebula-graph-to-250/).

<!--
To use the stable release, see [Nebula Graph 1.0](https://github.com/vesoft-inc/nebula).
Expand Down
94 changes: 78 additions & 16 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,21 @@ DEFINE_int32(meta_client_retry_times, 3, "meta client retry times, 0 means no re
DEFINE_int32(meta_client_retry_interval_secs, 1, "meta client sleep interval between retry");
DEFINE_int32(meta_client_timeout_ms, 60 * 1000, "meta client timeout");
DEFINE_string(cluster_id_path, "cluster.id", "file path saved clusterId");

DEFINE_int32(check_plan_killed_frequency, 8, "check plan killed every 1<<n times");
namespace nebula {
namespace meta {

MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
std::vector<HostAddr> addrs,
const MetaClientOptions& options)
: ioThreadPool_(ioThreadPool), addrs_(std::move(addrs)), options_(options) {
: ioThreadPool_(ioThreadPool),
addrs_(std::move(addrs)),
options_(options),
sessionMap_(new SessionMap{}),
killedPlans_(new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>{}) {
CHECK(ioThreadPool_ != nullptr) << "IOThreadPool is required";
CHECK(!addrs_.empty()) << "No meta server address is specified or can be "
"solved. Meta server is required";
CHECK(!addrs_.empty())
<< "No meta server address is specified or can be solved. Meta server is required";
clientsMan_ = std::make_shared<thrift::ThriftClientManager<cpp2::MetaServiceAsyncClient>>();
updateActive();
updateLeader();
Expand All @@ -50,17 +54,24 @@ MetaClient::MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool

MetaClient::~MetaClient() {
stop();
delete sessionMap_.load();
delete killedPlans_.load();
VLOG(3) << "~MetaClient";
}

bool MetaClient::isMetadReady() {
auto ret = heartbeat().get();
if (!ret.ok() && ret.status() != Status::LeaderChanged()) {
if (!ret.ok()) {
LOG(ERROR) << "Heartbeat failed, status:" << ret.status();
ready_ = false;
return ready_;
} else if (options_.role_ == cpp2::HostRole::STORAGE &&
metaServerVersion_ != EXPECT_META_VERSION) {
LOG(ERROR) << "Expect meta version is " << EXPECT_META_VERSION << ", but actual is "
<< metaServerVersion_;
return ready_;
}

// ready_ will be set in loadData
bool ldRet = loadData();
bool lcRet = true;
if (!options_.skipConfig_) {
Expand Down Expand Up @@ -177,6 +188,11 @@ bool MetaClient::loadData() {
return false;
}

if (!loadSessions()) {
LOG(ERROR) << "Load sessions Failed";
return false;
}

auto ret = listSpaces().get();
if (!ret.ok()) {
LOG(ERROR) << "List space failed, status:" << ret.status();
Expand Down Expand Up @@ -992,8 +1008,7 @@ void MetaClient::loadRemoteListeners() {
}
}

/// ================================== public methods
/// =================================
/// ================================== public methods =================================

PartitionID MetaClient::partId(int32_t numParts, const VertexID id) const {
// If the length of the id is 8, we will treat it as int64_t to be compatible
Expand Down Expand Up @@ -2330,7 +2345,8 @@ folly::Future<StatusOr<bool>> MetaClient::heartbeat() {
heartbeatTime_ = time::WallClock::fastNowInMilliSec();
metadLastUpdateTime_ = resp.get_last_update_time_in_ms();
VLOG(1) << "Metad last update time: " << metadLastUpdateTime_;
return true; // resp.code == nebula::cpp2::ErrorCode::SUCCEEDED
metaServerVersion_ = resp.get_meta_version();
return resp.get_code() == nebula::cpp2::ErrorCode::SUCCEEDED;
},
std::move(promise));
return future;
Expand Down Expand Up @@ -2853,17 +2869,15 @@ bool MetaClient::loadCfg() {
// only load current module's config is enough
auto ret = listConfigs(gflagsModule_).get();
if (ret.ok()) {
// if we load config from meta server successfully, update gflags and set
// configReady_
// if we load config from meta server successfully, update gflags and set configReady_
auto items = ret.value();
MetaConfigMap metaConfigMap;
for (auto& item : items) {
std::pair<cpp2::ConfigModule, std::string> key = {item.get_module(), item.get_name()};
metaConfigMap.emplace(std::move(key), std::move(item));
}
{
// For any configurations that is in meta, update in cache to replace
// previous value
// For any configurations that is in meta, update in cache to replace previous value
folly::RWSpinLock::WriteHolder holder(configCacheLock_);
for (const auto& entry : metaConfigMap) {
auto& key = entry.first;
Expand Down Expand Up @@ -2958,9 +2972,8 @@ void MetaClient::loadLeader(const std::vector<cpp2::HostItem>& hostItems,
<< item.get_leader_parts().size() << " space";
}
{
// todo(doodle): in worst case, storage and meta isolated, so graph may get
// a outdate leader info. The problem could be solved if leader term are
// cached as well.
// todo(doodle): in worst case, storage and meta isolated, so graph may get a outdate
// leader info. The problem could be solved if leader term are cached as well.
LOG(INFO) << "Load leader ok";
folly::RWSpinLock::WriteHolder wh(leadersLock_);
leadersInfo_ = std::move(leaderInfo);
Expand Down Expand Up @@ -3480,5 +3493,54 @@ folly::Future<StatusOr<bool>> MetaClient::ingest(GraphSpaceID spaceId) {
return folly::async(func);
}

bool MetaClient::loadSessions() {
auto session_list = listSessions().get();
if (!session_list.ok()) {
LOG(ERROR) << "List sessions failed, status:" << session_list.status();
return false;
}
SessionMap* oldSessionMap = sessionMap_.load();
SessionMap* newSessionMap = new SessionMap(*oldSessionMap);
auto oldKilledPlan = killedPlans_.load();
auto newKilledPlan = new folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>(*oldKilledPlan);
for (auto& session : session_list.value().get_sessions()) {
(*newSessionMap)[session.get_session_id()] = session;
for (auto& query : session.get_queries()) {
if (query.second.get_status() == cpp2::QueryStatus::KILLING) {
newKilledPlan->insert({session.get_session_id(), query.first});
}
}
}
sessionMap_.store(newSessionMap);
killedPlans_.store(newKilledPlan);
folly::rcu_retire(oldKilledPlan);
folly::rcu_retire(oldSessionMap);
return true;
}

StatusOr<cpp2::Session> MetaClient::getSessionFromCache(const nebula::SessionID& session_id) {
if (!ready_) {
return Status::Error("Not ready!");
}
folly::rcu_reader guard;
auto session_map = sessionMap_.load();
auto it = session_map->find(session_id);
if (it != session_map->end()) {
return it->second;
}
return Status::SessionNotFound();
}

bool MetaClient::checkIsPlanKilled(SessionID sessionId, ExecutionPlanID planId) {
static thread_local int check_counter = 0;
// Inaccurate in a multi-threaded environment, but it is not important
check_counter = (check_counter + 1) & ((1 << FLAGS_check_plan_killed_frequency) - 1);
if (check_counter != 0) {
return false;
}
folly::rcu_reader guard;
return killedPlans_.load()->count({sessionId, planId});
}

} // namespace meta
} // namespace nebula
24 changes: 21 additions & 3 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,14 @@
#define CLIENTS_META_METACLIENT_H_

#include <folly/RWSpinLock.h>
#include <folly/container/F14Map.h>
#include <folly/container/F14Set.h>
#include <folly/executors/IOThreadPoolExecutor.h>
#include <folly/synchronization/Rcu.h>
#include <gtest/gtest_prod.h>

#include <atomic>

#include "common/base/Base.h"
#include "common/base/Status.h"
#include "common/base/StatusOr.h"
Expand All @@ -20,9 +25,11 @@
#include "common/thread/GenericWorker.h"
#include "common/thrift/ThriftClientManager.h"
#include "interface/gen-cpp2/MetaServiceAsyncClient.h"
#include "interface/gen-cpp2/common_types.h"
#include "interface/gen-cpp2/meta_types.h"

DECLARE_int32(meta_client_retry_times);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace meta {
Expand Down Expand Up @@ -54,8 +61,7 @@ using NameIndexMap = std::unordered_map<std::pair<GraphSpaceID, std::string>, In
// Get Index Structure by indexID
using Indexes = std::unordered_map<IndexID, std::shared_ptr<cpp2::IndexItem>>;

// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove
// listener on local host
// Listeners is a map of ListenerHost => <PartId + type>, used to add/remove listener on local host
using Listeners =
std::unordered_map<HostAddr, std::vector<std::pair<PartitionID, cpp2::ListenerType>>>;

Expand Down Expand Up @@ -114,6 +120,7 @@ using FulltextClientsList = std::vector<cpp2::FTClient>;

using FTIndexMap = std::unordered_map<std::string, cpp2::FTIndex>;

using SessionMap = std::unordered_map<SessionID, cpp2::Session>;
class MetaChangedListener {
public:
virtual ~MetaChangedListener() = default;
Expand Down Expand Up @@ -174,6 +181,7 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryOnceTest);
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
friend class KillQueryMetaWrapper;

public:
MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
Expand All @@ -184,7 +192,7 @@ class MetaClient {

bool isMetadReady();

bool waitForMetadReady(int count = -1, int retryIntervalSecs = 2);
bool waitForMetadReady(int count = -1, int retryIntervalSecs = FLAGS_heartbeat_interval_secs);

void stop();

Expand Down Expand Up @@ -550,6 +558,10 @@ class MetaClient {

StatusOr<std::vector<HostAddr>> getStorageHosts() const;

StatusOr<cpp2::Session> getSessionFromCache(const nebula::SessionID& session_id);

bool checkIsPlanKilled(SessionID session_id, ExecutionPlanID plan_id);

StatusOr<HostAddr> getStorageLeaderFromCache(GraphSpaceID spaceId, PartitionID partId);

void updateStorageLeader(GraphSpaceID spaceId, PartitionID partId, const HostAddr& leader);
Expand Down Expand Up @@ -633,6 +645,8 @@ class MetaClient {

bool loadFulltextIndexes();

bool loadSessions();

void loadLeader(const std::vector<cpp2::HostItem>& hostItems,
const SpaceNameIdMap& spaceIndexByName);

Expand Down Expand Up @@ -694,6 +708,8 @@ class MetaClient {
folly::RWSpinLock leaderIdsLock_;
int64_t localLastUpdateTime_{0};
int64_t metadLastUpdateTime_{0};
int64_t metaServerVersion_{-1};
static constexpr int64_t EXPECT_META_VERSION = 2;

// leadersLock_ is used to protect leadersInfo
folly::RWSpinLock leadersLock_;
Expand Down Expand Up @@ -743,6 +759,8 @@ class MetaClient {
MetaClientOptions options_;
std::vector<HostAddr> storageHosts_;
int64_t heartbeatTime_;
std::atomic<SessionMap*> sessionMap_;
std::atomic<folly::F14FastSet<std::pair<SessionID, ExecutionPlanID>>*> killedPlans_;
};

} // namespace meta
Expand Down
Loading

0 comments on commit 679db39

Please sign in to comment.