Skip to content

Commit

Permalink
Merge branch 'master' into small-delete
Browse files Browse the repository at this point in the history
  • Loading branch information
jievince authored Oct 18, 2021
2 parents e0e4878 + 55ed186 commit 63ab756
Show file tree
Hide file tree
Showing 17 changed files with 179 additions and 54 deletions.
2 changes: 1 addition & 1 deletion src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
VLOG(1) << "leader host: " << leader;
VLOG(2) << "leader host: " << leader;

cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
Expand Down
13 changes: 13 additions & 0 deletions src/kvstore/NebulaStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1192,5 +1192,18 @@ ErrorOr<nebula::cpp2::ErrorCode, std::string> NebulaStore::getProperty(
return folly::toJson(obj);
}

void NebulaStore::registerOnNewPartAdded(
const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts) {
for (auto& item : spaces_) {
for (auto& partItem : item.second->parts_) {
existParts.emplace_back(std::make_pair(item.first, partItem.first));
func(partItem.second);
}
}
onNewPartAdded_.insert(std::make_pair(funcName, func));
}

} // namespace kvstore
} // namespace nebula
5 changes: 2 additions & 3 deletions src/kvstore/NebulaStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -279,9 +279,8 @@ class NebulaStore : public KVStore, public Handler {
ErrorOr<nebula::cpp2::ErrorCode, std::string> getProperty(GraphSpaceID spaceId,
const std::string& property) override;
void registerOnNewPartAdded(const std::string& funcName,
std::function<void(std::shared_ptr<Part>&)> func) {
onNewPartAdded_.insert(std::make_pair(funcName, func));
}
std::function<void(std::shared_ptr<Part>&)> func,
std::vector<std::pair<GraphSpaceID, PartitionID>>& existParts);

void unregisterOnNewPartAdded(const std::string& funcName) { onNewPartAdded_.erase(funcName); }

