Skip to content

Commit

Permalink
do some clean
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyu85cn committed Sep 26, 2021
1 parent 2361066 commit bf09973
Show file tree
Hide file tree
Showing 16 changed files with 30 additions and 201 deletions.
9 changes: 0 additions & 9 deletions src/graph/validator/MutateValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,10 +191,6 @@ Status InsertEdgesValidator::check() {
}

Status InsertEdgesValidator::prepareEdges() {
// using IsoLevel = meta::cpp2::IsolationLevel;
// auto isoLevel = space_.spaceDesc.isolation_level_ref().value_or(IsoLevel::DEFAULT);
// auto useToss = isoLevel == IsoLevel::TOSS;
// auto size = useToss ? rows_.size() : rows_.size() * 2;
auto size = FLAGS_enable_experimental_feature ? rows_.size() : rows_.size() * 2;
edges_.reserve(size);
for (auto i = 0u; i < rows_.size(); i++) {
Expand Down Expand Up @@ -244,7 +240,6 @@ Status InsertEdgesValidator::prepareEdges() {
edge.set_key(key);
edge.set_props(std::move(props));
edges_.emplace_back(edge);
// if (!useToss) {
if (!FLAGS_enable_experimental_feature) {
// inbound
key.set_src(dstId);
Expand Down Expand Up @@ -761,10 +756,6 @@ Status UpdateEdgeValidator::toPlan() {
{},
condition_,
{});
// using IsoLevel = meta::cpp2::IsolationLevel;
// auto isoLevel = space_.spaceDesc.isolation_level_ref().value_or(IsoLevel::DEFAULT);
// auto useToss = isoLevel == IsoLevel::TOSS;
// if (useToss) {
if (FLAGS_enable_experimental_feature) {
root_ = outNode;
tail_ = root_;
Expand Down
38 changes: 0 additions & 38 deletions src/kvstore/test/RocksEngineTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,44 +21,6 @@ namespace kvstore {

const int32_t kDefaultVIdLen = 8;

DEFINE_string(db_path, "hello world", "");
DEFINE_int32(space_id, 1, "");
DEFINE_int32(part_id, 1, "");
DEFINE_int32(vid_len, 8, "");
DEFINE_int32(srcId, 72, "");
DEFINE_int32(edge_type, 5, "");
DEFINE_int32(rank, 0, "");
DEFINE_int32(dstId, 32, "");

std::string toString(int32_t vid) {
std::string key;
key.append(reinterpret_cast<const char*>(&vid), sizeof(vid))
.append(FLAGS_vid_len - sizeof(vid), '\0');
return key;
}

// TEST(RocksEngineTest, GetValueFromDB) {
// LOG(INFO) << "FLAGS_db_path=" << FLAGS_db_path;
// LOG(INFO) << "FLAGS_space_id=" << FLAGS_space_id;
// LOG(INFO) << "FLAGS_part_id=" << FLAGS_part_id;
// LOG(INFO) << "FLAGS_vid_len=" << FLAGS_vid_len;
// LOG(INFO) << "FLAGS_srcId=" << FLAGS_srcId;
// LOG(INFO) << "FLAGS_edge_type=" << FLAGS_edge_type;
// LOG(INFO) << "FLAGS_rank=" << FLAGS_rank;
// LOG(INFO) << "FLAGS_dstId=" << FLAGS_dstId;

// auto engine = std::make_unique<RocksEngine>(FLAGS_space_id, FLAGS_vid_len, FLAGS_db_path);
// auto strSrc = toString(FLAGS_srcId);
// auto strDst = toString(FLAGS_dstId);
// auto strEdgeKey = NebulaKeyUtils::edgeKey(
// FLAGS_vid_len, FLAGS_part_id, strSrc, FLAGS_edge_type, FLAGS_rank, strDst);
// LOG(INFO) << "hex strEdgeKey=" << folly::hexlify(strEdgeKey);
// std::string val;
// auto rc = engine->get(strEdgeKey, &val);
// LOG(INFO) << "rc = " << apache::thrift::util::enumNameSafe(rc);
// LOG(INFO) << "val.size()=" << val.size() << ", val=" << folly::hexlify(val);
// }

TEST(RocksEngineTest, SimpleTest) {
fs::TempDir rootPath("/tmp/rocksdb_engine_SimpleTest.XXXXXX");
auto engine = std::make_unique<RocksEngine>(0, kDefaultVIdLen, rootPath.path());
Expand Down
2 changes: 1 addition & 1 deletion src/meta/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -161,5 +161,5 @@ set(meta_test_deps
)

nebula_add_subdirectory(http)
# nebula_add_subdirectory(test)
nebula_add_subdirectory(test)
nebula_add_subdirectory(upgrade)
2 changes: 1 addition & 1 deletion src/storage/StorageFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ DEFINE_uint64(default_mvcc_ver,

DEFINE_string(reader_handlers_type, "cpu", "Type of reader handlers, options: cpu,io");

DEFINE_bool(trace_toss, true, "output verbose log of toss");
DEFINE_bool(trace_toss, false, "output verbose log of toss");

DEFINE_int32(max_edge_returned_per_vertex, INT_MAX, "Max edge number returnred searching vertex");

Expand Down
1 change: 1 addition & 0 deletions src/storage/mutate/AddEdgesProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class AddEdgesProcessor : public BaseProcessor<cpp2::ExecResponse> {
std::vector<std::shared_ptr<nebula::meta::cpp2::IndexItem>> indexes_;
bool ifNotExists_{false};

/// this is a hook function to keep out-edge and in-edge consist
using ConsistOper = std::function<void(kvstore::BatchHolder&, std::vector<kvstore::KV>*)>;
folly::Optional<ConsistOper> consistOp_;
};
Expand Down
5 changes: 0 additions & 5 deletions src/storage/mutate/UpdateEdgeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,6 @@ class UpdateEdgeProcessor
void onProcessFinished() override;

std::vector<Expression*> getReturnPropsExp() {
// std::vector<Expression*> result;
// result.resize(returnPropsExp_.size());
// auto get = [] (auto &ptr) {return ptr.get(); };
// std::transform(returnPropsExp_.begin(), returnPropsExp_.end(), result.begin(), get);
// return result;
return returnPropsExp_;
}
void profilePlan(StoragePlan<cpp2::EdgeKey>& plan) {
Expand Down
13 changes: 7 additions & 6 deletions src/storage/test/ChainResumeEdgeTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ TEST(ChainResumeEdgesTest, resumeTest1) {
auto* iClient = FakeInternalStorageClient::instance(env);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process2();
resumeProc.process();

EXPECT_EQ(334, numOfKey(req, gTestUtil.genKey, env));
EXPECT_EQ(0, numOfKey(req, gTestUtil.genPrime, env));
Expand Down Expand Up @@ -158,6 +158,7 @@ TEST(ChainResumeEdgesTest, resumePrimeTest3) {
auto error = nebula::cpp2::ErrorCode::E_RPC_FAILURE;
auto* iClient = FakeInternalStorageClient::instance(env, error);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
env->txnMan_->scanPrimes(1, 1);
ChainResumeProcessor resumeProc(env);
resumeProc.process();

Expand Down Expand Up @@ -287,7 +288,7 @@ TEST(ChainResumeEdgesTest, resumeTest6) {
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
env->txnMan_->scanPrimes(1, 1);
ChainResumeProcessor resumeProc(env);
resumeProc.process2();
resumeProc.process();

EXPECT_EQ(334, numOfKey(req, util.genKey, env));
EXPECT_EQ(0, numOfKey(req, util.genPrime, env));
Expand Down Expand Up @@ -328,7 +329,7 @@ TEST(ChainUpdateEdgeTest, resumeTest7) {
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
env->txnMan_->scanPrimes(1, 1);
ChainResumeProcessor resumeProc(env);
resumeProc.process2();
resumeProc.process();

EXPECT_TRUE(helper.edgeExist(env, req));
EXPECT_FALSE(helper.primeExist(env, req));
Expand Down Expand Up @@ -368,7 +369,7 @@ TEST(ChainUpdateEdgeTest, resumeTest8) {
iClient->setErrorCode(Code::E_UNKNOWN);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process2();
resumeProc.process();

EXPECT_TRUE(helper.edgeExist(env, req));
EXPECT_TRUE(helper.primeExist(env, req));
Expand Down Expand Up @@ -409,7 +410,7 @@ TEST(ChainUpdateEdgeTest, resumeTest9) {
iClient->setErrorCode(Code::E_RPC_FAILURE);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process2();
resumeProc.process();

EXPECT_TRUE(helper.edgeExist(env, req));
EXPECT_FALSE(helper.primeExist(env, req));
Expand Down Expand Up @@ -447,7 +448,7 @@ TEST(ChainUpdateEdgeTest, resumeTest10) {
auto* iClient = FakeInternalStorageClient::instance(env);
FakeInternalStorageClient::hookInternalStorageClient(env, iClient);
ChainResumeProcessor resumeProc(env);
resumeProc.process2();
resumeProc.process();

EXPECT_TRUE(helper.edgeExist(env, req));
EXPECT_FALSE(helper.primeExist(env, req));
Expand Down
29 changes: 16 additions & 13 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,12 @@ void ChainAddEdgesProcessorLocal::process(const cpp2::AddEdgesRequest& req) {
* 3. write edge prime(key = edge prime, val = )
*/
folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::prepareLocal() {
uuid_ = ConsistUtil::strUUID();
readableEdgeDesc_ = makeReadableEdge(req_);
if (!readableEdgeDesc_.empty()) {
uuid_.append(" ").append(readableEdgeDesc_);
if (FLAGS_trace_toss) {
uuid_ = ConsistUtil::strUUID();
readableEdgeDesc_ = makeReadableEdge(req_);
if (!readableEdgeDesc_.empty()) {
uuid_.append(" ").append(readableEdgeDesc_);
}
}

if (!lockEdges(req_)) {
Expand All @@ -47,7 +49,7 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::prepareLocal() {
auto primes = makePrime();
if (FLAGS_trace_toss) {
for (auto& kv : primes) {
LOG(INFO) << uuid_ << " put prime " << folly::hexlify(kv.first);
VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first);
}
}

Expand All @@ -62,7 +64,7 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::prepareLocal() {
}

folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processRemote(Code code) {
LOG(INFO) << uuid_ << " prepareLocal(), code = " << apache::thrift::util::enumNameSafe(code);
VLOG(1) << uuid_ << " prepareLocal(), code = " << apache::thrift::util::enumNameSafe(code);
if (code != Code::SUCCEEDED) {
return code;
}
Expand All @@ -75,7 +77,9 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processRemote(Code code) {
}

folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processLocal(Code code) {
LOG(INFO) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code);
if (FLAGS_trace_toss) {
VLOG(1) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code);
}

if (code == Code::SUCCEEDED) {
// do nothing
Expand Down Expand Up @@ -126,7 +130,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re
spaceVidType_ = vidType.value();
}
localPartId_ = req.get_parts().begin()->first;
// replaceNullWithDefaultValue(req_);
replaceNullWithDefaultValue(req_);
auto part = env_->kvstore_->part(spaceId_, localPartId_);
if (!nebula::ok(part)) {
pushResultCode(nebula::error(part), localPartId_);
Expand Down Expand Up @@ -155,8 +159,8 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::forwardToDelegateProcessor(
std::move(futProc).thenValue([&, p = std::move(pro)](auto&& resp) mutable {
auto rc = extractRpcError(resp);
if (rc != Code::SUCCEEDED) {
LOG(INFO) << uuid_ << " forwardToDelegateProcessor(), code = "
<< apache::thrift::util::enumNameSafe(rc);
VLOG(1) << uuid_
<< " forwardToDelegateProcessor(), code = " << apache::thrift::util::enumNameSafe(rc);
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
}
p.setValue(rc);
Expand Down Expand Up @@ -225,7 +229,7 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::abort() {
localPartId_,
std::move(kvErased_),
[p = std::move(pro), this](auto rc) mutable {
LOG(INFO) << uuid_ << " abort()=" << apache::thrift::util::enumNameSafe(rc);
VLOG(1) << uuid_ << " abort()=" << apache::thrift::util::enumNameSafe(rc);
if (rc != Code::SUCCEEDED) {
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
}
Expand Down Expand Up @@ -350,8 +354,7 @@ cpp2::AddEdgesRequest ChainAddEdgesProcessorLocal::reverseRequest(
}

void ChainAddEdgesProcessorLocal::finish() {
LOG_IF(INFO, FLAGS_trace_toss) << uuid_ << " commitLocal(), code_ = "
<< apache::thrift::util::enumNameSafe(code_);
VLOG(1) << uuid_ << " commitLocal(), code_ = " << apache::thrift::util::enumNameSafe(code_);
pushResultCode(code_, localPartId_);
finished_.setValue(code_);
onFinished();
Expand Down
55 changes: 3 additions & 52 deletions src/storage/transaction/ChainResumeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,71 +16,22 @@ namespace nebula {
namespace storage {

void ChainResumeProcessor::process() {
std::unordered_map<GraphSpaceID, std::vector<meta::cpp2::LeaderInfo>> leaders;
if (env_->kvstore_->allLeader(leaders) == 0) {
LOG(INFO) << "no leader found, skip any resume process";
return;
}
std::unique_ptr<kvstore::KVIterator> iter;
for (auto& leader : leaders) {
auto spaceId = leader.first;
for (auto& partInfo : leader.second) {
auto partId = partInfo.get_part_id();
auto prefix = ConsistUtil::primePrefix(partId);
auto rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
break;
}
for (; iter->valid(); iter->next()) {
// if (needResume(spaceId, iter->key())) {
LOG(INFO) << "resume prime " << folly::hexlify(iter->key());
ResumeOptions opt(ResumeType::RESUME_CHAIN, iter->val().str());
auto* proc = ChainProcessorFactory::makeProcessor(env_, opt);
// futs.emplace_back(proc->getFinished());
auto fut = proc->getFinished();
env_->txnMan_->addChainTask(proc);
std::move(fut).get();
}

// prefix = ConsistUtil::doublePrimePrefix(partId);
// rc = env_->kvstore_->prefix(spaceId, partId, prefix, &iter);
// if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
// break;
// }
// for (; iter->valid(); iter->next()) {
// if (needResume(spaceId, iter->key())) {
// LOG(INFO) << "resume double prime " << folly::hexlify(iter->key());
// ResumeOptions opt(ResumeType::RESUME_REMOTE, iter->val().str());
// auto* proc = ChainProcessorFactory::makeProcessor(env_, opt);
// futs.emplace_back(proc->getFinished());
// env_->txnMan_->addChainTask(proc);
// } else {
// LOG(INFO) << "skip double prime " << folly::hexlify(iter->key());
// }
// }
// break;
}
// break;
}
}

void ChainResumeProcessor::process2() {
auto* table = env_->txnMan_->getReserveTable();
std::unique_ptr<kvstore::KVIterator> iter;
for (auto it = table->begin(); it != table->end(); ++it) {
auto spaceId = *reinterpret_cast<GraphSpaceID*>(const_cast<char*>(it->first.c_str()));
auto edgeKey = std::string(it->first.c_str() + sizeof(GraphSpaceID),
it->first.size() - sizeof(GraphSpaceID));
auto partId = NebulaKeyUtils::getPart(edgeKey);
LOG(INFO) << "resume edge space=" << spaceId << ", part=" << partId
<< ", hex=" << folly::hexlify(edgeKey);
VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId
<< ", hex=" << folly::hexlify(edgeKey);
auto prefix = (it->second == ResumeType::RESUME_CHAIN) ? ConsistUtil::primeTable()
: ConsistUtil::doublePrimeTable();
auto key = prefix + edgeKey;
std::string val;
auto rc = env_->kvstore_->get(spaceId, partId, key, &val);
if (rc != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(WARNING) << "kvstore->get() failed, hex=" << apache::thrift::util::enumNameSafe(rc);
LOG(WARNING) << "kvstore->get() failed, " << apache::thrift::util::enumNameSafe(rc);
continue;
}
ResumeOptions opt(it->second, val);
Expand Down
2 changes: 0 additions & 2 deletions src/storage/transaction/ChainResumeProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ class ChainResumeProcessor {

void process();

void process2();

private:
StorageEnv* env_{nullptr};
};
Expand Down
1 change: 0 additions & 1 deletion src/storage/transaction/ConsistUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ RequestType ConsistUtil::parseType(folly::StringPiece val) {
return RequestType::INSERT;
default:
LOG(FATAL) << "shoule not happend, identifier is " << identifier;
// return RequestType::UNKNOWN;
}
}

Expand Down
6 changes: 0 additions & 6 deletions src/storage/transaction/ConsistUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,6 @@ enum class RequestType {
UPDATE,
};

enum class ChainProcessType {
NORMAL = 0,
RESUME_CHAIN = 1,
RESUME_REMOTE = 2,
};

enum class ResumeType {
UNKNOWN = 0,
RESUME_CHAIN,
Expand Down
Loading

0 comments on commit bf09973

Please sign in to comment.