Skip to content

Commit

Permalink
Merge branch 'master' into subgraph_vertices_filter
Browse files Browse the repository at this point in the history
  • Loading branch information
nevermore3 authored Sep 22, 2022
2 parents 64b0d5c + f92c24c commit 3b117e0
Show file tree
Hide file tree
Showing 51 changed files with 264 additions and 3,012 deletions.
2 changes: 1 addition & 1 deletion cmake/nebula/CompilerLauncher.cmake
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
option(ENABLE_COMPILER_LAUNCHER "Using compiler launcher if available" ON)
if(NOT ENABLE_COMPILER_LAUNCHER)
if(NOT (ENABLE_COMPILER_LAUNCHER AND ENABLE_CCACHE))
return()
endif()

Expand Down
2 changes: 1 addition & 1 deletion docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@ Following docker images will be ready in production.
- [vesoft/nebula-graphd](https://hub.docker.com/r/vesoft/nebula-graphd): nebula-graphd service built with `Dockerfile.graphd`
- [vesoft/nebula-metad](https://hub.docker.com/r/vesoft/nebula-metad): nebula-metad service built with `Dockerfile.metad`
- [vesoft/nebula-storaged](https://hub.docker.com/r/vesoft/nebula-storaged): nebula-storaged service built with `Dockerfile.storaged`
- [vesoft/nebula-tools](https://hub.docker.com/r/vesoft/nebula-tools): nebula tools built with `Dockerfile.tools`, including db_dump, meta_dump and db_upgrader
- [vesoft/nebula-tools](https://hub.docker.com/r/vesoft/nebula-tools): nebula tools built with `Dockerfile.tools`, including db_dump and meta_dump
2 changes: 1 addition & 1 deletion src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -737,7 +737,7 @@ class MetaClient : public BaseMetaClient {

ListenersMap doGetListenersMap(const HostAddr& host, const LocalCache& localCache);

// Checks if the the client version is compatible with the server version by checking the
// Checks if the client version is compatible with the server version by checking the
// whilelist in meta.
Status verifyVersion();

Expand Down
2 changes: 1 addition & 1 deletion src/common/conf/test/ConfigurationTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ TEST(Configuration, ParseFromFile) {
fprintf(file, "// This a C++ style comment line\n");
fprintf(file, "# This a shell style comment line\n");
fprintf(file, "{ \n");
fprintf(file, "\t\"int\": 123, // this is a integer\n");
fprintf(file, "\t\"int\": 123, // this is an integer\n");
fprintf(file, "\t\"double\": 3.14 # this is a double\n");
fprintf(file, "} \n");
::fflush(file);
Expand Down
1 change: 0 additions & 1 deletion src/common/utils/NebulaKeyUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ std::vector<std::string> NebulaKeyUtils::snapshotPrefix(PartitionID partId) {
if (partId == 0) {
result.emplace_back("");
} else {
result.emplace_back(vertexPrefix(partId));
result.emplace_back(tagPrefix(partId));
result.emplace_back(edgePrefix(partId));
result.emplace_back(IndexKeyUtils::indexPrefix(partId));
Expand Down
6 changes: 6 additions & 0 deletions src/common/utils/NebulaKeyUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,12 @@ class NebulaKeyUtils final {
return static_cast<NebulaKeyType>(type) == NebulaKeyType::kEdge;
}

static bool isVertex(const folly::StringPiece& rawKey) {
constexpr int32_t len = static_cast<int32_t>(sizeof(NebulaKeyType));
auto type = readInt<uint32_t>(rawKey.data(), len) & kTypeMask;
return static_cast<NebulaKeyType>(type) == NebulaKeyType::kVertex;
}

static bool isLock(size_t vIdLen, const folly::StringPiece& rawKey) {
return isEdge(vIdLen, rawKey, kLockVersion);
}
Expand Down
29 changes: 5 additions & 24 deletions src/daemons/MetaDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <thrift/lib/cpp2/server/ThriftServer.h>

#include "MetaDaemonInit.h"
#include "clients/meta/MetaClient.h"
#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/fs/FileUtils.h"
Expand All @@ -25,7 +26,6 @@
#include "meta/KVBasedClusterIdMan.h"
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand Down Expand Up @@ -167,29 +167,10 @@ int main(int argc, char* argv[]) {
}
}

{
/**
* Only leader part needed.
*/
auto ret = gKVStore->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Part leader get failed";
return EXIT_FAILURE;
}
if (nebula::value(ret) == localhost) {
LOG(INFO) << "Check and init root user";
auto checkRet = nebula::meta::RootUserMan::isGodExists(gKVStore.get());
if (!nebula::ok(checkRet)) {
auto retCode = nebula::error(checkRet);
LOG(ERROR) << "Parser God Role error:" << apache::thrift::util::enumNameSafe(retCode);
return EXIT_FAILURE;
}
auto existGod = nebula::value(checkRet);
if (!existGod && !nebula::meta::RootUserMan::initRootUser(gKVStore.get())) {
LOG(ERROR) << "Init root user failed";
return EXIT_FAILURE;
}
}
auto godInit = initGodUser(gKVStore.get(), localhost);
if (godInit != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Init god user failed";
return EXIT_FAILURE;
}

auto metaServer = std::make_unique<apache::thrift::ThriftServer>();
Expand Down
56 changes: 56 additions & 0 deletions src/daemons/MetaDaemonInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,22 @@

#include "common/base/Base.h"
#include "common/base/SignalHandler.h"
#include "common/datatypes/HostAddr.h"
#include "common/fs/FileUtils.h"
#include "common/hdfs/HdfsCommandHelper.h"
#include "common/hdfs/HdfsHelper.h"
#include "common/network/NetworkUtils.h"
#include "common/ssl/SSLConfig.h"
#include "common/thread/GenericThreadPool.h"
#include "common/utils/MetaKeyUtils.h"
#include "kvstore/KVStore.h"
#include "kvstore/NebulaStore.h"
#include "kvstore/PartManager.h"
#include "meta/ActiveHostsMan.h"
#include "meta/KVBasedClusterIdMan.h"
#include "meta/MetaServiceHandler.h"
#include "meta/MetaVersionMan.h"
#include "meta/RootUserMan.h"
#include "meta/http/MetaHttpReplaceHostHandler.h"
#include "meta/processors/job/JobManager.h"
#include "meta/stats/MetaStats.h"
Expand All @@ -46,6 +49,8 @@ DECLARE_string(meta_server_addrs); // use define from grap flags.
DEFINE_int32(ws_meta_http_port, 11000, "Port to listen on Meta with HTTP protocol");
#endif

DECLARE_uint32(raft_heartbeat_interval_secs);

using nebula::web::PathParams;

namespace nebula::meta {
Expand Down Expand Up @@ -158,6 +163,57 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
return kvstore;
}

nebula::cpp2::ErrorCode initGodUser(nebula::kvstore::KVStore* kvstore,
const nebula::HostAddr& localhost) {
const int kMaxRetryTime = FLAGS_raft_heartbeat_interval_secs * 3;
constexpr int kRetryInterval = 1;
int retryTime = 0;
// Both leader & follower need to wait all reading ok.
// leader init need to retry writing if leader changed.
while (true) {
retryTime += kRetryInterval;
if (retryTime > kMaxRetryTime) {
LOG(ERROR) << "Retry too many times";
return nebula::cpp2::ErrorCode::E_RETRY_EXHAUSTED;
}
auto ret = kvstore->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Part leader get failed";
return nebula::error(ret);
}
LOG(INFO) << "Check root user"; // follower need to wait reading all ok, too.
auto checkRet = nebula::meta::RootUserMan::isGodExists(kvstore);
if (!nebula::ok(checkRet)) {
auto retCode = nebula::error(checkRet);
if (retCode == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
LOG(INFO) << "Leader changed, retry";
sleep(kRetryInterval);
continue;
}
LOG(ERROR) << "Parser God Role error:" << apache::thrift::util::enumNameSafe(retCode);
return nebula::error(checkRet);
}
if (nebula::value(ret) == localhost) {
auto existGod = nebula::value(checkRet);
if (!existGod) {
auto initGod = nebula::meta::RootUserMan::initRootUser(kvstore);
if (initGod != nebula::cpp2::ErrorCode::SUCCEEDED) {
if (initGod != nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
LOG(ERROR) << "Init God Role error:" << apache::thrift::util::enumNameSafe(initGod);
return initGod;
} else {
LOG(INFO) << "Leader changed, retry";
sleep(kRetryInterval);
continue;
}
}
}
}
break;
}
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore) {
LOG(INFO) << "Starting Meta HTTP Service";
auto& router = svc->router();
Expand Down
4 changes: 4 additions & 0 deletions src/daemons/MetaDaemonInit.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

#include "common/base/Status.h"
#include "common/hdfs/HdfsCommandHelper.h"
#include "interface/gen-cpp2/common_types.h"
#include "kvstore/KVStore.h"
#include "webservice/WebService.h"

Expand All @@ -18,4 +19,7 @@ std::unique_ptr<nebula::kvstore::KVStore> initKV(std::vector<nebula::HostAddr> p
nebula::HostAddr localhost);

nebula::Status initWebService(nebula::WebService* svc, nebula::kvstore::KVStore* kvstore);

nebula::cpp2::ErrorCode initGodUser(nebula::kvstore::KVStore* kvstore,
const nebula::HostAddr& localhost);
#endif
27 changes: 4 additions & 23 deletions src/daemons/StandAloneDaemon.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,29 +223,10 @@ int main(int argc, char *argv[]) {
}
}

{
/**
* Only leader part needed.
*/
auto ret = gMetaKVStore->partLeader(nebula::kDefaultSpaceId, nebula::kDefaultPartId);
if (!nebula::ok(ret)) {
LOG(ERROR) << "Part leader get failed";
return;
}
if (nebula::value(ret) == metaLocalhost) {
LOG(INFO) << "Check and init root user";
auto checkRet = nebula::meta::RootUserMan::isGodExists(gMetaKVStore.get());
if (!nebula::ok(checkRet)) {
auto retCode = nebula::error(checkRet);
LOG(ERROR) << "Parser God Role error:" << apache::thrift::util::enumNameSafe(retCode);
return;
}
auto existGod = nebula::value(checkRet);
if (!existGod && !nebula::meta::RootUserMan::initRootUser(gMetaKVStore.get())) {
LOG(ERROR) << "Init root user failed";
return;
}
}
auto godInit = initGodUser(gMetaKVStore.get(), metaLocalhost);
if (godInit != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << "Init god user failed";
return;
}

auto handler =
Expand Down
5 changes: 4 additions & 1 deletion src/graph/planner/ngql/LookupPlanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,11 @@ StatusOr<SubPlan> LookupPlanner::transform(AstContext* astCtx) {
if (lookupCtx->filter) {
plan.root = Filter::make(qctx, plan.root, lookupCtx->filter);
}

plan.root = Project::make(qctx, plan.root, lookupCtx->yieldExpr);
if (lookupCtx->dedup) {
plan.root = Dedup::make(qctx, plan.root);
}

return plan;
}

Expand Down
2 changes: 2 additions & 0 deletions src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,3 +103,5 @@ DEFINE_uint32(
gc_worker_size,
0,
"Background garbage clean workers, default number is 0 which means using hardware core size.");

DEFINE_bool(graph_use_vertex_key, false, "whether allow insert or query the vertex key");
3 changes: 3 additions & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,4 +65,7 @@ DECLARE_int32(max_job_size);

DECLARE_bool(enable_async_gc);
DECLARE_uint32(gc_worker_size);

DECLARE_bool(graph_use_vertex_key);

#endif // GRAPH_GRAPHFLAGS_H_
3 changes: 1 addition & 2 deletions src/graph/validator/LookupValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -442,8 +442,7 @@ StatusOr<Expression*> LookupValidator::checkConstExpr(Expression* expr,
if (type == nebula::cpp2::PropertyType::UNKNOWN) {
return Status::SemanticError("Invalid column: %s", prop.c_str());
}
QueryExpressionContext dummy(nullptr);
auto v = Expression::eval(expr, dummy);
auto v = Expression::eval(expr, QueryExpressionContext(qctx_->ectx())());
// TODO(Aiee) extract the type cast logic as a method if we decide to support
// more cross-type comparisons.

Expand Down
3 changes: 3 additions & 0 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ Status InsertVerticesValidator::check() {
}

auto tagItems = sentence->tagItems();
if (!FLAGS_graph_use_vertex_key && tagItems.empty()) {
return Status::SemanticError("Insert vertex is forbidden, please speicify the tag");
}

schemas_.reserve(tagItems.size());

Expand Down
24 changes: 14 additions & 10 deletions src/graph/visitor/PrunePropertiesVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,11 +358,13 @@ void PrunePropertiesVisitor::pruneCurrent(AppendVertices *node) {
node->setVertexProps(nullptr);
} else {
// only get _tag when props is nullptr
auto tagId = vertexProps->front().tag_ref().value();
VertexProp newVProp;
newVProp.tag_ref() = tagId;
newVProp.props_ref() = {nebula::kTag};
prunedVertexProps->emplace_back(std::move(newVProp));
for (auto &vertexProp : *vertexProps) {
auto tagId = vertexProp.tag_ref().value();
VertexProp newVProp;
newVProp.tag_ref() = tagId;
newVProp.props_ref() = {nebula::kTag};
prunedVertexProps->emplace_back(std::move(newVProp));
}
node->setVertexProps(std::move(prunedVertexProps));
}
return;
Expand All @@ -373,11 +375,13 @@ void PrunePropertiesVisitor::pruneCurrent(AppendVertices *node) {
node->setVertexProps(nullptr);
} else {
// only get _tag when props is nullptr
auto tagId = vertexProps->front().tag_ref().value();
VertexProp newVProp;
newVProp.tag_ref() = tagId;
newVProp.props_ref() = {nebula::kTag};
prunedVertexProps->emplace_back(std::move(newVProp));
for (auto &vertexProp : *vertexProps) {
auto tagId = vertexProp.tag_ref().value();
VertexProp newVProp;
newVProp.tag_ref() = tagId;
newVProp.props_ref() = {nebula::kTag};
prunedVertexProps->emplace_back(std::move(newVProp));
}
node->setVertexProps(std::move(prunedVertexProps));
}
return;
Expand Down
2 changes: 1 addition & 1 deletion src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1988,7 +1988,7 @@ void RaftPart::processSendSnapshotRequest(const cpp2::SendSnapshotRequest& req,
lastTotalSize_ = 0;
} else if (lastSnapshotCommitId_ != req.get_committed_log_id() ||
lastSnapshotCommitTerm_ != req.get_committed_log_term()) {
// Still waiting for snapshot from another peer, just return error. If the the peer doesn't
// Still waiting for snapshot from another peer, just return error. If the peer doesn't
// send any logs during raft_snapshot_timeout, will convert to Status::RUNNING, so we can accept
// snapshot again
resp.error_code_ref() = nebula::cpp2::ErrorCode::E_RAFT_WAITING_SNAPSHOT;
Expand Down
12 changes: 5 additions & 7 deletions src/meta/RootUserMan.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "common/base/Base.h"
#include "common/utils/MetaKeyUtils.h"
#include "interface/gen-cpp2/common_types.h"
#include "kvstore/KVStore.h"

namespace nebula {
Expand Down Expand Up @@ -53,7 +54,7 @@ class RootUserMan {
}
}

static bool initRootUser(kvstore::KVStore* kv) {
static nebula::cpp2::ErrorCode initRootUser(kvstore::KVStore* kv) {
LOG(INFO) << "Init root user";
auto encodedPwd = proxygen::md5Encode(folly::StringPiece("nebula"));
auto userKey = MetaKeyUtils::userKey("root");
Expand All @@ -64,18 +65,15 @@ class RootUserMan {
std::vector<kvstore::KV> data;
data.emplace_back(std::move(userKey), std::move(userVal));
data.emplace_back(std::move(roleKey), std::move(roleVal));
bool ret = true;
nebula::cpp2::ErrorCode ec;
folly::Baton<true, std::atomic> baton;
kv->asyncMultiPut(
kDefaultSpaceId, kDefaultPartId, std::move(data), [&](nebula::cpp2::ErrorCode code) {
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(INFO) << "Put failed, error " << static_cast<int32_t>(code);
ret = false;
}
ec = code;
baton.post();
});
baton.wait();
return ret;
return ec;
}
};

Expand Down
3 changes: 3 additions & 0 deletions src/storage/CompactionFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "common/utils/OperationKeyUtils.h"
#include "kvstore/CompactionFilter.h"
#include "storage/CommonUtils.h"
#include "storage/StorageFlags.h"

namespace nebula {
namespace storage {
Expand All @@ -36,6 +37,8 @@ class StorageCompactionFilter final : public kvstore::KVFilter {
return !edgeValid(spaceId, key, val);
} else if (IndexKeyUtils::isIndexKey(key)) {
return !indexValid(spaceId, key, val);
} else if (!FLAGS_use_vertex_key && NebulaKeyUtils::isVertex(key)) {
return true;
} else if (NebulaKeyUtils::isLock(vIdLen_, key)) {
return !lockValid(spaceId, key);
} else {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/StorageFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,5 @@ DEFINE_bool(query_concurrently,
false,
"whether to run query of each part concurrently, only lookup and "
"go are supported");

DEFINE_bool(use_vertex_key, false, "whether allow insert or query the vertex key");
Loading

0 comments on commit 3b117e0

Please sign in to comment.