Expand Down
21 changes: 19 additions & 2 deletions src/kvstore/Part.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,18 @@ void Part::asyncRemovePeer(const HostAddr& peer, KVCallback cb) {

void Part::setBlocking(bool sign) { blocking_ = sign; }

void Part::onLostLeadership(TermID term) { VLOG(1) << "Lost the leadership for the term " << term; }
void Part::onLostLeadership(TermID term) {
VLOG(1) << "Lost the leadership for the term " << term;

CallbackOptions opt;
opt.spaceId = spaceId_;
opt.partId = partId_;
opt.term = term_;

for (auto& cb : leaderLostCB_) {
cb(opt);
}
}

void Part::onElected(TermID term) {
VLOG(1) << "Being elected as the leader for the term: " << term;
Expand All @@ -191,7 +202,9 @@ void Part::onLeaderReady(TermID term) {
}
}

void Part::registerOnLeaderReady(LeaderReadyCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }
void Part::registerOnLeaderReady(LeaderChagneCB cb) { leaderReadyCB_.emplace_back(std::move(cb)); }

void Part::registerOnLeaderLost(LeaderChagneCB cb) { leaderLostCB_.emplace_back(std::move(cb)); }

void Part::onDiscoverNewLeader(HostAddr nLeader) {
LOG(INFO) << idStr_ << "Find the new leader " << nLeader;
Expand Down Expand Up @@ -231,6 +244,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
// Make the number of values are an even number
DCHECK_EQ((kvs.size() + 1) / 2, kvs.size() / 2);
for (size_t i = 0; i < kvs.size(); i += 2) {
VLOG(1) << "OP_MULTI_PUT " << folly::hexlify(kvs[i])
<< ", val = " << folly::hexlify(kvs[i + 1]);
auto code = batch->put(kvs[i], kvs[i + 1]);
if (code != nebula::cpp2::ErrorCode::SUCCEEDED) {
LOG(ERROR) << idStr_ << "Failed to call WriteBatch::put()";
Expand Down Expand Up @@ -272,6 +287,8 @@ cpp2::ErrorCode Part::commitLogs(std::unique_ptr<LogIterator> iter, bool wait) {
case OP_BATCH_WRITE: {
auto data = decodeBatchValue(log);
for (auto& op : data) {
VLOG(1) << "OP_BATCH_WRITE: " << folly::hexlify(op.second.first)
<< ", val=" << folly::hexlify(op.second.second);
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
if (op.first == BatchLogType::OP_BATCH_PUT) {
code = batch->put(op.second.first, op.second.second);
Expand Down
9 changes: 6 additions & 3 deletions src/kvstore/Part.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,18 @@ class Part : public raftex::RaftPart {
TermID term;
};

using LeaderReadyCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderReadyCB cb);
using LeaderChagneCB = std::function<void(const CallbackOptions& opt)>;
void registerOnLeaderReady(LeaderChagneCB cb);

void registerOnLeaderLost(LeaderChagneCB cb);

protected:
GraphSpaceID spaceId_;
PartitionID partId_;
std::string walPath_;
NewLeaderCallback newLeaderCb_ = nullptr;
std::vector<LeaderReadyCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderReadyCB_;
std::vector<LeaderChagneCB> leaderLostCB_;

private:
KVEngine* engine_ = nullptr;
Expand Down
4 changes: 4 additions & 0 deletions src/kvstore/raftex/RaftPart.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "common/thrift/ThriftClientManager.h"
#include "common/time/WallClock.h"
#include "interface/gen-cpp2/RaftexServiceAsyncClient.h"
#include "kvstore/LogEncoder.h"
#include "kvstore/raftex/Host.h"
#include "kvstore/raftex/LogStrListIterator.h"
#include "kvstore/wal/FileBasedWal.h"
Expand Down Expand Up @@ -1335,6 +1336,9 @@ void RaftPart::processAskForVoteRequest(const cpp2::AskForVoteRequest& req,
<< " i did not commit when i was leader, rollback to " << lastLogId_;
wal_->rollbackToLog(lastLogId_);
}
if (role_ == Role::LEADER) {
bgWorkers_->addTask([self = shared_from_this(), term] { self->onLostLeadership(term); });
}
role_ = Role::FOLLOWER;
votedAddr_ = candidate;
proposedTerm_ = req.get_term();
Expand Down
67 changes: 53 additions & 14 deletions src/storage/transaction/ChainAddEdgesProcessorLocal.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,19 +47,26 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::prepareLocal() {

auto [pro, fut] = folly::makePromiseContract<Code>();
auto primes = makePrime();
std::vector<kvstore::KV> debugPrimes;
if (FLAGS_trace_toss) {
for (auto& kv : primes) {
VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first);
}
debugPrimes = primes;
}

erasePrime();
env_->kvstore_->asyncMultiPut(
spaceId_, localPartId_, std::move(primes), [p = std::move(pro), this](auto rc) mutable {
spaceId_,
localPartId_,
std::move(primes),
[p = std::move(pro), debugPrimes, this](auto rc) mutable {
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
primeInserted_ = true;
if (FLAGS_trace_toss) {
for (auto& kv : debugPrimes) {
VLOG(1) << uuid_ << " put prime " << folly::hexlify(kv.first);
}
}
} else {
LOG(WARNING) << "kvstore err: " << apache::thrift::util::enumNameSafe(rc);
LOG(WARNING) << uuid_ << "kvstore err: " << apache::thrift::util::enumNameSafe(rc);
}

p.setValue(rc);
Expand All @@ -85,10 +92,14 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processLocal(Code code) {
VLOG(1) << uuid_ << " processRemote(), code = " << apache::thrift::util::enumNameSafe(code);
}

bool remoteFailed{true};

if (code == Code::SUCCEEDED) {
// do nothing
remoteFailed = false;
} else if (code == Code::E_RPC_FAILURE) {
code_ = Code::SUCCEEDED;
remoteFailed = false;
} else {
code_ = code;
}
Expand All @@ -106,7 +117,7 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::processLocal(Code code) {
if (code_ == Code::SUCCEEDED) {
return forwardToDelegateProcessor();
} else {
if (primeInserted_) {
if (primeInserted_ && remoteFailed) {
return abort();
}
}
Expand Down Expand Up @@ -142,7 +153,7 @@ bool ChainAddEdgesProcessorLocal::prepareRequest(const cpp2::AddEdgesRequest& re
pushResultCode(nebula::error(part), localPartId_);
return false;
}
localTerm_ = (nebula::value(part))->termId();
restrictTerm_ = (nebula::value(part))->termId();

auto vidLen = env_->schemaMan_->getSpaceVidLen(spaceId_);
if (!vidLen.ok()) {
Expand All @@ -164,7 +175,13 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::forwardToDelegateProcessor(
auto [pro, fut] = folly::makePromiseContract<Code>();
std::move(futProc).thenValue([&, p = std::move(pro)](auto&& resp) mutable {
auto rc = extractRpcError(resp);
if (rc != Code::SUCCEEDED) {
if (rc == Code::SUCCEEDED) {
if (FLAGS_trace_toss) {
for (auto& k : kvErased_) {
VLOG(1) << uuid_ << " erase prime " << folly::hexlify(k);
}
}
} else {
VLOG(1) << uuid_
<< " forwardToDelegateProcessor(), code = " << apache::thrift::util::enumNameSafe(rc);
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
Expand Down Expand Up @@ -194,7 +211,7 @@ void ChainAddEdgesProcessorLocal::doRpc(folly::Promise<Code>&& promise,
auto* iClient = env_->txnMan_->getInternalClient();
folly::Promise<Code> p;
auto f = p.getFuture();
iClient->chainAddEdges(req, localTerm_, edgeVer_, std::move(p));
iClient->chainAddEdges(req, restrictTerm_, edgeVer_, std::move(p));

std::move(f).thenTry([=, p = std::move(promise)](auto&& t) mutable {
auto code = t.hasValue() ? t.value() : Code::E_RPC_FAILURE;
Expand Down Expand Up @@ -229,14 +246,26 @@ folly::SemiFuture<Code> ChainAddEdgesProcessorLocal::abort() {
if (kvErased_.empty()) {
return Code::SUCCEEDED;
}

std::vector<std::string> debugErased;
if (FLAGS_trace_toss) {
debugErased = kvErased_;
}

auto [pro, fut] = folly::makePromiseContract<Code>();
env_->kvstore_->asyncMultiRemove(
req_.get_space_id(),
localPartId_,
std::move(kvErased_),
[p = std::move(pro), this](auto rc) mutable {
[p = std::move(pro), debugErased, this](auto rc) mutable {
VLOG(1) << uuid_ << " abort()=" << apache::thrift::util::enumNameSafe(rc);
if (rc != Code::SUCCEEDED) {
if (rc == Code::SUCCEEDED) {
if (FLAGS_trace_toss) {
for (auto& k : debugErased) {
VLOG(1) << uuid_ << "erase prime " << folly::hexlify(k);
}
}
} else {
addUnfinishedEdge(ResumeType::RESUME_CHAIN);
}
p.setValue(rc);
Expand Down Expand Up @@ -313,9 +342,19 @@ bool ChainAddEdgesProcessorLocal::lockEdges(const cpp2::AddEdgesRequest& req) {
bool ChainAddEdgesProcessorLocal::checkTerm(const cpp2::AddEdgesRequest& req) {
auto space = req.get_space_id();
auto partId = req.get_parts().begin()->first;
auto ret = env_->txnMan_->checkTerm(space, partId, localTerm_);
LOG_IF(WARNING, !ret) << "check term failed, localTerm_ = " << localTerm_;
return ret;

auto part = env_->kvstore_->part(space, partId);
if (!nebula::ok(part)) {
pushResultCode(nebula::error(part), localPartId_);
return false;
}
auto curTerm = (nebula::value(part))->termId();
if (restrictTerm_ != curTerm) {
VLOG(1) << folly::sformat(
"check term failed, restrictTerm_={}, currTerm={}", restrictTerm_, curTerm);
return false;
}
return true;
}

// check if current edge is not newer than the one trying to resume.
Expand Down
3 changes: 2 additions & 1 deletion src/storage/transaction/ChainAddEdgesProcessorLocal.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,8 @@ class ChainAddEdgesProcessorLocal : public BaseProcessor<cpp2::ExecResponse>,
cpp2::AddEdgesRequest req_;
std::unique_ptr<TransactionManager::LockGuard> lk_{nullptr};
int retryLimit_{10};
TermID localTerm_{-1};
// need to restrict all the phase in the same term.
TermID restrictTerm_{-1};
// set to true when prime insert succeed
// in processLocal(), we check this to determine if need to do abort()
bool primeInserted_{false};
Expand Down
20 changes: 16 additions & 4 deletions src/storage/transaction/ChainAddEdgesProcessorRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,16 @@ namespace nebula {
namespace storage {

void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req) {
VLOG(1) << this << ConsistUtil::dumpParts(req.get_parts());
if (FLAGS_trace_toss) {
uuid_ = ConsistUtil::strUUID();
}
VLOG(1) << uuid_ << ConsistUtil::dumpParts(req.get_parts());
auto partId = req.get_parts().begin()->first;
auto code = nebula::cpp2::ErrorCode::SUCCEEDED;
do {
if (!checkTerm(req)) {
LOG(WARNING) << "invalid term, incoming part " << partId << ", term = " << req.get_term();
LOG(WARNING) << uuid_ << " invalid term, incoming part " << partId
<< ", term = " << req.get_term();
code = nebula::cpp2::ErrorCode::E_OUTDATED_TERM;
break;
}
Expand All @@ -35,6 +39,13 @@ void ChainAddEdgesProcessorRemote::process(const cpp2::ChainAddEdgesRequest& req
} while (0);

if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
if (FLAGS_trace_toss) {
// need to do this after set spaceVidLen_
auto keys = getStrEdgeKeys(req);
for (auto& key : keys) {
LOG(INFO) << uuid_ << ", key = " << folly::hexlify(key);
}
}
forwardRequest(req);
} else {
pushResultCode(code, partId);
Expand All @@ -53,13 +64,14 @@ void ChainAddEdgesProcessorRemote::forwardRequest(const cpp2::ChainAddEdgesReque
proc->getFuture().thenValue([=](auto&& resp) {
Code rc = Code::SUCCEEDED;
for (auto& part : resp.get_result().get_failed_parts()) {
rc = part.code;
handleErrorCode(part.code, spaceId, part.get_part_id());
}
VLOG(1) << this << " " << apache::thrift::util::enumNameSafe(rc);
VLOG(1) << uuid_ << " " << apache::thrift::util::enumNameSafe(rc);
this->result_ = resp.get_result();
this->onFinished();
});
proc->process(ConsistUtil::makeDirectAddReq(req));
proc->process(ConsistUtil::toAddEdgesRequest(req));
}

bool ChainAddEdgesProcessorRemote::checkVersion(const cpp2::ChainAddEdgesRequest& req) {
Expand Down
3 changes: 3 additions & 0 deletions src/storage/transaction/ChainAddEdgesProcessorRemote.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ class ChainAddEdgesProcessorRemote : public BaseProcessor<cpp2::ExecResponse> {
void forwardRequest(const cpp2::ChainAddEdgesRequest& req);

std::vector<std::string> getStrEdgeKeys(const cpp2::ChainAddEdgesRequest& req);

private:
std::string uuid_; // for debug purpose
};

} // namespace storage
Expand Down
13 changes: 9 additions & 4 deletions src/storage/transaction/ChainResumeProcessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,23 @@ void ChainResumeProcessor::process() {
auto edgeKey = std::string(it->first.c_str() + sizeof(GraphSpaceID),
it->first.size() - sizeof(GraphSpaceID));
auto partId = NebulaKeyUtils::getPart(edgeKey);
VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId
<< ", hex=" << folly::hexlify(edgeKey);
auto prefix = (it->second == ResumeType::RESUME_CHAIN) ? ConsistUtil::primeTable()
: ConsistUtil::doublePrimeTable();
auto key = prefix + edgeKey;
std::string val;
auto rc = env_->kvstore_->get(spaceId, partId, key, &val);
VLOG(1) << "resume edge space=" << spaceId << ", part=" << partId
<< ", hex = " << folly::hexlify(edgeKey)
<< ", rc = " << apache::thrift::util::enumNameSafe(rc);
if (rc == nebula::cpp2::ErrorCode::SUCCEEDED) {
// do nothing
} else if (rc == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
// not leader any more, stop trying resume
env_->txnMan_->delPrime(spaceId, edgeKey);
VLOG(1) << "kvstore->get() leader changed";
auto getPart = env_->kvstore_->part(spaceId, partId);
if (nebula::ok(getPart) && !nebula::value(getPart)->isLeader()) {
// not leader any more, stop trying resume
env_->txnMan_->delPrime(spaceId, edgeKey);
}
continue;
} else {
LOG(WARNING) << "kvstore->get() failed, " << apache::thrift::util::enumNameSafe(rc);
Expand Down
7 changes: 6 additions & 1 deletion src/storage/transaction/ConsistUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ int64_t ConsistUtil::getTimestamp(const std::string& val) noexcept {
return *reinterpret_cast<const int64_t*>(val.data() + (val.size() - sizeof(int64_t)));
}

cpp2::AddEdgesRequest ConsistUtil::makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req) {
cpp2::AddEdgesRequest ConsistUtil::toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req) {
cpp2::AddEdgesRequest ret;
ret.set_space_id(req.get_space_id());
ret.set_parts(req.get_parts());
Expand Down Expand Up @@ -177,6 +177,11 @@ std::pair<int64_t, nebula::cpp2::ErrorCode> ConsistUtil::versionOfUpdateReq(

std::string ConsistUtil::dumpAddEdgeReq(const cpp2::AddEdgesRequest& req) {
std::stringstream oss;
oss << "prop_names.size() = " << req.get_prop_names().size() << " ";
for (auto& name : req.get_prop_names()) {
oss << name << " ";
}
oss << " ";
for (auto& part : req.get_parts()) {
// oss << dumpParts(part.second);
for (auto& edge : part.second) {
Expand Down
2 changes: 1 addition & 1 deletion src/storage/transaction/ConsistUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class ConsistUtil final {

static int64_t getTimestamp(const std::string& val) noexcept;

static cpp2::AddEdgesRequest makeDirectAddReq(const cpp2::ChainAddEdgesRequest& req);
static cpp2::AddEdgesRequest toAddEdgesRequest(const cpp2::ChainAddEdgesRequest& req);

static cpp2::EdgeKey reverseEdgeKey(const cpp2::EdgeKey& edgeKey);

Expand Down
Loading

0 comments on commit 63ab756

Please sign in to comment.