Skip to content

Commit

Permalink
cherry-pick base toss
Browse files Browse the repository at this point in the history
  • Loading branch information
liuyu85cn committed Sep 16, 2021
1 parent e204786 commit bae49d7
Show file tree
Hide file tree
Showing 83 changed files with 4,963 additions and 3,768 deletions.
7 changes: 3 additions & 4 deletions src/clients/meta/MetaClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2247,17 +2247,16 @@ bool MetaClient::checkShadowAccountFromCache(const std::string& account) const {
return false;
}

TermID MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) const {
static TermID notFound = -1;
StatusOr<TermID> MetaClient::getTermFromCache(GraphSpaceID spaceId, PartitionID partId) const {
folly::RWSpinLock::ReadHolder holder(localCacheLock_);
auto spaceInfo = localCache_.find(spaceId);
if (spaceInfo == localCache_.end()) {
return notFound;
return Status::Error("Term not found!");
}

auto termInfo = spaceInfo->second->termOfPartition_.find(partId);
if (termInfo == spaceInfo->second->termOfPartition_.end()) {
return notFound;
return Status::Error("Term not found!");
}

return termInfo->second;
Expand Down
12 changes: 10 additions & 2 deletions src/clients/meta/MetaClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,12 @@
DECLARE_int32(meta_client_retry_times);
DECLARE_int32(heartbeat_interval_secs);

namespace nebula {
namespace storage {
class MetaClientTestUpdater;
} // namespace storage
} // namespace nebula

namespace nebula {
namespace meta {

Expand Down Expand Up @@ -182,6 +188,8 @@ class MetaClient {
FRIEND_TEST(MetaClientTest, RetryUntilLimitTest);
FRIEND_TEST(MetaClientTest, RocksdbOptionsTest);
friend class KillQueryMetaWrapper;
FRIEND_TEST(ChainAddEdgesTest, AddEdgesLocalTest);
friend class storage::MetaClientTestUpdater;

public:
MetaClient(std::shared_ptr<folly::IOThreadPoolExecutor> ioThreadPool,
Expand Down Expand Up @@ -552,9 +560,9 @@ class MetaClient {

bool authCheckFromCache(const std::string& account, const std::string& password) const;

bool checkShadowAccountFromCache(const std::string& account) const;
StatusOr<TermID> getTermFromCache(GraphSpaceID spaceId, PartitionID) const;

TermID getTermFromCache(GraphSpaceID spaceId, PartitionID) const;
bool checkShadowAccountFromCache(const std::string& account) const;

StatusOr<std::vector<HostAddr>> getStorageHosts() const;

Expand Down
2 changes: 1 addition & 1 deletion src/clients/storage/GraphStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ folly::SemiFuture<StorageRpcResponse<cpp2::ExecResponse>> GraphStorageClient::ad
evb,
std::move(requests),
[=](cpp2::GraphStorageServiceAsyncClient* client, const cpp2::AddEdgesRequest& r) {
return useToss ? client->future_addEdgesAtomic(r) : client->future_addEdges(r);
return useToss ? client->future_chainAddEdges(r) : client->future_addEdges(r);
});
}

Expand Down
168 changes: 77 additions & 91 deletions src/clients/storage/InternalStorageClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ namespace nebula {
namespace storage {

template <typename T>
nebula::cpp2::ErrorCode extractErrorCode(T& tryResp) {
::nebula::cpp2::ErrorCode getErrorCode(T& tryResp) {
if (!tryResp.hasValue()) {
LOG(ERROR) << tryResp.exception().what();
return nebula::cpp2::ErrorCode::E_RPC_FAILURE;
Expand All @@ -37,112 +37,98 @@ nebula::cpp2::ErrorCode extractErrorCode(T& tryResp) {
return nebula::cpp2::ErrorCode::SUCCEEDED;
}

StatusOr<HostAddr> InternalStorageClient::getFuzzyLeader(GraphSpaceID spaceId,
PartitionID partId) const {
return getLeader(spaceId, partId);
}

folly::SemiFuture<nebula::cpp2::ErrorCode> InternalStorageClient::forwardTransaction(
int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::EventBase* evb) {
auto c = folly::makePromiseContract<nebula::cpp2::ErrorCode>();
forwardTransactionImpl(txnId, spaceId, partId, std::move(data), std::move(c.first), evb);
return std::move(c.second);
}

void InternalStorageClient::forwardTransactionImpl(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::Promise<nebula::cpp2::ErrorCode> p,
folly::EventBase* evb) {
VLOG(1) << "forwardTransactionImpl txnId=" << txnId;
auto statusOrLeader = getFuzzyLeader(spaceId, partId);
if (!statusOrLeader.ok()) {
p.setValue(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
void InternalStorageClient::chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedRequest,
TermID termOfSrc,
folly::Optional<int64_t> optVersion,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = reversedRequest.get_space_id();
auto partId = reversedRequest.get_part_id();
auto optLeader = getLeader(spaceId, partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}. ", spaceId, partId)
<< optLeader.status();
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& dest = statusOrLeader.value();
dest.port += kInternalPortOffset;

cpp2::InternalTxnRequest interReq;
interReq.set_txn_id(txnId);
interReq.set_space_id(spaceId);
interReq.set_part_id(partId);
interReq.set_position(1);
(*interReq.data_ref()).resize(2);
(*interReq.data_ref()).back().emplace_back(data);
getResponse(
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;

cpp2::ChainUpdateEdgeRequest chainReq;
chainReq.set_update_edge_request(reversedRequest);
chainReq.set_term(termOfSrc);
if (optVersion) {
chainReq.set_edge_version(optVersion.value());
}
auto resp = getResponse(
evb,
std::make_pair(dest, interReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::InternalTxnRequest& r) {
return client->future_forwardTransaction(r);
})
.thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = extractErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return forwardTransactionImpl(txnId, spaceId, partId, std::move(data), std::move(p), evb);
} else {
p.setValue(code);
}
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainUpdateEdgeRequest& r) {
return client->future_chainUpdateEdge(r);
});
}

folly::SemiFuture<ErrOrVal> InternalStorageClient::getValue(size_t vIdLen,
GraphSpaceID spaceId,
folly::StringPiece key,
folly::EventBase* evb) {
auto srcVid = key.subpiece(sizeof(PartitionID), vIdLen);
auto partId = metaClient_->partId(spaceId, srcVid.str());

auto c = folly::makePromiseContract<ErrOrVal>();
getValueImpl(spaceId, partId, key.str(), std::move(c.first), evb);
return std::move(c.second);
}

void InternalStorageClient::getValueImpl(GraphSpaceID spaceId,
PartitionID partId,
std::string&& key,
folly::Promise<ErrOrVal> p,
folly::EventBase* evb) {
std::pair<HostAddr, cpp2::GetValueRequest> req;
auto stLeaderHost = getFuzzyLeader(spaceId, partId);
if (!stLeaderHost.ok()) {
if (stLeaderHost.status().toString().find("partid")) {
p.setValue(nebula::cpp2::ErrorCode::E_PART_NOT_FOUND);
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == ::nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
chainUpdateEdge(reversedRequest, termOfSrc, optVersion, std::move(p));
} else {
p.setValue(nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
p.setValue(code);
}
return;
}
req.first = stLeaderHost.value();
req.first.port += kInternalPortOffset;
});
}

req.second.set_space_id(spaceId);
req.second.set_part_id(partId);
req.second.set_key(key);
void InternalStorageClient::chainAddEdges(cpp2::AddEdgesRequest& directReq,
TermID termId,
folly::Optional<int64_t> optVersion,
folly::Promise<nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb) {
auto spaceId = directReq.get_space_id();
auto partId = directReq.get_parts().begin()->first;
auto optLeader = getLeader(directReq.get_space_id(), partId);
if (!optLeader.ok()) {
LOG(WARNING) << folly::sformat("failed to get leader, space {}, part {}", spaceId, partId);
p.setValue(::nebula::cpp2::ErrorCode::E_SPACE_NOT_FOUND);
return;
}
HostAddr& leader = optLeader.value();
leader.port += kInternalPortOffset;
LOG(INFO) << "leader host: " << leader;

auto remote = [](cpp2::InternalStorageServiceAsyncClient* client,
const cpp2::GetValueRequest& r) { return client->future_getValue(r); };
cpp2::ChainAddEdgesRequest chainReq = makeChainAddReq(directReq, termId, optVersion);
auto resp = getResponse(
evb,
std::make_pair(leader, chainReq),
[](cpp2::InternalStorageServiceAsyncClient* client, const cpp2::ChainAddEdgesRequest& r) {
return client->future_chainAddEdges(r);
});

auto cb = [=, p = std::move(p)](auto&& t) mutable {
auto code = extractErrorCode(t);
if (code == nebula::cpp2::ErrorCode::SUCCEEDED) {
p.setValue(t.value().value().get_value());
} else if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
// retry directly may easily get same error
std::move(resp).thenTry([=, p = std::move(p)](auto&& t) mutable {
auto code = getErrorCode(t);
if (code == nebula::cpp2::ErrorCode::E_LEADER_CHANGED) {
std::this_thread::sleep_for(std::chrono::milliseconds(500));
return getValueImpl(spaceId, partId, std::move(key), std::move(p), evb);
chainAddEdges(directReq, termId, optVersion, std::move(p));
} else {
p.setValue(code);
}
};
return;
});
}

getResponse(evb, std::move(req), remote).thenTry(std::move(cb));
cpp2::ChainAddEdgesRequest InternalStorageClient::makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
folly::Optional<int64_t> ver) {
cpp2::ChainAddEdgesRequest ret;
ret.set_space_id(req.get_space_id());
ret.set_parts(req.get_parts());
ret.set_prop_names(req.get_prop_names());
ret.set_if_not_exists(req.get_if_not_exists());
ret.set_term(termId);
if (ver) {
ret.set_edge_version(ver.value());
}
return ret;
}

} // namespace storage
Expand Down
44 changes: 16 additions & 28 deletions src/clients/storage/InternalStorageClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
namespace nebula {
namespace storage {

typedef ErrorOr<nebula::cpp2::ErrorCode, std::string> ErrOrVal;

/**
* A wrapper class for InternalStorageServiceAsyncClient thrift API
*
Expand All @@ -33,32 +31,22 @@ class InternalStorageClient : public StorageClientBase<cpp2::InternalStorageServ
: Parent(ioThreadPool, metaClient) {}
virtual ~InternalStorageClient() = default;

folly::SemiFuture<nebula::cpp2::ErrorCode> forwardTransaction(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::EventBase* evb = nullptr);

folly::SemiFuture<ErrOrVal> getValue(size_t vIdLen,
GraphSpaceID spaceId,
folly::StringPiece key,
folly::EventBase* evb = nullptr);

protected:
StatusOr<HostAddr> getFuzzyLeader(GraphSpaceID spaceId, PartitionID partId) const;

void forwardTransactionImpl(int64_t txnId,
GraphSpaceID spaceId,
PartitionID partId,
std::string&& data,
folly::Promise<nebula::cpp2::ErrorCode> p,
folly::EventBase* evb);

void getValueImpl(GraphSpaceID spaceId,
PartitionID partId,
std::string&& key,
folly::Promise<ErrOrVal> p,
folly::EventBase* evb = nullptr);
virtual void chainUpdateEdge(cpp2::UpdateEdgeRequest& reversedRequest,
TermID termOfSrc,
folly::Optional<int64_t> optVersion,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

virtual void chainAddEdges(cpp2::AddEdgesRequest& req,
TermID termId,
folly::Optional<int64_t> optVersion,
folly::Promise<::nebula::cpp2::ErrorCode>&& p,
folly::EventBase* evb = nullptr);

private:
cpp2::ChainAddEdgesRequest makeChainAddReq(const cpp2::AddEdgesRequest& req,
TermID termId,
folly::Optional<int64_t> optVersion);
};

} // namespace storage
Expand Down
12 changes: 6 additions & 6 deletions src/clients/storage/StorageClientBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,13 +209,13 @@ class StorageClientBase {
return {req.get_part_id()};
}

std::vector<PartitionID> getReqPartsId(const cpp2::InternalTxnRequest& req) const {
return {req.get_part_id()};
}
// std::vector<PartitionID> getReqPartsId(const cpp2::InternalTxnRequest& req) const {
// return {req.get_part_id()};
// }

std::vector<PartitionID> getReqPartsId(const cpp2::GetValueRequest& req) const {
return {req.get_part_id()};
}
// std::vector<PartitionID> getReqPartsId(const cpp2::GetValueRequest& req) const {
// return {req.get_part_id()};
// }

std::vector<PartitionID> getReqPartsId(const cpp2::ScanEdgeRequest& req) const {
return {req.get_part_id()};
Expand Down
7 changes: 7 additions & 0 deletions src/common/utils/MemoryLockWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,13 @@ class MemoryLockGuard {
return *iter_;
}

// this will manual set the lock to unlocked state
// which mean will not release all locks automaticly
// please make sure you really know the side effect
void forceLock() { locked_ = true; }

void forceUnlock() { locked_ = false; }

protected:
MemoryLockCore<Key>* lock_;
std::vector<Key> keys_;
Expand Down
3 changes: 2 additions & 1 deletion src/graph/executor/mutate/InsertExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

#include "graph/context/QueryContext.h"
#include "graph/planner/plan/Mutate.h"
#include "graph/service/GraphFlags.h"
#include "graph/util/ScopedTimer.h"

namespace nebula {
Expand Down Expand Up @@ -55,7 +56,7 @@ folly::Future<Status> InsertEdgesExecutor::insertEdges() {
ieNode->getPropNames(),
ieNode->getIfNotExists(),
nullptr,
ieNode->useChainInsert())
FLAGS_enable_experimental_feature)
.via(runner())
.ensure(
[addEdgeTime]() { VLOG(1) << "Add edge time: " << addEdgeTime.elapsedInUSec() << "us"; })
Expand Down
2 changes: 2 additions & 0 deletions src/graph/service/GraphFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,5 @@ DEFINE_bool(disable_octal_escape_char,
false,
"Octal escape character will be disabled"
" in next version to ensure compatibility with cypher.");

DEFINE_bool(enable_experimental_feature, false, "Whether to enable experimental feature");
2 changes: 2 additions & 0 deletions src/graph/service/GraphFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,6 @@ DECLARE_int64(max_allowed_connections);

DECLARE_string(local_ip);

DECLARE_bool(enable_experimental_feature);

#endif // GRAPH_GRAPHFLAGS_H_
2 changes: 0 additions & 2 deletions src/graph/validator/AdminValidator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,8 +93,6 @@ Status CreateSpaceValidator::validateImpl() {
case SpaceOptItem::ATOMIC_EDGE: {
if (item->getAtomicEdge()) {
spaceDesc_.set_isolation_level(meta::cpp2::IsolationLevel::TOSS);
// for 2.0 GA, no matter how this option set, don't use toss.
return ::nebula::Status::NotSupported("not support enable toss in 2.0 GA");
} else {
spaceDesc_.set_isolation_level(meta::cpp2::IsolationLevel::DEFAULT);
}
Expand Down
Loading

0 comments on commit bae49d7

Please sign in to comment.