Skip to content

Commit

Permalink
Merge branch 'master' into user_meta
Browse files Browse the repository at this point in the history
  • Loading branch information
boshengchen authored May 31, 2019
2 parents dbf79e2 + b24ca04 commit ec4e59d
Show file tree
Hide file tree
Showing 47 changed files with 1,052 additions and 884 deletions.
1 change: 1 addition & 0 deletions src/common/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ add_subdirectory(thread)
add_subdirectory(process)
add_subdirectory(stats)
add_subdirectory(filter)
add_subdirectory(test)
3 changes: 2 additions & 1 deletion src/common/base/Configuration.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ Status Configuration::parseFromFile(const std::string &filename) {
// read the whole content
// TODO(dutor) ::read might be interrupted by signals
auto buffer = std::make_unique<char[]>(len + 1);
::read(fd, buffer.get(), len);
auto charsRead = ::read(fd, buffer.get(), len);
UNUSED(charsRead);
buffer[len] = '\0';
// strip off all comments
static const std::regex comment("//.*|#.*");
Expand Down
12 changes: 8 additions & 4 deletions src/common/filter/Expressions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1047,12 +1047,16 @@ std::string LogicalExpression::toString() const {


VariantType LogicalExpression::eval() const {
auto left = left_->eval();
auto right = right_->eval();
if (op_ == AND) {
return asBool(left) && asBool(right);
if (!asBool(left_->eval())) {
return false;
}
return asBool(right_->eval());
} else {
return asBool(left) || asBool(right);
if (asBool(left_->eval())) {
return true;
}
return asBool(right_->eval());
}
}

Expand Down
11 changes: 10 additions & 1 deletion src/common/network/NetworkUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,16 @@ bool NetworkUtils::getDynamicPortRange(uint16_t& low, uint16_t& high) {
return false;
}

fscanf(pipe, "%hu %hu", &low, &high);
if (fscanf(pipe, "%hu %hu", &low, &high) != 2) {
LOG(ERROR) << "Failed to read from /proc/sys/net/ipv4/ip_local_port_range";
// According to ICANN, the port range is devided into three sections
//
// Well-known ports: 0 to 1023 (used for system services)
// Registered/user ports: 1024 to 49151
// Dynamic/private ports: 49152 to 65535
low = 49152;
high = 65535;
}

if (pclose(pipe) < 0) {
LOG(ERROR) << "Failed to close the pipe: " << strerror(errno);
Expand Down
3 changes: 2 additions & 1 deletion src/common/network/test/NetworkUtilsTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ TEST(NetworkUtils, getHostname) {

FILE* fp = popen("LD_PRELOAD= hostname | tr -d ['\n']", "r");
char buffer[256];
fgets(buffer, sizeof(buffer), fp);
auto numChars = fgets(buffer, sizeof(buffer), fp);
UNUSED(numChars);
pclose(fp);
EXPECT_EQ(std::string(buffer), hostname);
}
Expand Down
3 changes: 2 additions & 1 deletion src/common/process/test/ProcessTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ TEST(ProcessUtils, getExeCWD) {
auto result = ProcessUtils::getExeCWD();
ASSERT_TRUE(result.ok()) << result.status();
char buffer[PATH_MAX];
::getcwd(buffer, sizeof(buffer));
auto len = ::getcwd(buffer, sizeof(buffer));
UNUSED(len);
ASSERT_EQ(buffer, result.value());
}

Expand Down
14 changes: 14 additions & 0 deletions src/common/test/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
add_executable(
test_serverContext_test
TestServerContext.cpp
$<TARGET_OBJECTS:thrift_obj>
$<TARGET_OBJECTS:base_obj>
$<TARGET_OBJECTS:thread_obj>
)
nebula_link_libraries(
test_serverContext_test
${THRIFT_LIBRARIES}
wangle
gtest
)
nebula_add_test(test_serverContext_test)
62 changes: 62 additions & 0 deletions src/common/test/ServerContext.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/
#ifndef COMMON_TEST_SERVERCONTEXT_H_
#define COMMON_TEST_SERVERCONTEXT_H_

#include "base/Base.h"
#include "thread/NamedThread.h"
#include "kvstore/KVStore.h"
#include "meta/client/MetaClient.h"
#include <thrift/lib/cpp2/server/ThriftServer.h>

namespace nebula {
namespace test {

struct ServerContext {
~ServerContext() {
if (server_ != nullptr) {
server_->stop();
}
if (thread_ != nullptr) {
thread_->join();
}
KVStore_ = nullptr;
server_ = nullptr;
thread_ = nullptr;
VLOG(3) << "~ServerContext";
}

std::unique_ptr<apache::thrift::ThriftServer> server_{nullptr};
std::unique_ptr<thread::NamedThread> thread_{nullptr};
// To keep meta and storage's KVStore
std::unique_ptr<kvstore::KVStore> KVStore_{nullptr};
uint16_t port_{0};
};

static void mockCommon(test::ServerContext *sc,
const std::string &name,
uint16_t port,
std::shared_ptr<apache::thrift::ServerInterface> handler) {
if (nullptr == sc) {
LOG(ERROR) << "ServerContext is nullptr";
return;
}
sc->server_ = std::make_unique<apache::thrift::ThriftServer>();
sc->server_->setInterface(std::move(handler));
sc->server_->setPort(port);
sc->thread_ = std::make_unique<thread::NamedThread>(name, [&]() {
sc->server_->serve();
LOG(INFO) << "Stop the server...";
});
while (!sc->server_->getServeEventBase() ||
!sc->server_->getServeEventBase()->isRunning()) {
}
sc->port_ = sc->server_->getAddress().getPort();
}

} // namespace test
} // namespace nebula
#endif // COMMON_TEST_SERVERCONTEXT_H_
39 changes: 39 additions & 0 deletions src/common/test/TestServerContext.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/* Copyright (c) 2019 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

#include "base/Base.h"
#include <gtest/gtest.h>
#include "test/ServerContext.h"


namespace nebula {
namespace test {

class TestServer final : public apache::thrift::ServerInterface {
public:
TestServer() {}
~TestServer() {}
std::unique_ptr<apache::thrift::AsyncProcessor> getProcessor() override {
return nullptr;
}
};

TEST(ServerContext, mockCommon) {
auto sc = std::make_unique<ServerContext>();
auto handler = std::make_shared<TestServer>();
test::mockCommon(sc.get(), "test", 0, handler);
}

} // namespace test
} // namespace nebula

int main(int argc, char **argv) {
::testing::InitGoogleTest(&argc, argv);
folly::init(&argc, &argv, true);
google::SetStderrLogging(google::INFO);

return RUN_ALL_TESTS();
}
9 changes: 7 additions & 2 deletions src/graph/DescribeEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,16 @@ Status DescribeEdgeExecutor::prepare() {
void DescribeEdgeExecutor::execute() {
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();
auto edgeType = ectx()->schemaManager()->toEdgeType(spaceId, *name);
auto status = ectx()->schemaManager()->toEdgeType(spaceId, *name);
if (!status.ok()) {
onError_(Status::Error("Schema not found for edge '%s'", name->c_str()));
return;
}
auto edgeType = status.value();
auto schema = ectx()->schemaManager()->getEdgeSchema(spaceId, edgeType);
resp_ = std::make_unique<cpp2::ExecutionResponse>();
if (schema == nullptr) {
onError_(Status::Error("Schema not found for edge `%s'", name->c_str()));
onError_(Status::Error("Schema not found for edge '%s'", name->c_str()));
return;
}

Expand Down
7 changes: 6 additions & 1 deletion src/graph/DescribeSpaceExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,12 @@ Status DescribeSpaceExecutor::prepare() {

void DescribeSpaceExecutor::execute() {
auto *name = sentence_->spaceName();
auto spaceId = ectx()->schemaManager()->toGraphSpaceID(*name);
auto status = ectx()->schemaManager()->toGraphSpaceID(*name);
if (!status.ok()) {
onError_(Status::Error("Space not found"));
return;
}
auto spaceId = status.value();
auto future = ectx()->getMetaClient()->getSpace(spaceId);
auto *runner = ectx()->rctx()->runner();

Expand Down
9 changes: 7 additions & 2 deletions src/graph/DescribeTagExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,17 @@ Status DescribeTagExecutor::prepare() {
void DescribeTagExecutor::execute() {
auto *name = sentence_->name();
auto spaceId = ectx()->rctx()->session()->space();
auto tagId = ectx()->schemaManager()->toTagID(spaceId, *name);
auto status = ectx()->schemaManager()->toTagID(spaceId, *name);
if (!status.ok()) {
onError_(Status::Error("Schema not found for tag '%s'", name->c_str()));
return;
}
auto tagId = status.value();
auto schema = ectx()->schemaManager()->getTagSchema(spaceId, tagId);

resp_ = std::make_unique<cpp2::ExecutionResponse>();
if (schema == nullptr) {
onError_(Status::Error("Schema not found for tag `%s'", name->c_str()));
onError_(Status::Error("Schema not found for tag '%s'", name->c_str()));
return;
}

Expand Down
39 changes: 32 additions & 7 deletions src/graph/GoExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,12 @@ Status GoExecutor::prepareOver() {
LOG(FATAL) << "Over clause shall never be null";
}
auto spaceId = ectx()->rctx()->session()->space();
edgeType_ = ectx()->schemaManager()->toEdgeType(spaceId, *clause->edge());
auto edgeStatus = ectx()->schemaManager()->toEdgeType(spaceId, *clause->edge());
if (!edgeStatus.ok()) {
status = edgeStatus.status();
break;
}
edgeType_ = edgeStatus.value();
reversely_ = clause->isReversely();
if (clause->alias() != nullptr) {
expCtx_->addAlias(*clause->alias(), AliasKind::Edge, *clause->edge());
Expand Down Expand Up @@ -234,7 +239,13 @@ void GoExecutor::setupResponse(cpp2::ExecutionResponse &resp) {

void GoExecutor::stepOut() {
auto spaceId = ectx()->rctx()->session()->space();
auto returns = getStepOutProps();
auto status = getStepOutProps();
if (!status.ok()) {
DCHECK(onError_);
onError_(Status::Error("Get step out props failed"));
return;
}
auto returns = status.value();
auto future = ectx()->storage()->getNeighbors(spaceId,
starts_,
edgeType_,
Expand Down Expand Up @@ -336,7 +347,7 @@ void GoExecutor::finishExecution(RpcResponse &&rpcResp) {
}


std::vector<storage::cpp2::PropDef> GoExecutor::getStepOutProps() const {
StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getStepOutProps() const {
std::vector<storage::cpp2::PropDef> props;
{
storage::cpp2::PropDef pd;
Expand All @@ -354,7 +365,11 @@ std::vector<storage::cpp2::PropDef> GoExecutor::getStepOutProps() const {
storage::cpp2::PropDef pd;
pd.owner = storage::cpp2::PropOwner::SOURCE;
pd.name = tagProp.second;
auto tagId = ectx()->schemaManager()->toTagID(spaceId, tagProp.first);
auto status = ectx()->schemaManager()->toTagID(spaceId, tagProp.first);
if (!status.ok()) {
return Status::Error("No schema found for '%s'", tagProp.first);
}
auto tagId = status.value();
pd.set_tag_id(tagId);
props.emplace_back(std::move(pd));
}
Expand All @@ -369,14 +384,18 @@ std::vector<storage::cpp2::PropDef> GoExecutor::getStepOutProps() const {
}


std::vector<storage::cpp2::PropDef> GoExecutor::getDstProps() const {
StatusOr<std::vector<storage::cpp2::PropDef>> GoExecutor::getDstProps() const {
std::vector<storage::cpp2::PropDef> props;
auto spaceId = ectx()->rctx()->session()->space();
for (auto &tagProp : expCtx_->dstTagProps()) {
storage::cpp2::PropDef pd;
pd.owner = storage::cpp2::PropOwner::DEST;
pd.name = tagProp.second;
auto tagId = ectx()->schemaManager()->toTagID(spaceId, tagProp.first);
auto status = ectx()->schemaManager()->toTagID(spaceId, tagProp.first);
if (!status.ok()) {
return Status::Error("No schema found for '%s'", tagProp.first);
}
auto tagId = status.value();
pd.set_tag_id(tagId);
props.emplace_back(std::move(pd));
}
Expand All @@ -386,7 +405,13 @@ std::vector<storage::cpp2::PropDef> GoExecutor::getDstProps() const {

void GoExecutor::fetchVertexProps(std::vector<VertexID> ids, RpcResponse &&rpcResp) {
auto spaceId = ectx()->rctx()->session()->space();
auto returns = getDstProps();
auto status = getDstProps();
if (!status.ok()) {
DCHECK(onError_);
onError_(Status::Error("Get dest props failed"));
return;
}
auto returns = status.value();
auto future = ectx()->storage()->getVertexProps(spaceId, ids, returns);
auto *runner = ectx()->rctx()->runner();
auto cb = [this, stepOutResp = std::move(rpcResp)] (auto &&result) mutable {
Expand Down
4 changes: 2 additions & 2 deletions src/graph/GoExecutor.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ class GoExecutor final : public TraverseExecutor {
*/
void onVertexProps(RpcResponse &&rpcResp);

std::vector<storage::cpp2::PropDef> getStepOutProps() const;
std::vector<storage::cpp2::PropDef> getDstProps() const;
StatusOr<std::vector<storage::cpp2::PropDef>> getStepOutProps() const;
StatusOr<std::vector<storage::cpp2::PropDef>> getDstProps() const;

void fetchVertexProps(std::vector<VertexID> ids, RpcResponse &&rpcResp);

Expand Down
38 changes: 24 additions & 14 deletions src/graph/InsertEdgeExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,31 @@ InsertEdgeExecutor::InsertEdgeExecutor(Sentence *sentence,


Status InsertEdgeExecutor::prepare() {
auto status = checkIfGraphSpaceChosen();
if (!status.ok()) {
return status;
}
Status status;
do {
status = checkIfGraphSpaceChosen();
if (!status.ok()) {
break;
}

auto spaceId = ectx()->rctx()->session()->space();
overwritable_ = sentence_->overwritable();
edgeType_ = ectx()->schemaManager()->toEdgeType(spaceId, *sentence_->edge());
properties_ = sentence_->properties();
rows_ = sentence_->rows();
schema_ = ectx()->schemaManager()->getEdgeSchema(spaceId, edgeType_);
if (schema_ == nullptr) {
return Status::Error("No schema found for `%s'", sentence_->edge()->c_str());
}
return Status::OK();
auto spaceId = ectx()->rctx()->session()->space();
overwritable_ = sentence_->overwritable();
auto edgeStatus = ectx()->schemaManager()->toEdgeType(spaceId, *sentence_->edge());
if (!edgeStatus.ok()) {
status = edgeStatus.status();
break;
}
edgeType_ = edgeStatus.value();
properties_ = sentence_->properties();
rows_ = sentence_->rows();
schema_ = ectx()->schemaManager()->getEdgeSchema(spaceId, edgeType_);
if (schema_ == nullptr) {
status = Status::Error("No schema found for '%s'", sentence_->edge()->c_str());
break;
}
} while (false);

return status;
}


Expand Down
Loading

0 comments on commit ec4e59d

Please sign in to comment.